mapReduce学习
MapReduce论文中给出了这样一个例子:在一个文档集合中统计每个单词出现的次数。
Map操作的输入是每一篇文档,将输入文档中每一个单词的出现输出到中间文件中去。
map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w, “1″);
比如我们有两篇文档,内容分别是
A - “I love programming”
B - “I am a blogger, you are also a blogger”。
B文档经过Map运算后输出的中间文件将会是:
I,1
am,1
a,1
blogger,1
you,1
are,1
a,1
blogger,1
Reduce操作的输入是单词和出现次数的序列。用上面的例子来说,就是 (“I”, [1, 1]), (“love”, [1]), (“programming”, [1]), (“am”, [1]), (“a”, [1,1]) 等。然后根据每个单词,算出总的出现次数。
reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
最后输出的最终结果就会是:(“I”, 2″), (“a”, 2″)……
实际的执行顺序是:
.MapReduce Library将Input分成M份。这里的Input Splitter也可以是多台机器并行Split。.Master将M份Job分给Idle状态的M个worker来处理;.对于输入中的每一个<key, value> pair 进行Map操作,将中间结果Buffer在Memory里;.定期的(或者根据内存状态),将Buffer中的中间信息Dump到本地磁盘上,并且把文件信息传回给Master(Master需要把这些信息发送给Reduce worker)。这里最重要的一点是,在写磁盘的时候,需要将中间文件做Partition(比如R个)。拿上面的例子来举例,如果把所有的信息存到一个文件,Reduce worker又会变成瓶颈。我们只需要保证相同Key能出现在同一个Partition里面就可以把这个问题分解。.R个Reduce worker开始工作,从不同的Map worker的Partition那里拿到数据(read the buffered data from the local disks of the map workers),用key进行排序(如果内存中放不下需要用到外部排序 external sort)。很显然,排序(或者说Group)是Reduce函数之前必须做的一步。 这里面很关键的是,每个Reduce worker会去从很多Map worker那里拿到X(0<X<R) Partition的中间结果,这样,所有属于这个Key的信息已经都在这个worker上了。.Reduce worker遍历中间数据,对每一个唯一Key,执行Reduce函数(参数是这个key以及相对应的一系列Value)。.执行完毕后,唤醒用户程序,返回结果(最后应该有R份Output,每个Reduce Worker一个)。
可见,这里的分—ivide)体现在两步,分别是将输入分成M份,以及将Map的中间结果分成R份。将输入分开通常很简单,Map的中间结果通常用”hash(key) mod R”这个结果作为标准,保证相同的Key出现在同一个Partition里面。当然,使用者也可以指定自己的Partition Function,比如,对于Url Key,如果希望同一个Host的URL出现在同一个Partition,可以用”hash(Hostname(urlkey)) mod R”作为Partition Function。
对于上面的例子来说,每个文档中都可能会出现成千上万的 (“the”, 1)这样的中间结果,琐碎的中间文件必然导致传输上的损失。因此,MapReduce还支持用户提供Combiner Function。这个函数通常与Reduce Function有相同的实现,不同点在于Reduce函数的输出是最终结果,而Combiner函数的输出是Reduce函数的某一个输入的中间文件。