MapReduce Programming Notes: Executing Multiple MapReduce Jobs
When learning Hadoop, writing MapReduce programs is unavoidable. For simple analyses a single MapReduce job is enough, so the single-job case is not covered here; there are plenty of examples online that a quick search will turn up. For more complex analyses, however, we may need multiple Jobs, or multiple map or reduce stages, to complete the computation.
There are several ways to structure a multi-Job or multi-MapReduce program:
1. Iterative MapReduce
In the iterative style, the output of one MapReduce job typically becomes the input of the next. In the end only the final result needs to be kept; the intermediate data can be deleted or retained, depending on your business requirements.
Sample code (an excerpt from Apache Nutch's crawl driver, which chains the inject/generate/fetch/parse/updatedb jobs):
// ... (fields such as results, status, cleanSeedDir, shouldStop and LOG are declared elsewhere in the class and omitted here)
private NutchTool currentTool = null;
// ...

private Map<String, Object> runTool(Class<? extends NutchTool> toolClass,
    Map<String, Object> args) throws Exception {
  currentTool = (NutchTool) ReflectionUtils.newInstance(toolClass, getConf());
  return currentTool.run(args);
}

// ...

@Override
public Map<String, Object> run(Map<String, Object> args) throws Exception {
  results.clear();
  status.clear();
  String crawlId = (String) args.get(Nutch.ARG_CRAWL);
  if (crawlId != null) {
    getConf().set(Nutch.CRAWL_ID_KEY, crawlId);
  }
  String seedDir = null;
  String tmpSeedDir = null;
  String seedList = (String) args.get(Nutch.ARG_SEEDLIST);
  if (seedList != null) { // takes precedence
    String[] seeds = seedList.split("\\s+");
    // create tmp. dir
    tmpSeedDir = getConf().get("hadoop.tmp.dir") + "/seed-"
        + System.currentTimeMillis();
    FileSystem fs = FileSystem.get(getConf());
    Path p = new Path(tmpSeedDir);
    fs.mkdirs(p);
    Path seedOut = new Path(p, "urls");
    OutputStream os = fs.create(seedOut);
    for (String s : seeds) {
      os.write(s.getBytes());
      os.write('\n');
    }
    os.flush();
    os.close();
    cleanSeedDir = true;
    seedDir = tmpSeedDir;
  } else {
    seedDir = (String) args.get(Nutch.ARG_SEEDDIR);
  }
  Integer depth = (Integer) args.get(Nutch.ARG_DEPTH);
  if (depth == null)
    depth = 1;
  boolean parse = getConf().getBoolean(FetcherJob.PARSE_KEY, false);
  String solrUrl = (String) args.get(Nutch.ARG_SOLR);
  int onePhase = 3;
  if (!parse)
    onePhase++;
  float totalPhases = depth * onePhase;
  if (seedDir != null)
    totalPhases++;
  float phase = 0;
  Map<String, Object> jobRes = null;
  LinkedHashMap<String, Object> subTools = new LinkedHashMap<String, Object>();
  status.put(Nutch.STAT_JOBS, subTools);
  results.put(Nutch.STAT_JOBS, subTools);
  // inject phase
  if (seedDir != null) {
    status.put(Nutch.STAT_PHASE, "inject");
    jobRes = runTool(InjectorJob.class, args);
    if (jobRes != null) {
      subTools.put("inject", jobRes);
    }
    status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
    if (cleanSeedDir && tmpSeedDir != null) {
      LOG.info(" - cleaning tmp seed list in " + tmpSeedDir);
      FileSystem.get(getConf()).delete(new Path(tmpSeedDir), true);
    }
  }
  if (shouldStop) {
    return results;
  }
  // run "depth" cycles
  for (int i = 0; i < depth; i++) {
    status.put(Nutch.STAT_PHASE, "generate " + i);
    jobRes = runTool(GeneratorJob.class, args);
    if (jobRes != null) {
      subTools.put("generate " + i, jobRes);
    }
    status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
    if (shouldStop) {
      return results;
    }
    status.put(Nutch.STAT_PHASE, "fetch " + i);
    jobRes = runTool(FetcherJob.class, args);
    if (jobRes != null) {
      subTools.put("fetch " + i, jobRes);
    }
    status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
    if (shouldStop) {
      return results;
    }
    if (!parse) {
      status.put(Nutch.STAT_PHASE, "parse " + i);
      jobRes = runTool(ParserJob.class, args);
      if (jobRes != null) {
        subTools.put("parse " + i, jobRes);
      }
      status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
      if (shouldStop) {
        return results;
      }
    }
    status.put(Nutch.STAT_PHASE, "updatedb " + i);
    jobRes = runTool(DbUpdaterJob.class, args);
    if (jobRes != null) {
      subTools.put("updatedb " + i, jobRes);
    }
    status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
    if (shouldStop) {
      return results;
    }
  }
  if (solrUrl != null) {
    status.put(Nutch.STAT_PHASE, "index");
    jobRes = runTool(SolrIndexerJob.class, args);
    if (jobRes != null) {
      subTools.put("index", jobRes);
    }
  }
  return results;
}
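The Nutch code above is production code and carries a lot of crawl-specific detail. Stripped down to just the chaining idea, an iterative flow can look like the minimal sketch below: the first Job writes to a temporary directory, the second Job reads that directory as its input, and the intermediate data is removed once the chain finishes. This is only an illustrative sketch, not part of Nutch; FirstMapper, FirstReducer, SecondMapper, SecondReducer and the path arguments are hypothetical placeholders you would replace with your own classes and paths.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainedJobs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path(args[0]);
    Path temp = new Path(args[1]);   // intermediate data between the two jobs
    Path output = new Path(args[2]);

    // Job 1: writes its result to the temporary directory.
    Job job1 = Job.getInstance(conf, "job-1");
    job1.setJarByClass(ChainedJobs.class);
    job1.setMapperClass(FirstMapper.class);      // placeholder mapper
    job1.setReducerClass(FirstReducer.class);    // placeholder reducer
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job1, input);
    FileOutputFormat.setOutputPath(job1, temp);
    if (!job1.waitForCompletion(true)) {
      System.exit(1);                            // stop the chain if the first job fails
    }

    // Job 2: reads the first job's output as its input.
    Job job2 = Job.getInstance(conf, "job-2");
    job2.setJarByClass(ChainedJobs.class);
    job2.setMapperClass(SecondMapper.class);     // placeholder mapper
    job2.setReducerClass(SecondReducer.class);   // placeholder reducer
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job2, temp);
    FileOutputFormat.setOutputPath(job2, output);
    boolean ok = job2.waitForCompletion(true);

    // The intermediate directory is no longer needed once the chain finishes;
    // whether to delete or keep it is up to your own requirements.
    FileSystem.get(conf).delete(temp, true);
    System.exit(ok ? 0 : 1);
  }
}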