背景
在支付付款(账户打款)场景中,我们调用支付接口给用户付款后,一般不会有异步回调告诉我们这笔订单的状态,需要我们开起一个线程 每隔1秒3秒5秒30秒5分钟30分钟1小时这样的时间间隔去查询订单的状态,如果经过4次查询也就是确定了支付最终状态(打款成功,打款失败)就去修改订单的状态,如果是打款中就需要隔一段时间再去查询。
如果查一笔订单我们就启动一个线程去查询订单状态,第一次查询付款中的话就让线程sleep1秒,第二次就让线程sleep3秒,第三次...... 直到确定了订单的状态或者走完时间间隔数组都没有确定订单状态再结束线程。
下面是一段伪测试代码:
public static void main(String[] args) { int[] frequency = { 1, 3, 5, 30, 60, 300, 600, 3600}; String orderNo = "123456"; Thread thread = new Thread(() -> { long startTime = System.currentTimeMillis(); for (int i = 0; i < frequency.length; i++) { try { Thread.sleep(frequency[i] * 1000); } catch (InterruptedException e) { e.printStackTrace(); } boolean result = query(i, orderNo); System.out.println("第" + (i + 1) + "次查询订单,查询是否确定订单状态:" + result); if (result) { System.out.println("确定了订单状态"); break; } } long endTime = System.currentTimeMillis(); System.out.println("执行总耗时:" + (endTime - startTime) / 1000); }); thread.start(); } private static boolean query (int index, String orderNo) { if (index >= 2) { //修改订单状态,付款成功或者失败 // changeOrderStatus(orderNo); return true; } return false; }打印信息: 第1次查询订单,查询是否确定订单状态:false 第2次查询订单,查询是否确定订单状态:false 第3次查询订单,查询是否确定订单状态:true 确定了订单状态 执行总耗时:9
上面的代码有几点问题
如果有很多付款的请求,系统需要开启很多的线程,一个线程就得1-2M的内存 容易造成系统崩溃如果订单一直没有成功或者失败(付款中 打款中)我们的线程将一直占用系统资源抽象出一个接口QueryTask(查询任务)实现Runnable接口,我们来一个查单请求就把它放入一个容器中,在用一个线程定时从容器中取那些到期的查询任务,然后交给jdk的线程池去执行这些查询任务。
任务执行器维护一个并发map(容器)里面存着定时任务,一个单线程每隔几秒从容器中取出到期的任务,一个线程池用来执行到期的任务队列。
@Component public class TaskExecutor implements Runnable { private final Logger logger = LoggerFactory.getLogger(getClass()); private String name; @Value("${ht.task.frequency}") private String frequency; private int[] queryFrequency; private AtomicInteger waitCount = new AtomicInteger(0); //任务队列为空次数达到多少次关闭线程 private int shutDownCount = 3; //当前线程是否在执行标志 private boolean runningFlag = false; public TaskExecutor() { this.name = "TaskExecutor-"; } @PostConstruct public void init() { String[] split = frequency.split(","); queryFrequency = new int[split.length]; for(int i = 0; i < split.length; i++) { queryFrequency[i] = Integer.parseInt(split[i]); } logger.info("任务执行器的执行频率初始化为:{}", StringUtils.join( split, ",")); } //一个任务执行器维护一个并发map ConcurrentHashMap<String,QueryTask> taskMap = new ConcurrentHashMap<>(); //一个执行器一个单线程检查map任务 private ScheduledExecutorService singleExecutorService = Executors.newSingleThreadScheduledExecutor(); //执行到期需要去查询的任务的线程池 private ExecutorService taskExecutorService = Executors.newCachedThreadPool(); /** * 队列中添加任务 * @param task */ public void addTask(QueryTask task) { if(!taskMap.containsKey(task.getName())) { taskMap.put(task.getName(), task); } else { logger.info("任务名称:{} 已经在队列中了", task.getName()); } } /** * 队列中刷新任务,重新开始 * @param task */ public void refreshTask(QueryTask task) { if(taskMap.containsKey(task.getName())) { logger.info("任务名称:{},刷新任务index为0", task.getName()); taskMap.get(task.getName()).refreshIndex(); } } public void startTask() { //如果任务队列为空,返回 if (taskMap.keySet().size() < 1) { logger.info("线程执行器名称:{},taskQueue size:{},启动失败", name, taskMap.size()); return; } //如果监听队列的单线程关闭了,重新启动 if (singleExecutorService.isShutdown()) { logger.info("线程执行器名称:{},关闭状态", name); singleExecutorService = Executors.newSingleThreadScheduledExecutor(); taskExecutorService = Executors.newCachedThreadPool(); singleExecutorService.scheduleAtFixedRate(this, 1, 3, TimeUnit.SECONDS); runningFlag = true; logger.info("线程执行器名称:{},启动", name); } if(runningFlag == false) { //判断当前线程是否已经关闭 singleExecutorService.scheduleAtFixedRate(this, 1, 3, TimeUnit.SECONDS); runningFlag = true; logger.info("线程执行器名称:{},启动", name); } else { logger.info("线程已启动"); } } /** * 获取可执行的任务 * @return */ private List<QueryTask> getExecutableQueryTaskFromQueue() { ArrayList<QueryTask> result = new ArrayList<>(); //要删掉的任务数组 long now = new Date().getTime(); for(String key : taskMap.keySet()) { QueryTask task = taskMap.get(key); long time = 0l;//相隔时间 for(int i = 0; i < task.getIndex(); i++) { time += queryFrequency[i]; } if( (now - task.getStartTime()) / 1000 > time) { result.add(task); task.indexIncrement(); } } return result; } /** * 删除完毕的任务 * 任务要么到达执行次数 * 任务要么已经查询到结果 */ private void deleteDoneTask() { ArrayList<String> deletes = new ArrayList<>(); for(String taskName : taskMap.keySet()) { QueryTask task = taskMap.get(taskName); if(task.getIndex() >= queryFrequency.length || task.getQueryFinishFlag() == true) { logger.info("任务名称:{},已完成", taskName); deletes.add(taskName); } } for(String taskName : deletes) { taskMap.remove(taskName); } } @Override public void run() { deleteDoneTask(); List<QueryTask> list = getExecutableQueryTaskFromQueue(); Optional<String> reduce = taskMap.keySet().stream().map(x -> taskMap.get(x).getName()).reduce((x, y) -> x + "," + y); logger.info("队列中的任务数量:{},可执行的任务数量: {}",taskMap.size() + "-" + (reduce.isPresent() ? reduce.get() : "空"), list.size()); if(taskMap.size() == 0) { int tmpWaitCount = waitCount.incrementAndGet(); logger.info("等待了 {} 次, 达到{}次关闭,{}", waitCount, shutDownCount, tmpWaitCount == shutDownCount ? "关闭taskExecutor" : ""); if(tmpWaitCount == 3) { singleExecutorService.shutdown(); taskExecutorService.shutdown(); } } else if(list.size() >= 1) { //重置waitCount waitCount = new AtomicInteger(0); //一个执行具体任务的线程执行器 for(QueryTask task : list) { taskExecutorService.submit(task); } } } }有了上面这些辅助类之后,我们在新增一个查单任务的时候,只需要编写查单方法 提供唯一任务标识。即重写getName() 和 query() 方法。
public QueryTaskImpl() { } public void setOrderNo(String orderNo) { this.orderNo = orderNo; } @Override public String getName() { return orderNo; } /** * * @return true表示查询完毕,不需要再查询了 */ @Override public boolean query() { logger.info("正在第 {} 次执行任务-{}", getIndex(), orderNo); if(getIndex() >= 3) { logger.info("查询结果确定了"); return true; } return false; } }打印的日志信息:
2020-09-04 11:00:24.036 INFO 13688 --- [nio-8001-exec-1] c.h.b.s.queryThread.TaskExecutor : 线程执行器名称:TaskExecutor-,启动 2020-09-04 11:00:25.040 INFO 13688 --- [pool-3-thread-1] c.h.b.s.queryThread.TaskExecutor : 队列中的任务数量:1-123,可执行的任务数量: 1 2020-09-04 11:00:25.042 INFO 13688 --- [pool-4-thread-1] c.h.b.s.queryThread.test.QueryTaskImpl : 正在第 1 次执行任务-123 2020-09-04 11:00:28.038 INFO 13688 --- [pool-3-thread-1] c.h.b.s.queryThread.TaskExecutor : 队列中的任务数量:1-123,可执行的任务数量: 1 2020-09-04 11:00:28.038 INFO 13688 --- [pool-4-thread-1] c.h.b.s.queryThread.test.QueryTaskImpl : 正在第 2 次执行任务-123 2020-09-04 11:00:31.040 INFO 13688 --- [pool-3-thread-1] c.h.b.s.queryThread.TaskExecutor : 队列中的任务数量:1-123,可执行的任务数量: 1 2020-09-04 11:00:31.040 INFO 13688 --- [pool-4-thread-1] c.h.b.s.queryThread.test.QueryTaskImpl : 正在第 3 次执行任务-123 2020-09-04 11:00:31.040 INFO 13688 --- [pool-4-thread-1] c.h.b.s.queryThread.test.QueryTaskImpl : 查询结果确定了 2020-09-04 11:00:34.041 INFO 13688 --- [pool-3-thread-1] c.h.b.s.queryThread.TaskExecutor : 任务名称:123,已完成 2020-09-04 11:00:34.041 INFO 13688 --- [pool-3-thread-1] c.h.b.s.queryThread.TaskExecutor : 队列中的任务数量:0-空,可执行的任务数量: 0 2020-09-04 11:00:34.041 INFO 13688 --- [pool-3-thread-1] c.h.b.s.queryThread.TaskExecutor : 等待了 1 次, 达到3次关闭, 2020-09-04 11:00:37.038 INFO 13688 --- [pool-3-thread-1] c.h.b.s.queryThread.TaskExecutor : 队列中的任务数量:0-空,可执行的任务数量: 0 2020-09-04 11:00:37.038 INFO 13688 --- [pool-3-thread-1] c.h.b.s.queryThread.TaskExecutor : 等待了 2 次, 达到3次关闭, 2020-09-04 11:00:40.038 INFO 13688 --- [pool-3-thread-1] c.h.b.s.queryThread.TaskExecutor : 队列中的任务数量:0-空,可执行的任务数量: 0 2020-09-04 11:00:40.038 INFO 13688 --- [pool-3-thread-1] c.h.b.s.queryThread.TaskExecutor : 等待了 3 次, 达到3次关闭,关闭taskExecutor