ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

nutch核心代码分析——crawl.Indexer

2021-11-24 21:31:02  阅读:142  来源: 互联网

标签:nutch CrawlDatum doc segment Indexer job value null crawl



    这个类的任务是另一方面的工作了,它是基于hadoop和lucene的分布式索引。它就是为前面爬虫抓取回来的数据进行索引好让用户可以搜索到这些数据。
    这里的输入就比较多了,有segments下的fetch_dir,parseData和parseText,还有crawldb下的 current_dir和linkdb下的current_dir。
    1,在这个类里,map将所有输入都装载到一个容器里边,
    2,再到reduce进行分类处理,
    3,实现拦截 --> this.filters.filter(doc, parse, key, fetchDatum, inlinks);
    4,打分 --> this.scfilters.indexerScore(key, doc, dbDatum,fetchDatum, parse, inlinks, boost);
    5,当然要把这些数据体组合成一个 lucene的document让它索引了。
    6,在reduce里组装好后收集时是<url,doc>,最后在输出的OutputFormat类里进行真正的索引。
        doc里有如下几个field
            content(正文)
            site    (所属主地址 )
            title    (标题)
            host    (host)
            segement    (属于哪个segement)
            digest    (MD5码,去重时候用到) 
            tstamp    (时间戳)
            url    (当前URL地址)
            载了一个例子:
                doc = 
                    {content=[biaowen - JavaEye技术网站 首页 新闻 论坛 博客 招聘 更多 ▼ 问答 ………………(内容省略)………… biaowen 永NF/ICP备05023328号], 
                    site=[biaowen.iteye.com], 
                    title=[biaowen - JavaEye技术网站], 
                    host=[biaowen.iteye.com], 
                    segment=[20090725083125], 
                    digest=[063ba8430fa84e614ce71276e176f4ce], 
                    tstamp=[20090725003318265], 
                    url=[http://biaowen.iteye.com/]}

public static void initMRJob(Path crawlDb, Path linkDb,
                           Collection<Path> segments,
                           JobConf job) {
 
 
    LOG.info("IndexerMapReduce: crawldb: " + crawlDb);
    LOG.info("IndexerMapReduce: linkdb: " + linkDb);
 
 
	// 加入segment中要建立索引的目录
    for (final Path segment : segments) {
      LOG.info("IndexerMapReduces: adding segment: " + segment);
      FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME));  // crawl_fetch
      FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME));  // fetch_parse
      FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));         // parse_data
      FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));         // parse_text
    }
 
 
    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));         // crawldb/current
    FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME));           // linkdb/current
    job.setInputFormat(SequenceFileInputFormat.class);                    // 设置输入的文件格式, 这里所有目录中的文件格式都是SequenceFileInputFormat,
 
 
	// 设置Map与Reduce的类型
    job.setMapperClass(IndexerMapReduce.class);
    job.setReducerClass(IndexerMapReduce.class);
 
 
	// 设置输出类型
    job.setOutputFormat(IndexerOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NutchWritable.class);   // 这里设置了Map输出的Value的类型,key类型还是上面的Text
    job.setOutputValueClass(NutchWritable.class);
  }
 
 
  	IndexerMapRducer中的Map只是读入<key,value>对,把value做NutchWritable进行了封装再输出,下面来看一下IndexerMapReduce中的Reduce方法做了些什么
	 public void reduce(Text key, Iterator<NutchWritable> values,
                     OutputCollector<Text, NutchDocument> output, Reporter reporter)
    throws IOException {
    Inlinks inlinks = null;
    CrawlDatum dbDatum = null;
    CrawlDatum fetchDatum = null;
    ParseData parseData = null;
    ParseText parseText = null;
	// 这一块代码是判断相同key的value的类型,根据其类型来对
	// inlinks,dbDatum,fetchDatum,parseData,praseText对象进行赋值
    while (values.hasNext()) {
      final Writable value = values.next().get(); // unwrap
      if (value instanceof Inlinks) {
        inlinks = (Inlinks)value;
      } else if (value instanceof CrawlDatum) {
        final CrawlDatum datum = (CrawlDatum)value;
        if (CrawlDatum.hasDbStatus(datum))
          dbDatum = datum;
        else if (CrawlDatum.hasFetchStatus(datum)) {
          // don't index unmodified (empty) pages
          if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED)
            fetchDatum = datum;
        } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||
                   CrawlDatum.STATUS_SIGNATURE == datum.getStatus() ||
                   CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
          continue;
        } else {
          throw new RuntimeException("Unexpected status: "+datum.getStatus());
        }
      } else if (value instanceof ParseData) {
        parseData = (ParseData)value;
      } else if (value instanceof ParseText) {
        parseText = (ParseText)value;
      } else if (LOG.isWarnEnabled()) {
        LOG.warn("Unrecognized type: "+value.getClass());
      }
    }
 
 
    if (fetchDatum == null || dbDatum == null
        || parseText == null || parseData == null) {
      return;                                     // only have inlinks
    }
 
 
    if (!parseData.getStatus().isSuccess() ||
        fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
      return;
    }
 
 
	// 生成一个可以索引的文档对象,在Lucene中,Docuemnt就是一个抽象的文档对象,其有Fields组成,而Field又由Terms组成
    NutchDocument doc = new NutchDocument();
    final Metadata metadata = parseData.getContentMeta();
 
 
    // add segment, used to map from merged index back to segment files
    doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));
 
 
    // add digest, used by dedup
    doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));
 
 
    final Parse parse = new ParseImpl(parseText, parseData);
    try {
      // extract information from dbDatum and pass it to
      // fetchDatum so that indexing filters can use it
      final Text url = (Text) dbDatum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
      if (url != null) {
        fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
      }
      // run indexing filters
      doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
    } catch (final IndexingException e) {
      if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }
      return;
    }
 
 
    // skip documents discarded by indexing filters
    if (doc == null) return;
 
 
    float boost = 1.0f;
    // run scoring filters
    try {
      boost = this.scfilters.indexerScore(key, doc, dbDatum,
              fetchDatum, parse, inlinks, boost);
    } catch (final ScoringFilterException e) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Error calculating score " + key + ": " + e);
      }
      return;
    }
    // apply boost to all indexed fields.
    doc.setWeight(boost);
    // store boost for use by explain and dedup
    doc.add("boost", Float.toString(boost));
 
 
	// 收集输出结果,用下面的IndexerOutputFormat写到Solr中去
    output.collect(key, doc);
  }

—

标签:nutch,CrawlDatum,doc,segment,Indexer,job,value,null,crawl
来源: https://blog.csdn.net/weixin_47876869/article/details/121525421

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有