Nutch Source Code Reading (9) - Fetch

Published: 2013-07-08 14:13:00  Author: rapoo

The crawl cycle in Crawl.java drives everything: each depth iteration generates a new segment, fetches it, parses it if the fetcher has not already done so, and then updates the crawldb with the results.

......
for (i = 0; i < depth; i++) {             // generate new segment
  Path[] segs = generator.generate(crawlDb, segments, -1, topN,
      System.currentTimeMillis());
  if (segs == null) {
    LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
    break;
  }
  // fetch the segment that was just generated
  fetcher.fetch(segs[0], threads);        // fetch it
  if (!Fetcher.isParsing(job)) {
    parseSegment.parse(segs[0]);          // parse it, if needed
  }
  crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
}
......
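For context, Crawl.java builds each of the tools used by this loop from one shared Configuration before entering it. A minimal sketch of that setup, assuming Nutch 1.x; the crawl/crawldb and crawl/segments paths are illustrative stand-ins, since the real ones are derived from the command-line arguments:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.nutch.crawl.CrawlDb;
import org.apache.nutch.crawl.Generator;
import org.apache.nutch.fetcher.Fetcher;
import org.apache.nutch.parse.ParseSegment;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;

public class CrawlSetupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    JobConf job = new NutchJob(conf);   // consulted by Fetcher.isParsing(job) in the loop

    // Illustrative paths; Crawl.java builds these from its arguments
    Path crawlDb  = new Path("crawl/crawldb");
    Path segments = new Path("crawl/segments");

    // The tools the crawl loop calls into, all sharing one Configuration
    Generator    generator    = new Generator(conf);
    Fetcher      fetcher      = new Fetcher(conf);
    ParseSegment parseSegment = new ParseSegment(conf);
    CrawlDb      crawlDbTool  = new CrawlDb(conf);
  }
}

Because they all read the same Configuration, a property such as fetcher.parse decides once, for the whole cycle, whether parsing happens inside the fetch job or in the separate ParseSegment step.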

// threads defaults to the value of fetcher.threads.fetch, but can also be passed in as a parameter
public void fetch(Path segment, int threads) throws IOException {

  // Validate the agent name: it is set via http.agent.name,
  // and checked against the entries in http.robots.agents
  checkConfiguration();

  // Log the start time and the segment path
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  if (LOG.isInfoEnabled()) {
    LOG.info("Fetcher: starting at " + sdf.format(start));
    LOG.info("Fetcher: segment: " + segment);
  }

  // set the actual time for the timelimit relative
  // to the beginning of the whole job and not of a specific task
  // otherwise it keeps trying again if a task fails
  long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
  if (timelimit != -1) {
    timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
    LOG.info("Fetcher Timelimit set for : " + timelimit);
    getConf().setLong("fetcher.timelimit", timelimit);
  }

  // Set the time limit after which the throughput threshold feature is enabled
  timelimit = getConf().getLong("fetcher.throughput.threshold.check.after", 10);
  timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
  getConf().setLong("fetcher.throughput.threshold.check.after", timelimit);

  int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1);
  if (maxOutlinkDepth > 0) {
    LOG.info("Fetcher: following outlinks up to depth: " + Integer.toString(maxOutlinkDepth));

    int maxOutlinkDepthNumLinks = getConf().getInt("fetcher.follow.outlinks.num.links", 4);
    int outlinksDepthDivisor = getConf().getInt("fetcher.follow.outlinks.depth.divisor", 2);

    int totalOutlinksToFollow = 0;
    for (int i = 0; i < maxOutlinkDepth; i++) {
      totalOutlinksToFollow += (int)Math.floor(outlinksDepthDivisor / (i + 1) * maxOutlinkDepthNumLinks);
    }

    LOG.info("Fetcher: maximum outlinks to follow: " + Integer.toString(totalOutlinksToFollow));
  }

  JobConf job = new NutchJob(getConf());
  job.setJobName("fetch " + segment);
  // Configure the number of fetch threads
  job.setInt("fetcher.threads.fetch", threads);
  job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());

  // for politeness, don't permit parallel execution of a single task
  job.setSpeculativeExecution(false);

  // Configure the input: the crawl_generate directory of the segment
  FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
  job.setInputFormat(InputFormat.class);

  // Configure the MapRunner: Fetcher itself drives the map tasks
  job.setMapRunnerClass(Fetcher.class);

  // Configure the output: written back into the segment directory
  FileOutputFormat.setOutputPath(job, segment);
  job.setOutputFormat(FetcherOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NutchWritable.class);

  JobClient.runJob(job);

  long end = System.currentTimeMillis();
  LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
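One subtlety in the outlink budget computed above: outlinksDepthDivisor / (i + 1) is integer division, so deeper levels quickly contribute zero links. A standalone sketch of that arithmetic with the default values (the depth of 3 is assumed here for illustration; it is not a Nutch default):

public class OutlinkBudget {
  public static void main(String[] args) {
    int maxOutlinkDepth = 3; // fetcher.follow.outlinks.depth, assumed 3 for this example
    int numLinks = 4;        // fetcher.follow.outlinks.num.links default
    int divisor = 2;         // fetcher.follow.outlinks.depth.divisor default

    int total = 0;
    for (int i = 0; i < maxOutlinkDepth; i++) {
      // divisor / (i + 1) is integer division: 2/1=2, 2/2=1, 2/3=0,
      // so levels deeper than the divisor add nothing to the budget
      total += (int) Math.floor(divisor / (i + 1) * numLinks);
      System.out.println("after depth " + i + ": total = " + total);
    }
    // prints 8, 12, 12: at most 12 outlinks would be followed in total
  }
}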

The fetch job's output side is handled by FetcherOutputFormat, whose getRecordWriter routes each record type into its own file under the segment:

public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
                                  final JobConf job,
                                  final String name,
                                  final Progressable progress) throws IOException {

  // The job's output path is the segment directory
  Path out = FileOutputFormat.getOutputPath(job);
  // Output directory for the fetch status (crawl_fetch)
  final Path fetch =
    new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
  // Output directory for the raw fetched content (content)
  final Path content =
    new Path(new Path(out, Content.DIR_NAME), name);

  // Compression type for the output files
  final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);

  // Writer for the fetch status
  final MapFile.Writer fetchOut =
    new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
        compType, progress);

  return new RecordWriter<Text, NutchWritable>() {
    private MapFile.Writer contentOut;
    private RecordWriter<Text, Parse> parseOut;

    {
      // If "fetcher.store.content" is true, also write the raw content
      if (Fetcher.isStoringContent(job)) {
        contentOut = new MapFile.Writer(job, fs, content.toString(),
                                        Text.class, Content.class,
                                        compType, progress);
      }

      // If "fetcher.parse" is true, parse during the fetch and extract outlinks
      if (Fetcher.isParsing(job)) {
        parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
      }
    }

    public void write(Text key, NutchWritable value)
      throws IOException {

      Writable w = value.get();

      // Dispatch on the concrete type of the value and route it to the
      // matching writer, so each record type lands in its own output file
      if (w instanceof CrawlDatum)
        fetchOut.append(key, w);
      else if (w instanceof Content)
        contentOut.append(key, w);
      else if (w instanceof Parse)
        parseOut.write(key, (Parse)w);
    }

    public void close(Reporter reporter) throws IOException {
      fetchOut.close();

      if (contentOut != null) {
        contentOut.close();
      }

      if (parseOut != null) {
        parseOut.close(reporter);
      }
    }
  };
}
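After the job completes, crawl_fetch therefore holds sorted <Text url, CrawlDatum> MapFile entries, one part directory per map task. A minimal reader sketch for inspecting them, assuming Nutch 1.x with the pre-YARN Hadoop MapFile.Reader API; the segment path and part name here are hypothetical examples:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;

public class DumpCrawlFetch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Hypothetical segment path; crawl_fetch contains one MapFile per map task
    String part = "crawl/segments/20130708141300/crawl_fetch/part-00000";
    MapFile.Reader reader = new MapFile.Reader(fs, part, conf);

    Text key = new Text();
    CrawlDatum value = new CrawlDatum();
    while (reader.next(key, value)) {
      // Print each URL with its fetch status (e.g. fetch_success)
      System.out.println(key + "\t" + CrawlDatum.getStatusName(value.getStatus()));
    }
    reader.close();
  }
}

When fetcher.parse is enabled, ParseOutputFormat writes the parse side of the segment (crawl_parse, parse_data, parse_text) in the same fashion, which is why the write() method above only has to dispatch on the value's concrete type.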
