线程池(Thread Pool),是一种基于池化思想的管理线程的工具,可以实现线程的复用,避免线程使用中频繁创建和销毁所带来的资源消耗。
在Java中,线程池主要通过ThreadPoolExecutor类实现。下面我们来认识一下这个类。ThreadPoolExecutor的继承关系如下图所示。
Executor:顶层接口,提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。ExecutorService:这个接口增加了一些扩充执行任务和管控线程池的方法,比如停止线程池的运行。AbstractExecutorService:上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。ThreadPoolExecutor:实现类,实现最复杂的运行部分,一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好地结合,从而执行并行任务。ThreadPoolExecutor的运行流程如下图所示。
ThreadPoolExecutor的运行流程遵循这样的规则:
当线程池中的核心线程数量未达到最大线程数时,启动一个核心线程去执行任务;如果线程池中的核心线程数量达到最大线程数时,那么任务会被插入到任务队列中排队等待执行;如果在上一步骤中任务队列已满,但是线程池中线程数量未达到限定线程总数,那么启动一个非核心线程来处理任务;如果上一步骤中线程数量达到了限定线程总量,那么线程池则拒绝执行该任务,且ThreadPoolExecutor会调用RejectedtionHandler的rejectedExecution方法来通知调用者。简单地说,线程池运行调度的优先顺序为:核心线程—>阻塞队列—>非核心线程—>拒绝策略。下面这个流程图就直观多了。
我们来看一下ThreadPoolExecutor的构造函数。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }下面是各个参数的解释:
corePoolSize,线程池中核心线程的数量。默认情况下,即使核心线程没有任务在执行,它也依然存在不会被销毁,我们固定一定数量的核心线程,这样就避免了一般情况下CPU创建和销毁线程带来的开销。maximumPoolSize,线程池中的最大线程数。最大线程数=核心线程+非核心线程。当任务数量超过最大线程数时其它任务可能就会被阻塞。非核心线程,只有当核心线程不够用,且线程池有空余时才会被创建,执行完任务后非核心线程会在一定时间后被销毁。keepAliveTime,非核心线程空闲并等待工作的超时时长。若超过该时长没有被分配新任务,则该线程被回收。当allowCoreThreadTimeOut设置为true时,此属性也会作用在核心线程上,即核心线程超时也会被回收。workQueue,线程池中的任务队列,提交给线程池的Runnable会被存储在这个对象上。讲了这么多,我们来搞个简单的例子看一下。
public class ThreadPoolExecutorTest { private void createThreadPool() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2)); // 向线程池中添加 10 个任务 for (int i = 1; i <= 10; i++) { ThreadTask threadTask = new ThreadTask(i); System.out.println("Add Task:" + i + ", at time: " + System.currentTimeMillis()); threadPoolExecutor.execute(threadTask); } threadPoolExecutor.shutdown(); } private synchronized int getQueueSize(Queue queue) { return queue.size(); } public static void main(String[] args) { ThreadPoolExecutorTest test = new ThreadPoolExecutorTest(); test.createThreadPool(); } static class ThreadTask implements Runnable { // 当前任务序号 private int index; ThreadTask(int index) { this.index = index; } public void run() { System.out.println(Thread.currentThread().getName() + " Task:" + index + ",at time : " + System.currentTimeMillis()); try { Thread.sleep(3000); System.out.println("Task: " + index + " finished"); } catch (InterruptedException e) { e.printStackTrace(); } } } }代码很简单。在ThreadPoolTest类中,首先构造一个ThreadPoolExecutor对象实现线程池,并向其中连续添加10个相同的任务。任务的具体内容在ThreadTask类中实现,这里可以看到只是通过让Thread休眠3秒,来模拟耗时任务,非常简单。下面来看看ThreadPoolExecutor的参数调整所产生的不同结果。
注意构造函数中的参数设定。其中,核心线程数为2,总线程数为6,队列容量为4,可以发现刚好可以容纳处理10个任务。运行结果如下。
由结果可见,2个核心线程(pool1-1-thread-1,pool-1-thread-2)首先开始执行任务Task1和Task2。接着,Task3,4,5,6进入等待队列,剩下的Task7,8,9,10则创建非核心线程(pool-1-thread-3,4,5,6)完成。这是一个任务数量和线程池参数刚刚好的情况,舒服。
下面看第二种情况。等待队列的容量从原来的4变为了10。
现在来看一下运行结果。
由结果可见,由于改变了队列长度,使得队列足够长,可以容纳所有未执行的任务。在任务执行全程,没有非核心线程被创建,仅由两个核心线程逐个完成所有任务。
这里再次改变队列大小,从原来的10变为2。线程池可同时处理的任务数量变为8。
运行结果如下。
由结果可见,当添加任务至Task9,10时,程序报出异常:“Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadPoolExecutorTest$ThreadTask@135fbaa4 rejected from java.util.concurrent.ThreadPoolExecutor@45ee12a7”。即任务被线程池无情拒绝了。这里就要讲到线程池的拒绝策略了。
被线程池拒绝的任务将交由RejectedExecutionHandler类来处理。RejectedExecutionHandler提供了四种任务拒绝策略:
•AbortPolicy(默认) -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。上例中出现的异常即 为这种情况。
•CallerRunsPolicy -- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的调用线程中处理被拒绝的任务。(即不用线程池中的线程执行,而是交给调用方的线程来执行)
•DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
•DiscardPolicy -- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
下面分别展示一下CallerRunsPolicy和DiscardOldestPolicy两种策略的例子。
可以看到,添加CallerRunsPolicy后,没有任务被抛弃,而是将Task9任务分配到main线程中执行了。Task10则在池中线程空闲后被pool-1-thread-3执行。
可以看到,Task3,4没有完成,被线程池舍弃掉了(被Task9,10挤掉了,因为Task3,4是最先入队的,就是所谓的Oldest,被discard掉了,太惨了)。
我们可以通过实现RejectedExecutionHandler接口来自定义拒绝策略。将对被拒绝任务要执行的操作复写在方法rejectedExecution中。这里我只是简单地打出任务被拒绝的信息“is discarded”。在实际运用中还没遇到具体的使用场景,以后碰上了再添上吧。