1、原理介绍
简单说下redis实现延迟队列的原理:把所有需要延时执行的任务添加到有序集合里面;并将任务的执行时间设置为分值,另外再使用另一个线程来查找有序集合里面是否存在可以被立即执行的任务,如果有的话就从有序集合里面移除那个任务,并将其添加到另一个执行队列里面。
而 Redisson 封装了接口给我们使用,我们只需要调用接口就可以直接使用,不需要关心redis具体实现原理。
2、使用
2.1 引入pom依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.3</version>
</dependency>
2.2 创建任务实体:
import java.io.Serializable;
public class Work implements Serializable{
private static final long serialVersionUID = -9157119130546758211L;
// 订单ID
private String id;
// 延迟时间:秒
private int times;
public Work(String id, int times) {
this.id = id;
this.times = times;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public int getTimes() {
return times;
}
public void setTimes(int times) {
this.times = times;
}
}
2.3 创建获取任务线程,使用while不间断获取延迟队列的任务
public class TestDelayMessage {
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
Redisson redisson = (Redisson) Redisson.create(config);
String key = "delay_queue";
RBlockingQueue<Work> blockingFairQueue = redisson.getBlockingQueue(key);
RDelayedQueue<Work> delayedQueue = redisson.getDelayedQueue(blockingFairQueue);
while (true) {
System.out.println("等待获取任务");
Work work = blockingFairQueue.take();
// 应该把work扔进其他执行队列,这里直接模拟执行
System.out.println("执行任务,任务ID:" + work.getId() + " 执行时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("YYYY-MM-dd hh:mm:ss")));
}
// redisson.shutdown();
}
}
2.4 创建生产任务实体,将延迟任务放入到队列
import java.util.concurrent.TimeUnit;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.config.Config;
public class DelayMessageProducer {
static String key = "delay_queue";
public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
Redisson redisson = (Redisson) Redisson.create(config);
Work work1 = new Work("1", 3);
addQueue(redisson, work1, work1.getTimes(), TimeUnit.SECONDS);
Work work2 = new Work("2", 5);
addQueue(redisson, work2, work2.getTimes(), TimeUnit.SECONDS);
Work work3 = new Work("3", 8);
addQueue(redisson, work3, work3.getTimes(), TimeUnit.SECONDS);
Work work4 = new Work("4", 10);
addQueue(redisson, work4, work4.getTimes(), TimeUnit.SECONDS);
System.out.println("任务放置完毕");
redisson.shutdown();
}
public static void addQueue(Redisson redisson, Work work, long delay, TimeUnit timeUnit) {
RBlockingQueue<Work> blockingFairQueue = redisson.getBlockingQueue(key);
RDelayedQueue<Work> delayQueue = redisson.getDelayedQueue(blockingFairQueue);
delayQueue.offer(work, delay, timeUnit);
delayQueue.destroy();
}
}
3.测试
首先启动获取任务队列,等待任务下达:
再启动下发任务线程:
最后切回接受任务console查看任务执行状态: