线程池部分源码详解

tech2022-12-09  111

ThreadPoolExecutor属性定义的解释

// Integer总共有32位,最右边29位表示工作线程数,最左边3位表示线程池状态,简单来说,3个二进制位可以表示从0至7的8个不同数值 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程池的容量 2^29 - 1 = 536870911即 000-11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 此状态表示线程池能接受新任务 111-00000000000000000000000000000,即十进制的-536870912 private static final int RUNNING = -1 << COUNT_BITS; // 此状态表示不再接受新任务,但可以继续执行队列中的任务 000-00000000000000000000000000000,即十进制的0 private static final int SHUTDOWN = 0 << COUNT_BITS; // 此状态表示全面拒绝,并中断正在处理的任务 001-00000000000000000000000000000,即十进制的536870912 private static final int STOP = 1 << COUNT_BITS; // 此状态表示所有任务已经终止 010-00000000000000000000000000000,即十进制的1073741824 private static final int TIDYING = 2 << COUNT_BITS; // 此状态表示已清理完现场 011-00000000000000000000000000000,即十进制1610612736 private static final int TERMINATED = 3 << COUNT_BITS; // ~CAPACITY = 111-00000000000000000000000000000 进行与运算获取到左边3位的线程状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // CAPACITY = 000-11111111111111111111111111111 进行与运算获取到右边29位的线程数 private static int workerCountOf(int c) { return c & CAPACITY; } // 把左边3位和右边29位按或运算,合并成一个值 private static int ctlOf(int rs, int wc) { return rs | wc; } // 因为RUNNING的十进制值比SHUTDOWN小,所以可以通过这个来判断 private static boolean isRunning(int c) { return c < SHUTDOWN; } /** * 执行失败的概率非常低,及时失败再次执行成功的概率也是极高的,类似于自旋锁的原理。 * 这里的处理逻辑是先+1,创建失败再-1,这是轻量处理并发创建线程的方式。 * 如果先创建线程,然后进行加减操作的话,代价明显比这样处理要大 */ private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); }

线程池五中状态分别为 RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED

而按照十进制值从小到大依次排序为:RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED

ThreadPoolExecutor.execute()方法源码

public void execute(Runnable command) { //任务为空,抛异常 if (command == null) throw new NullPointerException(); //返回包含线程数和线程池状态的Integer类型数值 int c = ctl.get(); //如果工作线程小于核心线程数,创建线程任务并执行 if (workerCountOf(c) < corePoolSize) { // if (addWorker(command, true)) return; //如果创建失败,防止外部已经在线程池中加入新任务,重新获取一遍 c = ctl.get(); } //如果线程池是RUNNING状态且任务加入队列成功 if (isRunning(c) && workQueue.offer(command)) { //防止外部已经在线程池中加入新任务,重新获取一遍,坚持一遍 int recheck = ctl.get(); //如果线程池不是RUNNING状态,则将刚加入的任务从队列中移除,并执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); //如果之前的线程已经被消费完了,新建一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //核心线程和队列都满了,尝试创建一个新线程 else if (!addWorker(command, false)) //创建失败则唤醒拒绝策略 reject(command); }

ThreadPoolExecutor.addWorker()方法源码

/** * 根据当前线程池状态与给定的界限(核心线程数和最大线程数),检查是否可以添加新的任务线程。如果可以则创建并启动任务 * 创建成功返回true,失败返回false;返回false的两种情况 * 1.为线程池状态为STOP或可以关闭即不是RUNNING状态 * 2.线程工厂创建新的任务线程失败 * @param firstTask 新线程应首先运行的任务 * @param core 新增线程时的判断指标 * true:判断当前 RUNNING 状态的线程是否小于corePoolSize * false:判断当前 RUNNING 状态的线程是否小于maximumPoolSize */ private boolean addWorker(Runnable firstTask, boolean core) { //随机定义的一个字符串,使用break retry时让代码跳出循环且不再进入循环, //使用continue retry时,跳出当前循环进入下一次循环 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 1. 当线程池的状态大于SHUTDOWN时,返回false。因为线程池处于关闭状态了,就不能再接受任务了 // 2. 当线程池的状态是STOP及之上的状态,firstTask不为空或者任务队列为空,返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取工作线程数 int wc = workerCountOf(c); // 1. 线程数大于等于理论上的最大数(2^29-1),则直接返回false。(因为超过最大允许线程数会影响左边3位的线程池状态值) // 2. 根据core来决定线程数应该和谁比较。当线程数大于核心线程数或者最大线程数时,直接返回false。 // (因为当大于核心线程数时,表示此时任务应该直接添加到队列中(如果队列满了,可能入队失败);当大于最大线程数时,肯定不能再新创建线程了) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //将当前活动线程数+1 if (compareAndIncrementWorkerCount(c)) break retry; //因为线程池状态和工作线程数是随时可变的,所以要经常提取这个最新值,并将其与前面获取到的值比较, // 如果相同,说明线程池的状态没有发生变化,继续在内循环中进行循环 // 如果不相同,说明在这期间,线程池的状态发生了变化,需要跳到外层循环,然后再重新进行循环 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop // 否则CAS由于workerCount更改而失败;重试内部循环 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建一个新的worker线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //在进行ThreadPoolExecutor的敏感操作时,都需要持有锁,避免在添加和自启动线程时被干扰 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //获取锁时进行重新检查,如果ThreadFactory失败或在获得锁之前关闭,请退回 int rs = runStateOf(ctl.get()); //如果线程是RUNNING状态或者虽然是关闭状态,但是firstTask是null,就讲worker线程添加到线程池中 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //检查线程是否是可启动的 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); //整个线程池在运行期间的最大并发任务个数 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

工作线程的核心类实现,Worker源码

/** * 实现Runnable接口,并把本对象作为参数输入给run()方法中的runWorker(this), * 所以内部属性线程thread在start的时候,即会调用runWorker方法 */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { // 它是AbstractQueuedSynchronizer的方法 // 在runWorker方法执行之前禁止线程被中断 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // 当thread被start()之后,执行runWorker方法 public void run() { runWorker(this); } }

通过ThreadPoolExecutor的方法创建线程池,能更加明确线程池的运行规则,规避资源耗尽的风险

参考:《码出高效》,《面试官:来!聊聊线程池的实现原理以及使用时的问题》

最新回复(0)