privatebooleanaddWorker(Runnable firstTask, boolean core){ retry: for (;;) { int c = ctl.get();int rs = runStateOf(c);
// Check if queue empty only if necessary. // SHUTDOWN,STOP,TIDYING,TERMINATED四种状态返回失败 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse;
for (;;) { int wc = workerCountOf(c); // >corePoolSize需要放入workQueue,>maximumPoolSize也会返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse;
// cas (rs,wc) + 1,rs在高位,这里是wc+1 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 如果状态变了,需要重新循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建一个worker w = new Worker(firstTask); // 被代理的线程 final Thread t = w.thread; if (t != null) { // 该锁主要针对 HashSet<Worker> workers final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 线程已经启动过了 if (t.isAlive()) // precheck that t is startable thrownew IllegalThreadStateException(); // 添加到Set<Worker> workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
publicvoidsetCorePoolSize(int corePoolSize){ if (corePoolSize < 0) thrownew IllegalArgumentException(); // 差值 int delta = corePoolSize - this.corePoolSize; // 重新复制 this.corePoolSize = corePoolSize; // 如果原来的线程数大于新的线程数,则会终止 if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); elseif (delta > 0) { // We don't really know how many new threads are "needed". // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so. int k = Math.min(delta, workQueue.size()); // 创建线程 while (k-- > 0 && addWorker(null, true)) { // workQueue为空就先停止创建,毕竟没必要 if (workQueue.isEmpty()) break; } } } publicvoidsetMaximumPoolSize(int maximumPoolSize){ if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) thrownew IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl.get()) > maximumPoolSize) interruptIdleWorkers(); }