

Published: 2012-07-08 17:43:43  Author: rapoo

Learning Hadoop programming from Nutch

I just downloaded the latest Nutch 1.0.* release and noticed that search is now delegated to Solr. Nutch contains many Hadoop applications that make good case studies for seeing how Hadoop is used in practice, which makes it a good starting point for people new to Hadoop programming; after all, Nutch is where Hadoop originated. The new Nutch releases also use a fairly recent version of Hadoop.


Let's look at Nutch's index module, which uses Hadoop MapReduce to build the index.

map: reads the key and value out of the serialized input files.

public void map(Text key, Writable value,
    OutputCollector<Text, NutchWritable> output, Reporter reporter) throws IOException {
  output.collect(key, new NutchWritable(value));
}


reduce: the key is the document's primary key, and the value is the document plus the action to apply to it.

public void reduce(Text key, Iterator<NutchWritable> values,
    OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
    throws IOException {
  // ...
  reporter.incrCounter("IndexerStatus", "Documents added", 1);

  NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
  output.collect(key, action);
  // ...
}


The map function does little more than collect the input records, and the reduce function only wraps up the document data, so a large number of input sources can be processed and packaged concurrently. The actual index submission happens in the FileOutputFormat; the key emitted by reduce is the document's primary key, and the value wraps the document together with the action to apply to it.
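
The wiring of these pieces into a Hadoop job is done elsewhere in Nutch (in IndexerMapReduce's job setup). As a rough sketch of what that configuration looks like with the old org.apache.hadoop.mapred API, assuming IndexerMapReduce serves as both Mapper and Reducer and that the package paths are as in Nutch 1.x (the details below are assumptions, not Nutch's exact code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.indexer.IndexerMapReduce;
import org.apache.nutch.indexer.IndexerOutputFormat;
import org.apache.nutch.indexer.NutchIndexAction;

public class IndexJobSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(new Configuration());
    job.setJobName("index-solr");

    // IndexerMapReduce provides both the map and reduce shown above.
    job.setMapperClass(IndexerMapReduce.class);
    job.setReducerClass(IndexerMapReduce.class);

    // Map output: document key -> data wrapped in a NutchWritable.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NutchWritable.class);

    // Reduce output is handed to IndexerOutputFormat instead of plain HDFS files.
    job.setOutputFormat(IndexerOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NutchIndexAction.class);

    // Input paths and input format (the sequence files from crawldb/segments)
    // are omitted in this sketch.
    JobClient.runJob(job);
  }
}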


public class IndexerOutputFormat extends FileOutputFormat<Text, NutchIndexAction> {

  @Override
  public RecordWriter<Text, NutchIndexAction> getRecordWriter(FileSystem ignored,
      JobConf job, String name, Progressable progress) throws IOException {

    // populate JobConf with field indexing options
    IndexingFilters filters = new IndexingFilters(job);

    final NutchIndexWriter[] writers =
        NutchIndexWriterFactory.getNutchIndexWriters(job);

    // initialize each writer
    for (final NutchIndexWriter writer : writers) {
      writer.open(job, name);
    }

    return new RecordWriter<Text, NutchIndexAction>() {

      public void close(Reporter reporter) throws IOException {
        for (final NutchIndexWriter writer : writers) {
          writer.close();
        }
      }

      // handle each record
      public void write(Text key, NutchIndexAction indexAction) throws IOException {
        for (final NutchIndexWriter writer : writers) {
          if (indexAction.action == NutchIndexAction.ADD) {
            writer.write(indexAction.doc);
          }
          if (indexAction.action == NutchIndexAction.DELETE) {
            writer.delete(key.toString());
          }
        }
      }
    };
  }
}


Each record coming out of reduce is handled by the RecordWriter, which calls public void write(Text key, NutchIndexAction indexAction). Nutch simply submits the index through Solr's client API, in the SolrWriter implementation.
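
For orientation, here is a sketch of the writer contract that SolrWriter fulfils. The four methods below are exactly the ones IndexerOutputFormat and the following snippets call; the real NutchIndexWriter interface may declare additional methods:

import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.nutch.indexer.NutchDocument;

// Sketch of the writer contract used above; method names match the calls
// made by IndexerOutputFormat, but this is not necessarily the full interface.
public interface NutchIndexWriter {
  void open(JobConf job, String name) throws IOException;  // connect to the index backend
  void write(NutchDocument doc) throws IOException;         // queue a document for addition
  void delete(String key) throws IOException;                // queue a deletion by document key
  void close() throws IOException;                           // flush buffers and finish
}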


First, initialization:

public void open(JobConf job, String name) throws IOException {
  SolrServer server = SolrUtils.getCommonsHttpSolrServer(job);
  init(server, job);
}


Handling each record, starting with the delete operation:

public void delete(String key) throws IOException {
  if (delete) {
    try {
      solr.deleteById(key);
      numDeletes++;
    } catch (final SolrServerException e) {
      throw makeIOException(e);
    }
  }
}


The add operation: new documents are placed into a buffer and only submitted to Solr once the buffer reaches a certain size, or when the writer finishes (see the sketch after this snippet):

inputDocs.add(inputDoc);
if (inputDocs.size() + numDeletes >= commitSize) {
  try {
    LOG.info("Indexing " + Integer.toString(inputDocs.size()) + " documents");
    LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
    numDeletes = 0;
    UpdateRequest req = new UpdateRequest();
    req.add(inputDocs);
    req.setParams(params);
    req.process(solr);
  } catch (final SolrServerException e) {
    throw makeIOException(e);
  }
  inputDocs.clear();
}
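
The "when the writer finishes" case is handled in close(): whatever is still sitting in the buffer is sent to Solr and a commit is issued so the new documents become searchable. A minimal sketch of that path, reusing the same fields (inputDocs, numDeletes, params, solr) as above; the real SolrWriter.close() may differ in detail:

public void close() throws IOException {
  try {
    // flush any documents still sitting in the buffer
    if (!inputDocs.isEmpty()) {
      LOG.info("Indexing " + Integer.toString(inputDocs.size()) + " documents");
      UpdateRequest req = new UpdateRequest();
      req.add(inputDocs);
      req.setParams(params);
      req.process(solr);
      inputDocs.clear();
    }
    // make everything added or deleted by this task visible to searchers
    solr.commit();
  } catch (final SolrServerException e) {
    throw makeIOException(e);
  }
}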






