读书人

读书笔记二:Hadoop组件-(2)剖析Map

发布时间: 2013-01-23 10:44:49 作者: rapoo

读书笔记2:Hadoop组件-(2)剖析MapReduce程序

?

3,剖析MapReduce程序

<1>hadoop的数据类型

实现Writable接口额类可以是值,而实现WritableComparable接口的类既可以是键也可以是值。

以下这些是常用的数据类型,均用于实现WritableComparable借口:

BooleanWritable

ByteWritable

DoubleWritable

FloatWritable

IntWritable

LongWritable

Text使用UTF8格式的文本封装

NullWritable无键值时的占位符

也可以自定义数据类型,但要实现Writable接口或者WritableComparable接口。

<2>Mapper

一个类要作为mapper,需要继承MapReduceBase基类和实现Mapper借口。

mapper和reducer的基类均为MapReduceBase,它包含类的构造和解构方法:

void?configure(JobConf?job);该函数提取XML配置文件或者应用程序主类中的参数,在数据处理之前调用该函。

void?close();作为map任务结束前的最后一个操作,该函数完成所有的结尾工作,如关闭数据库、打开文件等。

Mapper借口负责数据处理阶段,采用的形式是Mapper<K1,V1,K2,V2>java泛型,键类和值类分别实现WritableComparable和Writable借口。

Mapper只有一个方法map用于处理一个单独的键值对。

void?map(K1?key,V1?value,OutputCollector<K2,V2>?output,Reporter?reporter)

该函数处理一个给定的键值对(K1,V1),生成一个键值对(K2,V2)的列表。

OutputCollector接收这个映射过程的输入。

Reporter?可提供对Mapper相关附加信息的记录,形成任务进度。

?

Hadoop预定义的Mapper的实现:

IdentityMapper<K,V>

实现Mapper<K,V,K,V>就输入直接映射到输出

InverseMapper<K,V>

实现Mapper<K,V,K,V>发转键值对

RegexMapper<K>

实现Mapper<K,Text,Text,LongWritable>,为每个常规表达式的匹配项生成一个(mathc,1)对

TokenCountMapper<K>

实现Mapper<K,Text,Text,LongWritable>,当输入的值为分词时,生成一个(token,1)对

<3>Reducer

reducer的实现也必须继承MapReduceBase类和实现Reducer接口。

Reducer接口只有一个方法,及时reduce:

Void?reduce?(K2,key,Iterator<V2>?values,OutputCollector<K3,V3>?output,Reporter?r)

Reducer任务接收来自各个mapper的输出时,按照键对输入数据进行排序,并将相同的键的值归并,然后调用reduce函数,并通过迭代处理那些与指定键相关联的值,生成一个列表<K3,V3>

OutputCollector接收reduce阶段的输出,并写入输出文件。

Reporter?可提供对Mapper相关附加信息的记录,形成任务进度。

?

Hadoop预定义的Reducer的实现:

IdentityReducer<K,V>

实现Reducer<K,V,K,V>,将输入直接映射到输出

LongSumReducer<K>

实现<K,LongWritable,K,LongWritable>,计算与给定键相对应的所有值的和

<4>Partitioner:重定向Mapper输出

其实在mapper和reducer之间有一个非常重要的环节,就是将mapper的结果输出给不同的reducer,这就是Partitioner的工作:(此工作常被称为:洗牌)

一般reducer是多个,那么mapper应该将键值的输出给谁呢?Hadoop默认的机制是对键进行散列来确定reducer,Hadoop通过HashPartitioner类强制执行此策略。但是有的时候HashPartitioner会使得程序出错,即他的分发策略不符合实际的要求,那么此时就需要我们定制自己的Partitioner:(例如针对自定义的数据类型Edge)

public?class?EdgePartitioner?implements?Partitioner<Edge,Writable>

{

@override

public?int?getPartition(Edge?key,Writable?value,int?numPartitions)

{

return?key.getDepartureNode().hashCode()?%?numPartitions;

}

@override

public?void?configure(JobConf?conf)

{

}

}

自己定制的Partitioner只需要实现getPartition方法和configure方法即可。前者将Hadoop对作业的配置应用在Partitioner上,而后者返回一个介于0和reducer任务数之间的整数,指向键值对要发送到的reducer。

Partitioner的形象的表达:(如下图)

读书笔记二:Hadoop组件-(2)剖析MapReduce程序

<5>Combiner:本地reduce

在分发mapper之前先做一下“本地reduce”,也被成为合并,后面详细讲述。

<6>预定义Mapper和Reducer

使用预定义Mapper和Reducer改写前面的统计单词数量的程序:WordCount2.java

public?class?WordCount2?{

public?static?void?main(String?args[])

{

JobClient?client?=?new?JobClient();

JobConf?conf?=??new?JobConf(WordCount2.class);

FileInputFormat.addInputPath(conf,new?Path(args[0]));

FileOutputFormat.setOutputPath(conf,new?Path(args[1]));

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(LongWritable.class);

conf.setMapperClass(TokenCountMapper.class);//hadoop自己的TokenCountMapper

conf.setCombinerClass(LongSumReducer.class);//hadoop自己的LongSumReducer

conf.setReducerClass(LongSumReducer.class);

client.setConf(conf);

try

{

JobClient.runJob(conf);

}catch(Exception?e)

{

e.printStackTrace();

}

}

}

读书人网 >互联网

热点推荐