新手也能搞懂的线程池简介

tech2023-11-07  121

​线程池(Thread Pool),是一种基于池化思想的管理线程的工具,可以实现线程的复用,避免线程使用中频繁创建和销毁所带来的资源消耗。

一、使用线程池的优点

重用线程池中的线程,避免频繁地创建和销毁线程带来的性能消耗有效控制线程的最大并发数量,防止线程过大导致抢占资源造成系统阻塞提高线程的可管理性,可以对线程进行一定的管理

二、ThreadPoolExecutor类

在Java中,线程池主要通过ThreadPoolExecutor类实现。下面我们来认识一下这个类。ThreadPoolExecutor的继承关系如下图所示。

Executor顶层接口,提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。ExecutorService这个接口增加了一些扩充执行任务和管控线程池的方法,比如停止线程池的运行。AbstractExecutorService上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。ThreadPoolExecutor实现类,实现最复杂的运行部分,一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好地结合,从而执行并行任务。

ThreadPoolExecutor的运行流程如下图所示。

ThreadPoolExecutor的运行流程遵循这样的规则: 

当线程池中的核心线程数量未达到最大线程数时,启动一个核心线程去执行任务;如果线程池中的核心线程数量达到最大线程数时,那么任务会被插入到任务队列中排队等待执行;如果在上一步骤中任务队列已满,但是线程池中线程数量未达到限定线程总数,那么启动一个非核心线程来处理任务;如果上一步骤中线程数量达到了限定线程总量,那么线程池则拒绝执行该任务,且ThreadPoolExecutor会调用RejectedtionHandler的rejectedExecution方法来通知调用者。

简单地说,线程池运行调度的优先顺序为:核心线程—>阻塞队列—>非核心线程—>拒绝策略。下面这个流程图就直观多了。


三、ThreadPoolExecutor构造函数

我们来看一下ThreadPoolExecutor的构造函数。

public ThreadPoolExecutor(int corePoolSize,                           int maximumPoolSize,                           long keepAliveTime,                           TimeUnit unit,                           BlockingQueue<Runnable> workQueue) {     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,          Executors.defaultThreadFactory(), defaultHandler); }

下面是各个参数的解释:

corePoolSize,线程池中核心线程的数量。默认情况下,即使核心线程没有任务在执行,它也依然存在不会被销毁,我们固定一定数量的核心线程,这样就避免了一般情况下CPU创建和销毁线程带来的开销。maximumPoolSize,线程池中的最大线程数。最大线程数=核心线程+非核心线程。当任务数量超过最大线程数时其它任务可能就会被阻塞。非核心线程,只有当核心线程不够用,且线程池有空余时才会被创建,执行完任务后非核心线程会在一定时间后被销毁。keepAliveTime,非核心线程空闲并等待工作的超时时长。若超过该时长没有被分配新任务,则该线程被回收。当allowCoreThreadTimeOut设置为true时,此属性也会作用在核心线程上,即核心线程超时也会被回收。workQueue,线程池中的任务队列,提交给线程池的Runnable会被存储在这个对象上。

四、具体示例

讲了这么多,我们来搞个简单的例子看一下。

public class ThreadPoolExecutorTest {     private void createThreadPool() {         ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(                 2,                 4,                 10,                 TimeUnit.SECONDS,                 new ArrayBlockingQueue<Runnable>(2));         // 向线程池中添加 10 个任务         for (int i = 1; i <= 10; i++) {             ThreadTask threadTask = new ThreadTask(i);             System.out.println("Add Task:" + i + ", at time: "                     + System.currentTimeMillis());             threadPoolExecutor.execute(threadTask);         }         threadPoolExecutor.shutdown();     }     private synchronized int getQueueSize(Queue queue) {         return queue.size();     }     public static void main(String[] args) {         ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();         test.createThreadPool();     }     static class ThreadTask implements Runnable {         // 当前任务序号         private int index;         ThreadTask(int index) {             this.index = index;         }         public void run() {             System.out.println(Thread.currentThread().getName() + " Task:"                     + index + ",at time : " + System.currentTimeMillis());             try {                 Thread.sleep(3000);                 System.out.println("Task: " + index + " finished");             } catch (InterruptedException e) {                 e.printStackTrace();             }         }     } }

代码很简单。在ThreadPoolTest类中,首先构造一个ThreadPoolExecutor对象实现线程池,并向其中连续添加10个相同的任务。任务的具体内容在ThreadTask类中实现,这里可以看到只是通过让Thread休眠3秒,来模拟耗时任务,非常简单。下面来看看ThreadPoolExecutor的参数调整所产生的不同结果。

例1

注意构造函数中的参数设定。其中,核心线程数为2,总线程数为6,队列容量为4,可以发现刚好可以容纳处理10个任务。运行结果如下。

由结果可见,2个核心线程(pool1-1-thread-1,pool-1-thread-2)首先开始执行任务Task1和Task2。接着,Task3,4,5,6进入等待队列,剩下的Task7,8,9,10则创建非核心线程(pool-1-thread-3,4,5,6)完成。这是一个任务数量和线程池参数刚刚好的情况,舒服。


例2

下面看第二种情况。等待队列的容量从原来的4变为了10

现在来看一下运行结果。

由结果可见,由于改变了队列长度,使得队列足够长,可以容纳所有未执行的任务。在任务执行全程,没有非核心线程被创建,仅由两个核心线程逐个完成所有任务。


例3

这里再次改变队列大小,从原来的10变为2。线程池可同时处理的任务数量变为8。

运行结果如下。

由结果可见,当添加任务至Task9,10时,程序报出异常:“Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadPoolExecutorTest$ThreadTask@135fbaa4 rejected from java.util.concurrent.ThreadPoolExecutor@45ee12a7”。即任务被线程池无情拒绝了。这里就要讲到线程池的拒绝策略了。


五、拒绝策略

被线程池拒绝的任务将交由RejectedExecutionHandler类来处理。RejectedExecutionHandler提供了四种任务拒绝策略:

AbortPolicy(默认)  -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。上例中出现的异常即  为这种情况。

CallerRunsPolicy    -- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的调用线程中处理被拒绝的任务。(即不用线程池中的线程执行,而是交给调用方的线程来执行)

DiscardOldestPolicy   -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。

DiscardPolicy       -- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。

下面分别展示一下CallerRunsPolicy和DiscardOldestPolicy两种策略的例子。


1. CallerRunsPolicy

可以看到,添加CallerRunsPolicy后,没有任务被抛弃,而是将Task9任务分配到main线程中执行了。Task10则在池中线程空闲后被pool-1-thread-3执行。


2. DiscardOldestPolicy

可以看到,Task3,4没有完成,被线程池舍弃掉了(被Task9,10挤掉了,因为Task3,4是最先入队的,就是所谓的Oldest,被discard掉了,太惨了)。


自定义拒绝策略

我们可以通过实现RejectedExecutionHandler接口来自定义拒绝策略。将对被拒绝任务要执行的操作复写在方法rejectedExecution中。这里我只是简单地打出任务被拒绝的信息“is discarded”。在实际运用中还没遇到具体的使用场景,以后碰上了再添上吧。

​​

最新回复(0)