读书人

多线程写Lucene目录

发布时间: 2013-08-10 21:14:06 作者: rapoo

多线程写Lucene索引

import java.io.IOException;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import org.apache.lucene.analysis.Analyzer;import org.apache.lucene.document.Document;import org.apache.lucene.index.CorruptIndexException;import org.apache.lucene.index.IndexWriter;import org.apache.lucene.index.Term;import org.apache.lucene.store.Directory;public class ThreadedIndexWriter extends IndexWriter {private ExecutorService threadPool;private Analyzer defaultAnalyzer;private class Job implements Runnable { // 保留要加入索引的一个文档Document doc;Analyzer analyzer;Term delTerm;public Job(Document doc, Term delTerm, Analyzer analyzer) {this.doc = doc;this.analyzer = analyzer;this.delTerm = delTerm;}public void run() { // 实际增加和更新文档try {if (delTerm != null) {ThreadedIndexWriter.super.updateDocument(delTerm, doc,analyzer);} else {ThreadedIndexWriter.super.addDocument(doc, analyzer);}} catch (IOException ioe) {throw new RuntimeException(ioe);}}}@SuppressWarnings("deprecation")public ThreadedIndexWriter(Directory dir, Analyzer a, boolean create,int numThreads, int maxQueueSize, IndexWriter.MaxFieldLength mfl)throws CorruptIndexException, IOException {super(dir, a, create, mfl);defaultAnalyzer = a;threadPool = new ThreadPoolExecutor(// 创建线程池numThreads, numThreads, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(maxQueueSize, false),new ThreadPoolExecutor.CallerRunsPolicy());}public void addDocument(Document doc) { // 让线程池增加文档threadPool.execute(new Job(doc, null, defaultAnalyzer));}public void addDocument(Document doc, Analyzer a) { // 让线程池增加文档threadPool.execute(new Job(doc, null, a));}public void updateDocument(Term term, Document doc) { // 让线程池更新文档threadPool.execute(new Job(doc, term, defaultAnalyzer));}// 让线程池更新文档public void updateDocument(Term term, Document doc, Analyzer a) {threadPool.execute(new Job(doc, term, a));}public void close() throws CorruptIndexException, IOException {finish();super.close();}public void close(boolean doWait) throws CorruptIndexException, IOException {finish();super.close(doWait);}public void rollback() throws CorruptIndexException, IOException {finish();super.rollback();}private void finish() { // 关闭线程池threadPool.shutdown();while (true) {try {if (threadPool.awaitTermination(Long.MAX_VALUE,TimeUnit.SECONDS)) {break;}} catch (InterruptedException ie) {Thread.currentThread().interrupt();throw new RuntimeException(ie);}}}}

读书人网 >编程

热点推荐