/** * the main mechanics of invokeAny. */ private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { // 提交任务为空,抛出空指针异常 if (tasks == null) thrownewNullPointerException(); // 记录待执行的任务的剩余数量 intntasks= tasks.size(); // 任务集合中的数据为空,抛出非法参数异常 if (ntasks == 0) thrownewIllegalArgumentException(); // 以当前实例对象作为参数构建ExecutorCompletionService对象 // ExecutorCompletionService负责执行任务,后面调用poll返回第一个执行结果 ArrayList<Future<T>> futures = newArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs = newExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited // parallelism, check to see if previously submitted tasks are // done before submitting more of them. This interleaving // plus the exception mechanics account for messiness of main // loop.
try { // 记录可能抛出的执行异常 // Record exceptions so that if we fail to obtain any // result, we can throw the last exception we got. ExecutionExceptionee=null; // 初始化超时时间 finallongdeadline= timed ? System.nanoTime() + nanos : 0L; Iterator<? extendsCallable<T>> it = tasks.iterator();
// 提交任务,并将返回的结果数据添加到futures集合中 // 提交一个任务主要是确保在进入循环之前开始一个任务 // Start one task for sure; the rest incrementally futures.add(ecs.submit(it.next())); --ntasks; // 记录正在执行的任务数量 intactive=1;
// 遍历所有任务 // Interleave time checks and calls to execute in case // executor doesn't have any/much parallelism. for (inti=0; i < size; i++) { execute((Runnable)futures.get(i)); // 在添加执行任务时超时判断,如果超时则立即返回futures集合 nanos = deadline - System.nanoTime(); if (nanos <= 0L) // 在添加执行任务时进行超时判断,如果超时,则立刻返回 futures 集合 return futures; }
// 遍历所有任务 for (inti=0; i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { // 对结果进行判断时进行超时判断 if (nanos <= 0L) // 每次对结果数据进行判断时添加了超时处理逻辑 return futures; try { f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } // 重置任务的超时时间 nanos = deadline - System.nanoTime(); } } done = true; return futures; } finally { if (!done) for (inti=0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
// Check if queue empty only if necessary. // 如果当前线程池状态处于关闭状态(即SHUTDOWN及其以上状态),并且以下三个条件之一成立: // 1. 当前线程池状态为STOP及其以上状态; // 2. 第一个任务不为空; // 3. 工作队列为空; // 则创建新线程失败 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 创建新线程失败 returnfalse;
for (;;) { // 获得当前工作线程数 intwc= workerCountOf(c);
// 参数中 core 为true // CAPACITY 为 1 << COUNT_BITS-1,一般不会超过 // 如果工作线程数大于了核心线程数,则创建失败 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; // 通过CAS操作改变c的值 if (compareAndIncrementWorkerCount(c)) // 更改成功就跳出多重循环,且不再运行循环 break retry; // 更改失败,重新获取ctl的值 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 跳出多重循环,且重新进入循环 continue retry; // else CAS failed due to workerCount change; retry inner loop } }
// 用于标记work中的任务是否成功执行 booleanworkerStarted=false; // 用于标记worker是否成功加入了线程池中 booleanworkerAdded=false; Workerw=null; try { // 创建新线程来执行任务 w = newWorker(firstTask); finalThreadt= w.thread; if (t != null) { finalReentrantLockmainLock=this.mainLock; // 加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 加锁的同时再次检测 // 避免在释放锁之前调用了shut down intrs= runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable thrownewIllegalThreadStateException(); // 将线程添加到线程池中 workers.add(w); ints= workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 添加成功标志位变为true workerAdded = true; } } finally { mainLock.unlock(); } // 如果worker成功加入了线程池,就执行其中的任务 if (workerAdded) { t.start(); // 启动成功 workerStarted = true; } } } finally { // 如果执行失败 if (! workerStarted) // 调用添加失败的函数 addWorkerFailed(w); } return workerStarted; }
// 当两个任务有相同的延迟时间时,按照 FIFO 的顺序入队.sequenceNumber 就是为相同延时任务提供的顺序编号 /** Sequence number to break ties FIFO */ privatefinallong sequenceNumber;
// 任务可以执行的时间,纳秒级 通过triggerTime方法计算得出 /** The time the task is enabled to execute in nanoTime units */ privatelong time;
// 重复任务的执行周期时间,纳秒级 正数表示固定速率执行(为scheduleAtFixedRate提供服务),负数表示固定延迟执行(为scheduleWithFixedDelay提供服务),0表示不重复任务 /** * Period in nanoseconds for repeating tasks. A positive * value indicates fixed-rate execution. A negative value * indicates fixed-delay execution. A value of 0 indicates a * non-repeating task. */ privatefinallong period;
// 重新入队的任务 通过reExecutePeriodic方法入队重新排序 /** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture<V> outerTask = this; // 延迟队列的索引,以支持更快的取消操作 /** * Index into delay queue, to support faster cancellation. */ int heapIndex;
/** * Sets the next time to run for a periodic task. */ privatevoidsetNextRunTime() { longp= period; // 固定速率执行,scheduleAtFixedRate if (p > 0) time += p; else // 固定延迟执行,scheduleWithFixedDelay time = triggerTime(-p); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/** * Requeues a periodic task unless current run state precludes it. * Same idea as delayedExecute except drops task rather than rejecting. * * @param task the task */ voidreExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { // 任务入队 super.getQueue().add(task); // 重新检查run-after-shutdown参数,如果不能继续运行就移除队列任务,并取消任务的执行 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else // 启动一个新的线程等待任务 ensurePrestart(); } }
/* * This class specializes ThreadPoolExecutor implementation by * * 1. Using a custom task type, ScheduledFutureTask for * tasks, even those that don't require scheduling (i.e., * those submitted using ExecutorService execute, not * ScheduledExecutorService methods) which are treated as * delayed tasks with a delay of zero. * * 2. Using a custom queue (DelayedWorkQueue), a variant of * unbounded DelayQueue. The lack of capacity constraint and * the fact that corePoolSize and maximumPoolSize are * effectively identical simplifies some execution mechanics * (see delayedExecute) compared to ThreadPoolExecutor. * * 3. Supporting optional run-after-shutdown parameters, which * leads to overrides of shutdown methods to remove and cancel * tasks that should NOT be run after shutdown, as well as * different recheck logic when task (re)submission overlaps * with a shutdown. * * 4. Task decoration methods to allow interception and * instrumentation, which are needed because subclasses cannot * otherwise override submit methods to get this effect. These * don't have any impact on pool control logic though. */
/** * continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown是 * ScheduledThreadPoolExecutor 定义的 run-after-shutdown 参数,用来控制池关闭之后的任务执行逻辑 */ // 关闭后继续执行已经存在的周期任务 /** * False if should cancel/suppress periodic tasks on shutdown. */ privatevolatileboolean continueExistingPeriodicTasksAfterShutdown;
// 关闭后继续执行已经存在的延时任务 /** * False if should cancel non-periodic tasks on shutdown. */ privatevolatilebooleanexecuteExistingDelayedTasksAfterShutdown=true;
/** * 用来控制任务取消后是否从队列中移除.当一个已经提交的周期或延迟任务在运行之前被取消,那么它之后将不会运行. * 默认配置下,这种已经取消的任务在届期之前不会被移除.通过这种机制,可以方便检查和监控线程池状态,但也可能导致已经取消的任务无限滞留. * 为了避免这种情况的发生,我们可以通过setRemoveOnCancelPolicy方法设置移除策略,把参数removeOnCancel设为true可以在任务取消后立即从队列中移除. */ /** * True if ScheduledFutureTask.cancel should remove from queue */ privatevolatilebooleanremoveOnCancel=false;
// 是为相同延时的任务提供的顺序编号,保证任务之间的 FIFO 顺序.与 ScheduledFutureTask 内部的sequenceNumber参数作用一致 /** * Sequence number to break scheduling ties, and in turn to * guarantee FIFO order among tied entries. */ privatestaticfinalAtomicLongsequencer=newAtomicLong();