三分钟弄清楚定时任务ScheduledExecutorService实现

tech2024-11-20  28

现在项目中基本不会自己定义一个ScheduledExecutorService来执行定时任务。都是用第三方框架比如xxl-job之类的,不过了解最基本的定时任务原理还是很有必要的。 ​

ScheduledExecutorService简单示例

我们先来看最简单的使用,代码如下图:

ScheduledThreadPoolExecutor是ScheduledExecutorService的实现,能做一些基本的定时任务功能。

比如上图的例子,可以延迟3秒后每隔5秒执行任务。实现起来比较简单。

基础源码分析

首先ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService,所以它拥有线程池的功能。

然后我们看它的构造方法,它有几个构造方法,我们以上面示例的为例,方法中只有一行代码如下:

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());

调用了父类的构造方法,也就是线程池的构造方法,它自身并没有实现任务事情,除了new了一个DelayedWorkQueue()给父类,这个是给线程池存放任务队列的。

提交任务方法源码

ScheduledThreadPoolExecutor提供了几个执行定时任务的方法,不过我们还是分析上面用到的scheduleAtFixedRate方法,查看源码如下图:

这个方法一共就三步:

首先初始化ScheduledFutureTask,它继承了FutureTask,扩展了一些定时任务需要的属性,比如下次执行时间、每次任务执行间隔。

然后调用decorateTask方法装饰任务,目前没有做任务事情,我们可以自定义实现一些功能。

最后调用delayedExecute方法,从上图可以看到这个方法主要流程也简单,首先是把任务放到线程池的队列中,然后调用ensurePrestart方法,ensurePrestart方法是线程池的方法,作用是根据线程池线程数调用addWorker方法创建线程,addWorker方法就不多说了,不清楚的同学可以看我前面几篇的《三分钟弄懂线程池执行过程》。

提交定时任务的方法还是很简单的,包装一个任务(用于保存定时任务需要的信息),然后提交到线程池的队列中。

但是有一个问题是,任务提交到线程池了,我们知道线程池能够执行任务,但是都是执行一次啊,不会像定时任务那样执行任务的!那它到底是怎么实现定时任务功能的呢?

实现定时任务关键第一步

从之前文章分析线程池中最终执行的是提交的任务那个对象的run方法,所以ScheduledFutureTask对run方法有特殊的实现,跟进源码发现run方法正常的流程就两步:

调用ScheduledFutureTask.super.runAndReset()也就是调用了父类的FutureTask的runAndReset()方法,这个方法没有什么在上上篇文章有讲,就是调用内部的Callable的run,保存结果;

在上一个执行完成也就是任务完成后,调用了setNextRunTime();reExecutePeriodic(outerTask);两步代码,第一步是计算下次任务执行时间,第二个是把任务放到队列中;

run方法执行了具体的方法,在任务执行成功后重新计算了任务下次执行的时间,并再次把任务加载到了队列中,重复执行。

但是到目前为止也只是实现了规定了在什么时候执行与重复执行,那么到底是如何实现在具体的时间执行呢?

延迟队列实现定时执行

通过还是没有发现如何实现定时执行的,没办法只能继续跟进代码,最后跟到线程池的runWorker方法,在获取Worker的firstTask方法来执行,想起刚刚在调用ensurePrestart方法中调用addWorker方法传递的firstTask都是null,也就是说Worker执行的任务都是从队列中拉取的任务,他们都没有自己的firstTask。

也就想起了在初始化方法给线程池传递的队列是new DelayedWorkQueue();所以应该基本找到问题了,我们来看队列的take方法:

从源码可以看到是根据队列中第一个元素,然后利用ScheduledFutureTask的getDelay方法来判断是否返回任务或者阻塞,getDelay是根据属性time(前面提到的下次任务执行时间)减去当前时间。

那么如果第一个任务进来判断是10秒后执行,第二个任务进来判断是5秒后执行,那么第二个任务是不是等第一个任务执行完才能执行呢?

很明显不能这么实现,这里就要看DelayedWorkQueue的add方法了,它并没有实现add方法,不过add方法调用的offer,DelayedWorkQueue实现了offer方法。

offer方法也简单,就是把任务加到queue数组中,如果数组为null则直接加进去,并唤醒take哪里的线程,如果不为null则比较数组中任务的time,越近的越在前面,如果新加进来的任务被设置到了数组的第一个,则唤醒take那里阻塞的线程。

总结

ScheduledExecutorService继承线程池,也是把任务提交给线程池执行,只不过它的任务类进行扩展。

任务类ScheduledFutureTask继承FutureTask并扩展了一些属性来记录任务下次执行时间和每次执行间隔。同时重写了run方法重新计算任务下次执行时间,并把任务放到线程池队列中。

ScheduledExecutorService自定义了阻塞队列DelayedWorkQueue给线程池使用,它可以根据ScheduledFutureTask的下次执行时间来阻塞take方法,并且新进来的ScheduledFutureTask会根据这个时间来进行排序,最小的最前面。

总结起来就是通过线程池来执行任务,ScheduledFutureTask记录任务定时信息,DelayedWorkQueue来排序任务定时执行。

Java程序员日常学习笔记,如理解有误欢迎各位交流讨论!

最新回复(0)