Accelerating Applications with Java 5 Concurrency
无意中找到这篇文章,感觉非常好,保留下来做个记录。它主要解决的问题场景为:
如果一个请求需要提交多个任务,为了提高效率,可以使用多线程并行处理。但一旦多个请求的任务数非常多,就会导致创建的线程数非常多,消耗太多的资源,得不偿失。进一步考虑使用Executors.newFixedThreadPool(50)创建线程池,这样既保证了线程的重复利用,又控制了创建线程的数目。但这样做可能带来另外一个问题:如果一个请求的任务数大于49个,线程池会被这个请求完全占用,可能导致其它请求不能获得线程,影响其它请求的响应时间。因此需要进一步控制每一个请求并行处理的任务数目。如果每一个请求最多并发处理5个任务,这样至少有10个请求能够被并行处理,从而改善了整体请求的平均响应时间。这篇博文提供了一个很好的实现,在平常的设计中应用得还是比较多的……
http://weblogs.java.net/blog/2008/12/22/accelerating-applications-java-5-concurrency
import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.Executor;import java.util.concurrent.Future;import java.util.concurrent.FutureTask;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;/** * Exactly like ExecutorCompletionService, except uses a * Semaphore to only permit X tasks to run concurrently * on the passed Executor */public class BoundedCompletionService<V> implements CompletionService<V> { private final Semaphore semaphore; private final Executor executor; private final BlockingQueue<Future<V>> completionQueue; // FutureTask to release Semaphore as completed private class BoundedFuture extends FutureTask { BoundedFuture(Callable<V> c) { super(c); } BoundedFuture(Runnable t, V r) { super(t, r); } protected void done() { semaphore.release(); completionQueue.add(this); } } public BoundedCompletionService(final Executor executor, int permits) { this.executor = executor; this.semaphore = new Semaphore(permits); this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public Future<V> poll() { return this.completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return this.completionQueue.poll(timeout, unit); } public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); try { BoundedFuture f = new BoundedFuture(task); this.semaphore.acquire(); // waits this.executor.execute(f);return f; } catch (InterruptedException e) { // do nothing } return null; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); try { BoundedFuture f = new BoundedFuture(task, result); this.semaphore.acquire(); // waits this.executor.execute(f); return f; } catch (InterruptedException e) { // do nothing } return null; } public Future<V> take() throws InterruptedException { 
return this.completionQueue.take(); }}import java.util.ArrayList;import java.util.Collection;import java.util.List;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executor;import java.util.concurrent.Future;import java.util.concurrent.FutureTask;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class ParallelTask<V> implements Future<Collection<V>> { // FutureTask to release Semaphore as completed private class BoundedFuture extends FutureTask<V> { BoundedFuture(Callable<V> c) { super(c); } BoundedFuture(Runnable t, V r) { super(t, r); } protected void done() { semaphore.release(); completedQueue.add(this); } } private final List<BoundedFuture> submittedQueue; private final BlockingQueue completedQueue; private final Semaphore semaphore; private final Executor executor; private final int size; private boolean cancelled = false; public ParallelTask(Executor exec, Collection<Callable<V>> callable, int permits) { if (exec == null || callable == null) throw new NullPointerException(); this.executor = exec; this.semaphore = new Semaphore(permits); this.size = callable.size(); this.submittedQueue = new ArrayList<BoundedFuture>(size); this.completedQueue = new LinkedBlockingQueue<BoundedFuture>(size); for (Callable<V> c : callable) { this.submittedQueue.add(new BoundedFuture(c)); } } public boolean cancel(boolean mayInterruptIfRunning) { if (this.isDone()) return false; this.cancelled = true; for (Future f : this.submittedQueue) { f.cancel(mayInterruptIfRunning); } return this.cancelled; } public Collection<V> get() throws InterruptedException, ExecutionException { Collection<V> result = new ArrayList<V?(this.submittedQueue.size()); boolean done = false; try { for (BoundedFuture f : this.submittedQueue) { if (this.isCancelled()) break; 
this.semaphore.acquire(); this.executor.execute(f); } for (int i = 0; i < this.size; i++) { if (this.isCancelled()) break; result.add(this.completedQueue.take().get()); } done = true; } finally { if (!done) this.cancel(true); } return result; } public Collection<V> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // timeout handling isn't perfect, but it's an attempt to // replicate the behavior found in AbstractExecutorService long nanos = unit.toNanos(timeout); long totalTime = System.nanoTime() + nanos; boolean done = false; Collection<V> result = new ArrayList<V>(this.submittedQueue.size()); try { for (BoundedFuture f : this.submittedQueue) { if (System.nanoTime() >= totalTime) throw new TimeoutException(); if (this.isCancelled()) break; this.semaphore.acquire(); this.executor.execute(f); } for (int i = 0; i < this.size; i++) { if (this.isCancelled()) break; long nowTime = System.nanoTime(); if (nowTime >= totalTime) throw new TimeoutException(); BoundedFuture f = this.completedQueue.poll(totalTime - nowTime, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); result.add(f.get()); } done = true; } finally { if (!done) this.cancel(true); } return result; } public boolean isCancelled() { return this.cancelled; } public boolean isDone() { return this.completedQueue.size() == this.size; }}// Shared Thread Poolprivate ExecutorService pool = Executors.newFixedThreadPool(50);// Using new ParallelTaskpublic List<Output> processParallelTask(List<Stuff> request) { Collection<Callable<Output>> tasks = new ArrayList<Callable<Output>>(request.size()); for (final Stuff s : request) { tasks.add(new Callable<Output>() { // no wait public Output call() throws Exception { return lengthyExternalProcess(s); } }); } // timeout after 6 seconds if necessary return new ParallelTask<Output>(this.pool,tasks,5).get(6, TimeUnit.SECONDS);}