ScheduledThreadPoolExecutor的scheduleAtFixedRate方法探究
??? ScheduledThreadPoolExecutor除了具有ThreadPoolExecutor的所有功能外,还可以延迟执行任务或者周期性的执行某个任务。scheduleWithFixedDelay和scheduleAtFixedRate就是用来完成这个功能的。平常使用scheduleAtFixedRate这个方法时并没有多想,但是这几天在实现一个功能的时候,需要考虑scheduleAtFixedRate所执行的task是否会影响任务的周期性,比如scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS),那么这个command的执行会不会影响这个10秒的周期性。因此特意仔细看了下ScheduledThreadPoolExecutor的源代码,这里记录一下,以便以后查看。
??? scheduleAtFixedRate有两个时间参数,initialDelay和period,对应该方法的两个主要功能,即延迟运行任务和周期性执行任务。
?
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Object>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period))); delayedExecute(t); return t; } /** * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. */ private void delayedExecute(Runnable command) { if (isShutdown()) { reject(command); return; } // Prestart a thread if necessary. We cannot prestart it // running the task because the task (probably) shouldn't be // run yet, so thread will just idle until delay elapses. if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); super.getQueue().add(command); }
??? 首先创建一个ScheduledFutureTask,然后通过delayedExecute执行这个task。在delayedExecute中,首先预先启动一个线程,这里要注意的是这个这里用来启动一个新线程的firstTask参数是null,所以新启动的线程是idle状态的,然后把这个task加入到workQueue。ScheduledThreadPoolExecutor里的workQueue用的是DelayedWorkQueue,这个DelayedWorkQueue就是实现delay的关键。DelayedWorkQueue内部使用的是DelayQueue,DelayQueue实现task delay的关键就在于其Offer(E e)和Take.下面,通过分析这两个方法和结合ThreadPoolExecutor的运行原理来说明delay操作是如何实现的
?
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); q.offer(e); if (first == null || e.compareTo(first) < 0) available.signalAll(); return true; } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { available.await(); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { long tl = available.awaitNanos(delay); } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); // wake up other takers return x; } } } } finally { lock.unlock(); } }
????? ScheduledThreadPoolExecutor执行task是通过工作线程Work来承担的,Work的Run方法如下:
?
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
???? 因为前面在delayedExecute方法里面创建work线程的firstTask参数为null,所以就通过getTask去从workQueue里面获取task,getTask在正常情况下(即线程池没有关闭,线程数量没有超过corePoolSize等)是通过workQueue.take()从workQueue里获取任务。根据上面的贴出来的take方法的代码,如果queue是空的,则take方法会阻塞住,直到有新task被add进来。而在上面的delayedExecute方法的最后,会把创建的scheduledFutureTask加入到workQueue,这样take方法中的available.await()就被唤醒;在take方法里面,如果workQueue不为空,则执行task.getDelay()方法获取task的delay
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); }
?? 这里的time是通过两个方法把initialDelay变成一个triggerTime
/** * Returns the trigger time of a delayed action. */ private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } /** * Returns the trigger time of a delayed action. */ long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
注意看这个方法,这里返回的delay不是固定不变的,从task被放入workQueue起,不同的时间调用getDelay方法会得出不同的delay。如果放入workQueue的task的initialDelay是5秒,那么根据take方法的代码,如果在放入workQueue5秒后,就可以从delayQueue中拿到5秒前put进去的task,这样就实现了delay的功能。
?
?? 在本文的最前面提到scheduleAtFixedRate能够周期性地执行一项任务,那么这个是如何实现的呢?在scheduleAtFixedRate方法里创建了一个ScheduledFutureTask,这个ScheduledFutureTask包装了command,最后周期性执行的是ScheduledFutureTask的run方法。
private void runPeriodic() { boolean ok = ScheduledFutureTask.super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { long p = period; if (p > 0) time += p; else time = triggerTime(-p); ScheduledThreadPoolExecutor.super.getQueue().add(this); } // This might have been the final executed delayed // task. Wake up threads to check. else if (down) interruptIdleWorkers(); } /** * Overrides FutureTask version so as to reset/requeue if periodic. */ public void run() { if (isPeriodic()) runPeriodic(); else ScheduledFutureTask.super.run(); }
???? 由上面的代码可以看出,scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS)这个方法的周期性会受command的影响,如果command方法的执行时间是10秒,那么执行command的周期其实是20秒,即scheduleAtFixedRate这个方法要等一个完整的command方法执行完成后才继续周期性地执行command方法,其实这样的设计也是符合常理的。
?
???? 以上就是对ScheduledThreadPoolExecutor的一点小理解。