

Published: 2013-07-01 12:33:04  Author: rapoo

Nutch source code reading (10) - Fetch

private static class QueueFeeder extends Thread {
    private RecordReader<Text, CrawlDatum> reader;
    private FetchItemQueues queues;   // queues shared by producer and consumers, one sub-queue per host
    private int size;
    private long timelimit = -1;

    public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
        FetchItemQueues queues, int size) {
      this.reader = reader;
      this.queues = queues;
      this.size = size;
      this.setDaemon(true);
      this.setName("QueueFeeder");
    }

    public void setTimeLimit(long tl) {
      timelimit = tl;
    }

    public void run() {
      boolean hasMore = true;
      int cnt = 0;
      int timelimitcount = 0;
      while (hasMore) {
        // If a time limit is configured and the current time has passed it,
        // all remaining FetchItems are skipped.
        if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
          // enough .. let's simply
          // read all the entries from the input without processing them
          try {
            Text url = new Text();
            CrawlDatum datum = new CrawlDatum();
            hasMore = reader.next(url, datum);
            timelimitcount++;
          } catch (IOException e) {
            LOG.error("QueueFeeder error reading input, record " + cnt, e);
            return;
          }
          continue;
        }
        int feed = size - queues.getTotalSize();
        if (feed <= 0) {
          // queues are full - spin-wait until they have some free space
          try {
            Thread.sleep(1000);
          } catch (Exception e) {};
          continue;
        } else {
          LOG.debug("-feeding " + feed + " input urls ...");
          while (feed > 0 && hasMore) {
            try {
              Text url = new Text();
              CrawlDatum datum = new CrawlDatum();
              hasMore = reader.next(url, datum);
              if (hasMore) {
                queues.addFetchItem(url, datum);
                cnt++;   // running total of records fed
                feed--;  // one less free slot in the queues
              }
            } catch (IOException e) {
              LOG.error("QueueFeeder error reading input, record " + cnt, e);
              return;
            }
          }
        }
      }
      LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :"
          + timelimitcount);
    }
  }

This class is the producer side: it reads <Text, CrawlDatum> records from the segment input and feeds them into the shared queues, which are partitioned so that each host gets its own sub-queue.
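A minimal, self-contained sketch of the same producer pattern, assuming a plain-String record stream: bounded per-host queues that the feeder fills, spin-waiting whenever the consumers have not yet freed up space. The names SimpleFeeder, HostQueues, add, poll and totalSize are illustrative stand-ins, not Nutch classes or APIs.

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;

class HostQueues {
  // one sub-queue per host, mirroring the per-host partitioning of FetchItemQueues
  private final Map<String, Queue<String>> byHost = new HashMap<String, Queue<String>>();
  private int total = 0;

  synchronized void add(String url) {
    String host = java.net.URI.create(url).getHost();
    Queue<String> q = byHost.get(host);
    if (q == null) {
      q = new ArrayDeque<String>();
      byHost.put(host, q);
    }
    q.add(url);
    total++;
  }

  synchronized int totalSize() {
    return total;
  }

  synchronized String poll(String host) {
    Queue<String> q = byHost.get(host);
    String url = (q == null) ? null : q.poll();
    if (url != null) total--;
    return url;
  }
}

class SimpleFeeder extends Thread {
  private final Iterator<String> input;  // stands in for the RecordReader
  private final HostQueues queues;       // stands in for FetchItemQueues
  private final int capacity;            // like the "size" bound in QueueFeeder

  SimpleFeeder(Iterator<String> input, HostQueues queues, int capacity) {
    this.input = input;
    this.queues = queues;
    this.capacity = capacity;
    setDaemon(true);
  }

  @Override
  public void run() {
    while (input.hasNext()) {
      int free = capacity - queues.totalSize();
      if (free <= 0) {
        // queues are full: spin-wait, just as QueueFeeder does
        try { Thread.sleep(1000); } catch (InterruptedException e) { return; }
        continue;
      }
      while (free-- > 0 && input.hasNext()) {
        queues.add(input.next());  // routed to the sub-queue of its host
      }
    }
  }
}

The FetcherThread.run() method below is the consumer side of this arrangement: it repeatedly takes FetchItems out of the shared queues and fetches them.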


public void run() {
      activeThreads.incrementAndGet(); // count threads
      FetchItem fit = null;
      try {
        while (true) {
          fit = fetchQueues.getFetchItem();
          if (fit == null) {
            // if the feeder is still alive, or the queues still hold items, keep waiting
            if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
              LOG.debug(getName() + " spin-waiting ...");
              // spin-wait.
              spinWaiting.incrementAndGet();
              try {
                Thread.sleep(500);
              } catch (Exception e) {}
              spinWaiting.decrementAndGet();
              continue;
            } else {
              // all done, finish this thread
              return;
            }
          }

          lastRequestStart.set(System.currentTimeMillis());
          // resolve the representative URL
          Text reprUrlWritable =
            (Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
          if (reprUrlWritable == null) {
            reprUrl = fit.url.toString();
          } else {
            reprUrl = reprUrlWritable.toString();
          }
          try {
            // fetch the page
            redirecting = false;
            redirectCount = 0;
            do {
              if (LOG.isInfoEnabled()) { LOG.info("fetching " + fit.url); }
              if (LOG.isDebugEnabled()) {
                LOG.debug("redirectCount=" + redirectCount);
              }
              redirecting = false;
              // determine which protocol this URL uses
              Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
              // fetch the robots rules via that protocol
              RobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
              // skip URLs disallowed by robots.txt
              if (!rules.isAllowed(fit.u)) {
                // unblock
                fetchQueues.finishFetchItem(fit, true);
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Denied by robots.txt: " + fit.url);
                }
                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
                reporter.incrCounter("FetcherStatus", "robots_denied", 1);
                continue;
              }
              // skip the URL if its Crawl-Delay exceeds maxCrawlDelay
              if (rules.getCrawlDelay() > 0) {
                if (rules.getCrawlDelay() > maxCrawlDelay) {
                  // unblock
                  fetchQueues.finishFetchItem(fit, true);
                  LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
                  output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
                  reporter.incrCounter("FetcherStatus", "robots_denied_maxcrawldelay", 1);
                  continue;
                } else {
                  FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
                  fiq.crawlDelay = rules.getCrawlDelay();
                }
              }
              // perform the fetch
              ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
              // fetch status
              ProtocolStatus status = output.getStatus();
              // fetched content
              Content content = output.getContent();
              ParseStatus pstatus = null;
              // unblock queue
              fetchQueues.finishFetchItem(fit);

              String urlString = fit.url.toString();

              // update counters
              reporter.incrCounter("FetcherStatus", status.getName(), 1);

              // Dispatch on the fetch status:
              // WOULDBLOCK: retry - put the URL back into FetchItemQueues.
              // SUCCESS: the page was fetched; if parsing reports a redirect, resolve the
              //   target URL, build a new FetchItem and add it to the inProgress set of the
              //   matching queue so the redirect target is fetched as well.
              // MOVED / TEMP_MOVED: the page was redirected; resolve the new URL, create a
              //   new FetchItem, place it by its queueID into that queue's inProgress set,
              //   then fetch the redirect target.
              // EXCEPTION: check the exception threshold of this URL's FetchItemQueue; if
              //   the number of failed pages exceeds the maximum, clear the whole queue,
              //   treating all of its pages as bad.
              // RETRY / BLOCKED: output the CrawlDatum with STATUS_FETCH_RETRY so it is
              //   fetched again in the next round.
              // GONE / NOTFOUND / ACCESS_DENIED / ROBOTS_DENIED: output the CrawlDatum with
              //   STATUS_FETCH_GONE; it may not be fetched again in later rounds.
              // NOTMODIFIED: the page has not changed; output the CrawlDatum with
              //   STATUS_FETCH_NOTMODIFIED.
              // default: output the CrawlDatum with STATUS_FETCH_RETRY and retry in the
              //   next round.
              switch(status.getCode()) {

              case ProtocolStatus.WOULDBLOCK:
                // retry ?
                fetchQueues.addFetchItem(fit);
                break;

              case ProtocolStatus.SUCCESS:        // got a page
                pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
                updateStatus(content.getContent().length);
                // handle ParseStatus.SUCCESS_REDIRECT (content-level redirect)
                if (pstatus != null && pstatus.isSuccess() &&
                        pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
                  // the redirect target URL
                  String newUrl = pstatus.getMessage();
                  int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
                  // follow the redirect
                  Text redirUrl =
                    handleRedirect(fit.url, fit.datum,
                                   urlString, newUrl,
                                   refreshTime < Fetcher.PERM_REFRESH_TIME,
                                   Fetcher.CONTENT_REDIR);
                  // the redirect target could be resolved
                  if (redirUrl != null) {
                    // build the CrawlDatum for the redirect target
                    CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
                        fit.datum.getFetchInterval(), fit.datum.getScore());
                    // transfer existing metadata to the redir
                    newDatum.getMetaData().putAll(fit.datum.getMetaData());
                    scfilters.initialScore(redirUrl, newDatum);
                    if (reprUrl != null) {
                      newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
                          new Text(reprUrl));
                    }
                    // build the corresponding FetchItem
                    fit = FetchItem.create(redirUrl, newDatum, queueMode);
                    if (fit != null) {
                      // queue it so the redirect target gets fetched
                      FetchItemQueue fiq =
                        fetchQueues.getFetchItemQueue(fit.queueID);
                      fiq.addInProgressFetchItem(fit);
                    } else {
                      // redirect could not be queued; count it and stop redirecting
                      redirecting = false;
                      reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect", 1);
                    }
                  }
                }
                break;

              case ProtocolStatus.MOVED:         // redirect
              case ProtocolStatus.TEMP_MOVED:
                int code;
                boolean temp;
                if (status.getCode() == ProtocolStatus.MOVED) {
                  code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
                  temp = false;
                } else {
                  code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
                  temp = true;
                }
                output(fit.url, fit.datum, content, status, code);
                String newUrl = status.getMessage();
                Text redirUrl =
                  handleRedirect(fit.url, fit.datum,
                                 urlString, newUrl, temp,
                                 Fetcher.PROTOCOL_REDIR);
                if (redirUrl != null) {
                  CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
                      fit.datum.getFetchInterval(), fit.datum.getScore());
                  // transfer existing metadata
                  newDatum.getMetaData().putAll(fit.datum.getMetaData());
                  scfilters.initialScore(redirUrl, newDatum);
                  if (reprUrl != null) {
                    newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
                        new Text(reprUrl));
                  }
                  fit = FetchItem.create(redirUrl, newDatum, queueMode);
                  if (fit != null) {
                    FetchItemQueue fiq =
                      fetchQueues.getFetchItemQueue(fit.queueID);
                    fiq.addInProgressFetchItem(fit);
                  } else {
                    // stop redirecting
                    redirecting = false;
                    reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect", 1);
                  }
                } else {
                  // stop redirecting
                  redirecting = false;
                }
                break;

              case ProtocolStatus.EXCEPTION:
                logError(fit.url, status.getMessage());
                int killedURLs = fetchQueues.checkExceptionThreshold(fit.getQueueID());
                if (killedURLs != 0)
                  reporter.incrCounter("FetcherStatus", "AboveExceptionThresholdInQueue", killedURLs);
                /* FALLTHROUGH */
              case ProtocolStatus.RETRY:          // retry
              case ProtocolStatus.BLOCKED:
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
                break;

              case ProtocolStatus.GONE:           // gone
              case ProtocolStatus.NOTFOUND:
              case ProtocolStatus.ACCESS_DENIED:
              case ProtocolStatus.ROBOTS_DENIED:
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
                break;

              case ProtocolStatus.NOTMODIFIED:
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED);
                break;

              default:
                if (LOG.isWarnEnabled()) {
                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
                }
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
              }

              if (redirecting && redirectCount > maxRedirect) {
                fetchQueues.finishFetchItem(fit);
                if (LOG.isInfoEnabled()) {
                  LOG.info(" - redirect count exceeded " + fit.url);
                }
                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE);
              }

            } while (redirecting && (redirectCount <= maxRedirect));

          } catch (Throwable t) {                 // unexpected exception
            // unblock
            fetchQueues.finishFetchItem(fit);
            logError(fit.url, StringUtils.stringifyException(t));
            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY);
          }
        }

      } catch (Throwable e) {
        if (LOG.isErrorEnabled()) {
          LOG.error("fetcher caught:" + e.toString());
        }
      } finally {
        if (fit != null) fetchQueues.finishFetchItem(fit);
        activeThreads.decrementAndGet(); // count threads
        LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
      }
    }
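The limits this loop relies on (the number of fetch threads, maxCrawlDelay, maxRedirect, and the feeder's time limit) come from the Hadoop job configuration. A minimal sketch of setting them is below; the property names follow the nutch-default.xml conventions of the Nutch 1.x line and should be verified against your version before use.

import org.apache.hadoop.conf.Configuration;

public class FetcherConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // number of FetcherThread instances running the loop above
    conf.setInt("fetcher.threads.fetch", 10);
    // robots.txt Crawl-Delay (seconds) above which a URL is skipped (maxCrawlDelay)
    conf.setInt("fetcher.max.crawl.delay", 30);
    // maximum number of redirects followed by the do...while loop (maxRedirect)
    conf.setInt("http.redirect.max", 3);
    // wall-clock limit in minutes; once exceeded, QueueFeeder drains the remaining input
    conf.setLong("fetcher.timelimit.mins", 180);

    System.out.println("fetch threads = " + conf.getInt("fetcher.threads.fetch", 10));
  }
}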


private ParseStatus output(Text key, CrawlDatum datum,
                        Content content, ProtocolStatus pstatus, int status, int outlinkDepth) {

      // fill in the CrawlDatum
      datum.setStatus(status);
      datum.setFetchTime(System.currentTimeMillis());
      if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);

      ParseResult parseResult = null;
      if (content != null) {
        Metadata metadata = content.getMetadata();

        // store the guessed content type in the crawldatum
        if (content.getContentType() != null) datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE), new Text(content.getContentType()));

        // add segment to metadata
        metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
        // add score to content metadata so that ParseSegment can pick it up.
        try {
          scfilters.passScoreBeforeParsing(key, datum, content);
        } catch (Exception e) {
          if (LOG.isWarnEnabled()) {
            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
          }
        }
        /* Note: Fetcher will only follow meta-redirects coming from the
         * original URL. */
        if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
          if (!skipTruncated || (skipTruncated && !ParseSegment.isTruncated(content))) {
            try {
              // parse the fetched raw content
              parseResult = this.parseUtil.parse(content);
            } catch (Exception e) {
              LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
            }
          }

          if (parseResult == null) {
            byte[] signature =
              SignatureFactory.getSignature(getConf()).calculate(content,
                  new ParseStatus().getEmptyParse(conf));
            datum.setSignature(signature);
          }
        }

        /* Store status code in content So we can read this value during
         * parsing (as a separate job) and decide to parse or not.
         */
        content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status));
      }

      try {
        output.collect(key, new NutchWritable(datum));
        if (content != null && storingContent)
          output.collect(key, new NutchWritable(content));
        if (parseResult != null) {
          for (Entry<Text, Parse> entry : parseResult) {
            Text url = entry.getKey();
            Parse parse = entry.getValue();
            ParseStatus parseStatus = parse.getData().getStatus();
            ParseData parseData = parse.getData();

            if (!parseStatus.isSuccess()) {
              LOG.warn("Error parsing: " + key + ": " + parseStatus);
              parse = parseStatus.getEmptyParse(getConf());
            }

            // Calculate page signature. For non-parsing fetchers this will
            // be done in ParseSegment
            byte[] signature =
              SignatureFactory.getSignature(getConf()).calculate(content, parse);

            // Ensure segment name and score are in parseData metadata
            parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
                segmentName);
            parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
                StringUtil.toHexString(signature));
            // Pass fetch time to content meta
            parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY,
                Long.toString(datum.getFetchTime()));
            if (url.equals(key))
              datum.setSignature(signature);
            try {
              scfilters.passScoreAfterParsing(url, content, parse);
            } catch (Exception e) {
              if (LOG.isWarnEnabled()) {
                LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
              }
            }

            String fromHost;

            // collect outlinks for subsequent db update
            Outlink[] links = parseData.getOutlinks();
            int outlinksToStore = Math.min(maxOutlinks, links.length);
            if (ignoreExternalLinks) {
              try {
                fromHost = new URL(url.toString()).getHost().toLowerCase();
              } catch (MalformedURLException e) {
                fromHost = null;
              }
            } else {
              fromHost = null;
            }

            int validCount = 0;

            // Process all outlinks, normalize, filter and deduplicate
            List<Outlink> outlinkList = new ArrayList<Outlink>(outlinksToStore);
            HashSet<String> outlinks = new HashSet<String>(outlinksToStore);
            for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
              String toUrl = links[i].getToUrl();

              toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl, fromHost, ignoreExternalLinks, urlFilters, normalizers);
              if (toUrl == null) {
                continue;
              }

              validCount++;
              links[i].setUrl(toUrl);
              outlinkList.add(links[i]);
              outlinks.add(toUrl);
            }

            // Only process depth N outlinks
            if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
              reporter.incrCounter("FetcherOutlinks", "outlinks_detected", outlinks.size());

              // Counter to limit num outlinks to follow per page
              int outlinkCounter = 0;

              // Calculate variable number of outlinks by depth using the divisor (outlinks = Math.floor(divisor / depth * num.links))
              int maxOutlinksByDepth = (int)Math.floor(outlinksDepthDivisor / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);

              String followUrl;

              // Walk over the outlinks and add as new FetchItem to the queues
              Iterator<String> iter = outlinks.iterator();
              while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
                followUrl = iter.next();

                // Check whether we'll follow external outlinks
                if (outlinksIgnoreExternal) {
                  if (!URLUtil.getHost(url.toString()).equals(URLUtil.getHost(followUrl))) {
                    continue;
                  }
                }

                reporter.incrCounter("FetcherOutlinks", "outlinks_following", 1);

                // Create new FetchItem with depth incremented
                FetchItem fit = FetchItem.create(new Text(followUrl), new CrawlDatum(CrawlDatum.STATUS_LINKED, interval), queueMode, outlinkDepth + 1);

                fetchQueues.addFetchItem(fit);

                outlinkCounter++;
              }
            }

            // Overwrite the outlinks in ParseData with the normalized and filtered set
            parseData.setOutlinks((Outlink[])outlinkList.toArray(new Outlink[outlinkList.size()]));

            output.collect(url, new NutchWritable(
                    new ParseImpl(new ParseText(parse.getText()),
                                  parseData, parse.isCanonical())));
          }
        }
      } catch (IOException e) {
        if (LOG.isErrorEnabled()) {
          LOG.error("fetcher caught:" + e.toString());
        }
      }

      // return parse status if it exists
      if (parseResult != null && !parseResult.isEmpty()) {
        Parse p = parseResult.get(content.getUrl());
        if (p != null) {
          reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p.getData().getStatus().getMajorCode()], 1);
          return p.getData().getStatus();
        }
      }
      return null;
    }
  }
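The depth-limited outlink following above computes a per-depth cap, maxOutlinksByDepth, with integer arithmetic. The standalone sketch below reproduces just that expression to show how the cap shrinks as depth grows; the divisor and num.links values used are illustrative assumptions, not taken from this article. Note also that in the code quoted above the while loop compares outlinkCounter against maxOutlinkDepthNumLinks, so the computed maxOutlinksByDepth is not actually used as the bound.

public class OutlinkDepthCap {
  static int capForDepth(int divisor, int numLinks, int depth) {
    // mirrors: (int) Math.floor(outlinksDepthDivisor / (outlinkDepth + 1) * maxOutlinkDepthNumLinks)
    // divisor / (depth + 1) is integer division, so the cap drops to 0 once depth + 1 > divisor
    return (int) Math.floor(divisor / (depth + 1) * numLinks);
  }

  public static void main(String[] args) {
    int divisor = 2;   // assumed value for the depth divisor
    int numLinks = 4;  // assumed value for the per-page outlink limit
    for (int depth = 0; depth < 4; depth++) {
      System.out.println("depth " + depth + " -> cap " + capForDepth(divisor, numLinks, depth));
    }
    // prints: depth 0 -> cap 8, depth 1 -> cap 4, depth 2 -> cap 0, depth 3 -> cap 0
  }
}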


