读书人

Accelerating Applications with Java 5 Concurrency

发布时间: 2012-10-24 14:15:58 作者: rapoo

Accelerating Applications with Java 5 Concurrency
无意中找到这篇文章,感觉非常的好,保留做一个记录,主要解决的问题场景为:

如果一个请求提交多个任务,为了提高效率,可以用多线程并行处理。但一旦多个请求的任务数非常多,就会导致创建的线程数非常多,消耗太多的资源,得不偿失。进一步可以考虑使用Executors.newFixedThreadPool(50)创建线程池,这样既保证了线程的重复利用,又控制了创建线程的数目。但这样做可能带来另外一个问题:如果一个请求的任务数大于49个,线程池就会被这个请求完全占用,其它请求可能拿不到线程,响应时间因此受到影响。所以需要进一步控制每一个请求并行处理的任务数目。例如限制每一个请求最多并发处理5个任务,这样至少有10个请求能够同时被并行处理,从而改善整体请求的平均响应时间。这篇blog文章提供了一个很好的实现,在平常的设计中应用得还是比较多的。


http://weblogs.java.net/blog/2008/12/22/accelerating-applications-java-5-concurrency


import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.Executor;import java.util.concurrent.Future;import java.util.concurrent.FutureTask;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;/** * Exactly like ExecutorCompletionService, except uses a * Semaphore to only permit X tasks to run concurrently * on the passed Executor */public class BoundedCompletionService<V> implements CompletionService<V> {  private final Semaphore semaphore;  private final Executor executor;  private final BlockingQueue<Future<V>> completionQueue;  // FutureTask to release Semaphore as completed  private class BoundedFuture extends FutureTask {        BoundedFuture(Callable<V> c) { super(c); }        BoundedFuture(Runnable t, V r) { super(t, r); }        protected void done() {        semaphore.release();        completionQueue.add(this);        }    }        public BoundedCompletionService(final Executor executor, int permits) {      this.executor = executor;      this.semaphore = new Semaphore(permits);      this.completionQueue = new LinkedBlockingQueue<Future<V>>();    }    public Future<V> poll() {      return this.completionQueue.poll();    }    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {      return this.completionQueue.poll(timeout, unit);    }    public Future<V> submit(Callable<V> task)  {      if (task == null) throw new NullPointerException();      try {        BoundedFuture f = new BoundedFuture(task);        this.semaphore.acquire(); // waits        this.executor.execute(f);return f;      } catch (InterruptedException e) {        // do nothing      }      return null;    }    public Future<V> submit(Runnable task, V result) {      if (task == null) throw new NullPointerException();      try {        BoundedFuture f = new BoundedFuture(task, result);        
this.semaphore.acquire(); // waits        this.executor.execute(f);        return f;      } catch (InterruptedException e) {        // do nothing      }      return null;    }    public Future<V> take() throws InterruptedException {      return this.completionQueue.take();    }}import java.util.ArrayList;import java.util.Collection;import java.util.List;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executor;import java.util.concurrent.Future;import java.util.concurrent.FutureTask;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class ParallelTask<V> implements Future<Collection<V>> {  // FutureTask to release Semaphore as completed  private class BoundedFuture extends FutureTask<V> {    BoundedFuture(Callable<V> c) { super(c); }    BoundedFuture(Runnable t, V r) { super(t, r); }    protected void done() {      semaphore.release();      completedQueue.add(this);    }  }  private final List<BoundedFuture> submittedQueue;  private final BlockingQueue completedQueue;  private final Semaphore semaphore;  private final Executor executor;  private final int size;  private boolean cancelled = false;  public ParallelTask(Executor exec, Collection<Callable<V>> callable, int permits) {    if (exec == null || callable == null) throw new NullPointerException();    this.executor = exec;    this.semaphore = new Semaphore(permits);    this.size = callable.size();    this.submittedQueue = new ArrayList<BoundedFuture>(size);    this.completedQueue = new LinkedBlockingQueue<BoundedFuture>(size);    for (Callable<V> c : callable) {      this.submittedQueue.add(new BoundedFuture(c));    }  }  public boolean cancel(boolean mayInterruptIfRunning) {    if (this.isDone()) return false;    this.cancelled = true;    for (Future f : this.submittedQueue) {      
f.cancel(mayInterruptIfRunning);    }    return this.cancelled;  }  public Collection<V> get() throws InterruptedException, ExecutionException {    Collection<V> result = new ArrayList<V?(this.submittedQueue.size());    boolean done = false;    try {      for (BoundedFuture f : this.submittedQueue) {        if (this.isCancelled()) break;        this.semaphore.acquire();        this.executor.execute(f);      }      for (int i = 0; i < this.size; i++) {        if (this.isCancelled()) break;        result.add(this.completedQueue.take().get());      }      done = true;    } finally {      if (!done) this.cancel(true);    }    return result;  }  public Collection<V> get(long timeout, TimeUnit unit)    throws InterruptedException, ExecutionException, TimeoutException {    // timeout handling isn't perfect, but it's an attempt to     // replicate the behavior found in AbstractExecutorService    long nanos = unit.toNanos(timeout);    long totalTime = System.nanoTime() + nanos;    boolean done = false;    Collection<V> result = new ArrayList<V>(this.submittedQueue.size());    try {      for (BoundedFuture f : this.submittedQueue) {        if (System.nanoTime() >= totalTime) throw new TimeoutException();        if (this.isCancelled()) break;        this.semaphore.acquire();        this.executor.execute(f);      }      for (int i = 0; i < this.size; i++) {        if (this.isCancelled()) break;        long nowTime = System.nanoTime();        if (nowTime >= totalTime) throw new TimeoutException();        BoundedFuture f = this.completedQueue.poll(totalTime - nowTime, TimeUnit.NANOSECONDS);        if (f == null) throw new TimeoutException();        result.add(f.get());      }      done = true;    } finally {      if (!done) this.cancel(true);    }    return result;  }  public boolean isCancelled() {    return this.cancelled;  }  public boolean isDone() {    return this.completedQueue.size() == this.size;  }}// Shared Thread Poolprivate ExecutorService pool = 
Executors.newFixedThreadPool(50);// Using new ParallelTaskpublic List<Output> processParallelTask(List<Stuff> request) {  Collection<Callable<Output>> tasks = new ArrayList<Callable<Output>>(request.size());  for (final Stuff s : request) {    tasks.add(new Callable<Output>() { // no wait      public Output call() throws Exception {        return lengthyExternalProcess(s);      }    });  }  // timeout after 6 seconds if necessary  return new ParallelTask<Output>(this.pool,tasks,5).get(6, TimeUnit.SECONDS);}

读书人网 >软件架构设计

热点推荐