
Published: 2012-07-18 12:05:40 Author: rapoo

Advanced MapReduce Programming: Custom InputFormat, a Deeper Understanding

0. This article continues the previous one, "Advanced MapReduce Programming: Custom InputFormat".

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class Point3DMapper extends Mapper<Text, Point3D, Text, Point3D> {
    protected void map(Text key, Point3D value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

Looking into the Mapper source code, we can see:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context
      extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN,VALUEIN> reader,
                   RecordWriter<KEYOUT,VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }

  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  // ... (rest of the class omitted)
}

That is:

/**
 * Text    -> KEYIN
 * Point3D -> VALUEIN
 * Text    -> KEYOUT
 * Point3D -> VALUEOUT
 */
Mapper<Text, Point3D, Text, Point3D>


And:

// Text  -> ball etc.          -> KEYOUT
// value -> .5, 12.7, 9.0 etc. -> VALUEOUT
context.write(key, value);
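The way the four type parameters bind can be tried out without a Hadoop installation. What follows is only a minimal sketch, not Hadoop code: `MiniMapper`, `MiniContext`, `runOnce`, and the simplified `Point3D` are hypothetical stand-ins, `String` stands in for `Text`, and the value `0.5, 12.7, 9.0` is just sample data. It imitates the shape of `Mapper` and its default identity `map`, nothing more.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these classes are part of Hadoop.
class TypeBindingSketch {

    /** Simplified 3D point, standing in for the article's Point3D. */
    static final class Point3D {
        final double x, y, z;
        Point3D(double x, double y, double z) { this.x = x; this.y = y; this.z = z; }
        @Override public String toString() { return x + ", " + y + ", " + z; }
    }

    /** Collects written pairs as strings; stands in for Mapper.Context. */
    static final class MiniContext<KEYOUT, VALUEOUT> {
        final List<String> written = new ArrayList<String>();
        void write(KEYOUT key, VALUEOUT value) { written.add(key + " -> " + value); }
    }

    /** Same four type parameters as Mapper, with the same identity default. */
    static class MiniMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        @SuppressWarnings("unchecked")
        protected void map(KEYIN key, VALUEIN value, MiniContext<KEYOUT, VALUEOUT> context) {
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }

    /** String -> KEYIN and KEYOUT, Point3D -> VALUEIN and VALUEOUT. */
    static final class Point3DMapper extends MiniMapper<String, Point3D, String, Point3D> {
        @Override
        protected void map(String key, Point3D value, MiniContext<String, Point3D> context) {
            context.write(key, value); // no cast needed: the type parameters are bound
        }
    }

    /** Runs one record through the mapper and returns what was written. */
    static List<String> runOnce() {
        MiniContext<String, Point3D> context = new MiniContext<String, Point3D>();
        new Point3DMapper().map("ball", new Point3D(0.5, 12.7, 9.0), context);
        return context.written;
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

Once the subclass fixes the type parameters, the overriding `map` needs no casts, whereas the generic default has to use unchecked casts from `KEYIN`/`VALUEIN` to `KEYOUT`/`VALUEOUT`.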


Similarly, looking at the Reducer source code, we can see that the Mapper's KEYOUT and VALUEOUT types must match the Reducer's KEYIN and VALUEIN types.


public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  public class Context
      extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RawKeyValueIterator input,
                   Counter inputKeyCounter,
                   Counter inputValueCounter,
                   RecordWriter<KEYOUT,VALUEOUT> output,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   RawComparator<KEYIN> comparator,
                   Class<KEYIN> keyClass,
                   Class<VALUEIN> valueClass
                   ) throws IOException, InterruptedException {
      super(conf, taskid, input, inputKeyCounter, inputValueCounter,
            output, committer, reporter,
            comparator, keyClass, valueClass);
    }
  }

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  // ... (rest of the class omitted)
}
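The rule that the Mapper's KEYOUT/VALUEOUT must line up with the Reducer's KEYIN/VALUEIN can be written down in plain Java generics. The sketch below is illustrative only: `MapFn`, `ReduceFn`, `Emitter`, `runJob`, and `demo` are hypothetical names, not Hadoop APIs, and the in-memory grouping merely imitates the shuffle. Here the compiler enforces the match through the shared type parameters `K2` and `V2`; in Hadoop itself the mapper and reducer are registered on the job as classes, so a mismatch would be expected to surface at runtime rather than at compile time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-ins; none of these types are part of Hadoop.
class TypeChainSketch {

    interface Emitter<K, V> { void write(K key, V value); }

    /** Shaped like Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>. */
    interface MapFn<K1, V1, K2, V2> { void map(K1 key, V1 value, Emitter<K2, V2> out); }

    /** Shaped like Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>. */
    interface ReduceFn<K2, V2, K3, V3> { void reduce(K2 key, Iterable<V2> values, Emitter<K3, V3> out); }

    /** K2 and V2 appear in both signatures, so a mismatched pair does not compile. */
    static <K1, V1, K2 extends Comparable<K2>, V2, K3, V3> List<String> runJob(
            Map<K1, V1> input, MapFn<K1, V1, K2, V2> mapper, ReduceFn<K2, V2, K3, V3> reducer) {
        // Map phase: group every emitted value under its key (sorted by key).
        final Map<K2, List<V2>> grouped = new TreeMap<K2, List<V2>>();
        for (Map.Entry<K1, V1> e : input.entrySet()) {
            mapper.map(e.getKey(), e.getValue(), new Emitter<K2, V2>() {
                public void write(K2 key, V2 value) {
                    List<V2> values = grouped.get(key);
                    if (values == null) { values = new ArrayList<V2>(); grouped.put(key, values); }
                    values.add(value);
                }
            });
        }
        // Reduce phase: called once per key with all of that key's values.
        final List<String> output = new ArrayList<String>();
        Emitter<K3, V3> sink = new Emitter<K3, V3>() {
            public void write(K3 key, V3 value) { output.add(key + "=" + value); }
        };
        for (Map.Entry<K2, List<V2>> e : grouped.entrySet()) {
            reducer.reduce(e.getKey(), e.getValue(), sink);
        }
        return output;
    }

    /** Mapper emits (shape, 1); reducer sums the counts per shape. */
    static List<String> demo() {
        Map<String, String> input = new TreeMap<String, String>();
        input.put("line1", "ball");
        input.put("line2", "cube");
        input.put("line3", "ball");
        MapFn<String, String, String, Integer> mapper = new MapFn<String, String, String, Integer>() {
            public void map(String key, String value, Emitter<String, Integer> out) { out.write(value, 1); }
        };
        ReduceFn<String, Integer, String, Integer> reducer = new ReduceFn<String, Integer, String, Integer>() {
            public void reduce(String key, Iterable<Integer> values, Emitter<String, Integer> out) {
                int sum = 0;
                for (int v : values) { sum += v; }
                out.write(key, sum);
            }
        };
        return runJob(input, mapper, reducer);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Changing the reducer's first two type arguments to anything other than `String, Integer` makes the call to `runJob` fail to compile, which is the same constraint the article reads off the Mapper and Reducer signatures.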


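The point of this article is that documentation is sometimes unclear, and the best remedy is to read the source code. As the saying goes, "before the source code, there are no secrets."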


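For the detailed execution flow of MapReduce, I recommend the post "Map Reduce the Free Lunch is not over?", which has classic examples and a first-rate author. I believe you will get something out of it!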
