Hadoop In Action 第四章(1)
第四章 编写基本的MapReduce程序?
?
图 4.1 将专利引用数据的一部分看作一张图。每个专利显示为一个顶点(节点),而每个引用是一条有向边(箭头)。?
?
第一行包含了一些属性的名称,这只有对专利专家有意义。尽管我们不了解所有的属性,了解它们中的一部分仍然是十分有用的。表 4.1描述了前10行。
?
表 4.1 专利描述数据集前10个属性的定义?
属性名称内容PATENT专利号GYEAR授权年份GDATE授权日期, 从1960年1月1日算起的日期数APPYEAR申请日期(只对1967年之后授权的专利有效)COUNTRY第一发明人的国家POSTATE第一发明人所在的州(如果国家是美国)ASSIGNEE专利受让人的数字标识(例如,专利拥有者)ASSCODE一位数(1-9)表示的受让人类型。 (受让人类型包括美国个人,美国政府,美国组织,非美国个人,等等)CLAIMS索赔金额(只对1975年之后授权的专利有效)NCLASS三位数表示的专利类别
?
既然我们已经有了两个专利数据集,那么让我们编写Hadoop程序来处理这些数据吧。
?
4.2 建立MapReduce程序的基本模板
?
我们的大多数MapReduce程序是简短的并且是在一个模板上进行变化的。担负编写一个新的MapReduce程序时,您通常需要在一个现有的MapReduce程序上进行修改,直到它成为您想要的样子。在这个小节里,我们将编写第一个MapReduce程序并解释它的不同部分。这个程序可以作为将来的MapReduce程序的模板。我们的第一个程序将把专利引用数据作为输入,并将它反转。对每个专利,我们想要找出引用它的专利并将它们分组。我们的输出如下:
public class MyJob extends Configured implements Tool {public static class MapClass extends MapReduceBase implementsMapper<Text, Text, Text, Text> {public void map(Text key, Text value,OutputCollector<Text, Text> output, Reporter reporter)throws IOException {output.collect(value, key);}}public static class Reduce extends MapReduceBase implementsReducer<Text, Text, Text, Text> {public void reduce(Text key, Iterator<Text> values,OutputCollector<Text, Text> output, Reporter reporter)throws IOException {String csv = "";while (values.hasNext()) {if (csv.length() > 0)csv += ",";csv += values.next().toString();}output.collect(key, new Text(csv));}}public int run(String[] args) throws Exception {Configuration conf = getConf();JobConf job = new JobConf(conf, MyJob.class);Path in = new Path(args[0]);Path out = new Path(args[1]);FileInputFormat.setInputPaths(job, in);FileOutputFormat.setOutputPath(job, out);job.setJobName("MyJob");job.setMapperClass(MapClass.class);job.setReducerClass(Reduce.class);job.setInputFormat(KeyValueTextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.set("key.value.separator.in.input.line", ",");JobClient.runJob(job);return 0;}public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Configuration(), new MyJob(), args);System.exit(res);}}
我们的惯例是使用单一的类,如这个例子里的MyJob,完全地定义每个MapReduce任务。Hadoop需要将Mapper和Reducer作为它们自己的静态类。这些类很小,并且我们的模板将它们作为MyJob类的内部类。但是请记住,这些内部类是独立的,并且不与MyJob类交互。在任务执行的过程中,不同Java虚拟机上的多个节点将复制并运行Mapper和Reducer,而job类剩下的部分只在客户端机器上运行。
我们先探讨一下Mapper类和Reducer类。不考虑这些类的话,MyJob类的基本结构如下:?
public class MyJob extends Configured implements Tool {public int run(String[] args) throws Exception {Configuration conf = getConf();JobConf job = new JobConf(conf, MyJob.class);Path in = new Path(args[0]);Path out = new Path(args[1]);FileInputFormat.setInputPaths(job, in);FileOutputFormat.setOutputPath(job, out);job.setJobName("MyJob");job.setMapperClass(MapClass.class);job.setReducerClass(Reduce.class);job.setInputFormat(KeyValueTextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.set("key.value.separator.in.input.line", ",");JobClient.runJob(job);return 0;}public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Configuration(), new MyJob(), args);System.exit(res);}}? 这个骨架的核心在run()方法中,也可以把它称为driver。driver实例化、配置并将一个被命名为job的JobConf传递给JobClient。用runJob()来启动MapReduce job。(JobClient类会与JobTracker交互以通过集群启动job。)JobConf对象包含了运行job所必需的所有配置参数。driver需要需要指定job的输入路径、输出路径,Mapper类和Reducer类——每个job的基础参数。此外,每个job将重置job的默认属性,如InputFOrmat,OutputFormat等等。可以调用JobConf对象的set()方法来设置配置参数。一旦您将JobConf对象传递给JobClient.runJob(),它将会被作为job的总体规划(master plan)。它将会称为如何运行job的蓝图。
JobConf对象可能会有很多参数,但我们不会在driver中设置所有参数。Hadoop安装的配置文件是一个号的起点。当通过命令行来启动一个Job时,用户可能会想要传递其余的参数来修改job的配置。driver自己可以定义它自己的命令,并处理用户输入的参数,使得用户可以修改配置参数。由于这项任务将会需要频繁地进行,Hadoop框架提供了ToolRunner、Tool和Configured来简化它。当与上面的MyJob骨架一起使用时,这些类将会使得我们的job理解用户定义的,并且被GenericOptionsParser所支持的选项。例如,我们之前使用这个命令行来执行MyJob类:
bin/hadoop jar playground/MyJob.jar MyJob input/cite75_99.txt output?
如果我们只是想运行job并查看mapper的输出(可能在您进行调试的时候需要这么做),我们可以使用如下选项将reducer的数量设置为0:
bin/hadoop jar playground/MyJob.jar MyJob -D mapred.reduce.tasks=0? input/cite75_99.txt output ?
这在我们的程序并不显式地解释-D选项时也仍然是有效的。通过使用ToolRunner,MyJob可以自动支持表4.2中的选项。通过使用ToolRunner,MyJob将自动支持表1.2中列出的选项。?
?
表 4.2 GenericOptionsParser支持的选项?
选项描述-conf <configurationfile>指定一个配置文件。-D <property=value>设置JobConf的属性。-fs <local|namenode:port>指定一个NameNode,可以为“local”。-jt <local|jobtracker:port>指定一个JobTracker。-files <list of fi les>
指定一个用逗号分隔的文件列表,这些文件将在MapReduce job中被用到。
这些文件将被自动地分配到所有的任务节点上,使得在本地可以使用。
-libjars <list of jars>指定一个用逗号分隔的jar文件的列表,它们被包含于所有的任务JVM的classpath中。-archives <list of archives>指定一个用逗号分隔的压缩文件列表,将在所有节点上被解压??
我们的模板的惯例是将Mapper类命名为MapClass,并将Reducer类命名为Reduce。更对称的命名方法是将Mapper类命名为Map,但Java已经有一个名为Map的类(接口)了。Mapper和Reducer都继承自MapReduceBase,这个基类提供了这两个接口所需要的configure()和close()方法(但没有进行任何操作)。? 我们使用configure()和close()方法来建立map(reduce)任务。除非需要使用更高级的job,否则我们不需要覆盖它们。
Mapper类和Reducer类的方法签名如下:
public static class MapClass extends MapReduceBase implementsMapper<K1, V1, K2, V2> {public void map(K1 key, V1 value, OutputCollector<K2, V2> output,Reporter reporter) throws IOException {}}public static class Reduce extends MapReduceBase implementsReducer<K2, V2, K3, V3> {public void reduce(K2 key, Iterator<V2> values,OutputCollector<K3, V3> output, Reporter reporter)throws IOException {}}?Mapper类和Reducer类的核心操作分别是map()和reduce()方法。每个对map()方法的调用都需要提供类型分别为K1和V1的键/值对。这个键/值对是由mapper生成的,并且通过OutputCollector对象的collect() 方法输出。在您的map()方法中的某处,您需要调用
output.collect((K2) k, (V2) v);?
每个对reducer的reduce()方法的调用都需要提供类型为K2的键和类型为V2的值的列表。请注意这与在Mapper中使用的K2和V2必须是相同的。reduce()方法可能会有一个用于遍历类型为V2的值的循环。
while (values.hasNext()) {V2? v = values.next();...}?reduce()方法同时也有一个用于收集键/值输出的OutputCollector,类型是K3/V3。在reduce()方法中的某处您需要调用output.collect((K3) k, (V3) v);除了在Mapper和Reducer中使用一致的K2和V2类型,您还需要确保Mapper和Reducer中使用的键/值类型与driver中设置的输入格式、输出键类型和值类型是一致的。使用KeyValueTextInputFormat 意味着K1和V1都需要是Text类型的。driver需要分别用K2类和V2类来调用setOutputKeyClass()和setOutputValueClass()。??
最后,键和值的类型需要是Writable的子类,以确保Hadoop的序列化接口可以将数据分发到分布式集群中。事实上,键类型实现了WritableComparable,是Writable的子接口。键类型需要额外地支持compareTo()方法,因为键需要在MapReduce框架中的多个地方进行排序。
高手,你自己一点点翻译的啊?。。。我也正在看。