ThreadPoolExecutor线程池源码分析与流程图

tech2023-07-12  116

ThreadPoolExecutor:线程池

使用线程池参数详解:线程池原理:执行流程图:源码分析submitexecute:具体执行addWorker

使用线程池

//创建线程池 ThreadPoolExecutor test = new ThreadPoolExecutor(1, 1, 5000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), ThreadPoolExecutor.defaultHandler); //添加任务 test.submit(() -> { System.out.println(1); });

参数详解:

corePoolSize:核心线程数,添加任务时,如果线程数没有达到这个数量,创建新的线程maximumPoolSize:总线程数,当核心线程与阻塞队列满了以后,线程的数量没有超过这个值,创建新线程keepAliveTime:非核心线程会等到这个时间后,销毁unit:时间单位workQueue:核心线程满了以后,放入的队列 ArrayBlockingQueue:基于数组的阻塞队列LinkedBlockingQuene:基于链表的阻塞队列SynchronousQuene:不存元素,直接阻塞的队列PriorityBlockingQuene:具有优先级阻塞队列 threadFactory:创建线程的工场handler:饱和策略 AbortPolicy:直接抛出异常(默认策略)CallerRunsPolicy:调用者所在线程执行任务DiscardOldestPolicy:丢弃阻塞队列中最前任务DiscardPolicy:直接丢弃…自定义

线程池原理:

添加线程时,优先创建核心线程,当核心线程数满了以后,放入阻塞队列中,当阻塞队列满了以后创建非核心线程,核心线程中的任务执行完以后,从阻塞队列中获取

执行流程图:

源码分析

submit

public Future<?> submit(Runnable task) { if (task == null) { throw new NullPointerException(); } // 包装类 RunnableFuture<Void> ftask = newTaskFor(task, null); // 执行 execute(ftask); return ftask; }

execute:具体执行

public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } int c = ctl.get(); // 判断工作线程数是否达到核心线程数 // 如果没有到达核心线程数,添加核心工作线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) { return; } c = ctl.get(); } // 如果线程池正在运行,将任务放入BlockingQueue队列中 if (isRunning(c) && workQueue.offer(command)) { // 如果放入队列成功 int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) { // 如果线程池结束了,执行饱和策略 reject(command); } else if (workerCountOf(recheck) == 0) { addWorker(null, false); } } else if (!addWorker(command, false)) { // 队列满了,并且添加非工作线程失败,进入饱和策略 reject(command); } }

addWorker

private boolean addWorker(Runnable firstTask, boolean core) { retry: // 自旋 for (;;) { int c = ctl.get(); // 判断线程池状态 int rs = runStateOf(c); // 如果线程池停止,直接返回false if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) { return false; } // 自旋+cas添加线程,直到添加成功或者不符合添加的条件 for (;;) { // 获取工作线程个数 int wc = workerCountOf(c); // 判断当前工作的线程是否大于等于线程池最大工作数 // 如果添加的是核心线程,判断工作的线程数是否超过核心线程数量 // 如果是非核心线程,判断工作的线程数是否超过总工作线程数 // 满足上面条件,直接返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) { return false; } // 修改核心线程数量,跳出全部循环 if (compareAndIncrementWorkerCount(c)) { break retry; } c = ctl.get(); // 如果状态有变化,跳出此层循环 if (runStateOf(c) != rs) { continue retry; } } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 包装成worker(继承AQS),并用线程工场类创建线程出来 // 设置此线程第一个工作任务 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 获取线程池状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) { throw new IllegalThreadStateException(); } // 添加到任务队列中 workers.add(w); int s = workers.size(); if (s > largestPoolSize) { largestPoolSize = s; } workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 如果工作线程添加成功,启动线程 t.start(); workerStarted = true; } } } finally { if (!workerStarted) { addWorkerFailed(w); } } return workerStarted; }
最新回复(0)