CountDownLatch的使用和原理解析

tech2025-05-07  3

背景

1、countDownLatch是在java1.5被引入,跟它一起被引入的工具类还有CyclicBarrier、Semaphore、concurrentHashMap和BlockingQueue。

2、存在于java.util.cucurrent包下。

CountDownLatch概念

1、countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。

2、是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

3、CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。

CountDownLatch常用方法说明

CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。 await();//阻塞当前线程,将当前线程加入阻塞队列。 await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行, countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。

代码示例

模拟场景:

1、分别统计4个指标用户新增数量、订单数量、商品的总销量、总销售额;

2、假设每个指标执行时间为3秒。如果是串行化的统计方式那么总执行时间会为12秒。

3、我们这里使用多线程并行,开启4个子线程分别进行统计

4、主线程等待4个子线程都执行完毕之后,返回结果给前端。

public class CountDownLatchDemo { // 用于保存结果 private static Map map = new HashMap<>(); // 创建计数器 private static CountDownLatch countDownLatch = new CountDownLatch(4); public static void main(String[] args) { long startTime = System.currentTimeMillis(); new Thread(()->{ try { System.out.println("正在统计新增用户数量"); Thread.sleep(3000);//任务执行需要3秒 map.put("userNumber",1);//保存结果值 countDownLatch.countDown();//标记已经完成一个任务 System.out.println("统计新增用户数量完毕"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { System.out.println("正在统计订单数量"); Thread.sleep(3000);//任务执行需要3秒 map.put("countOrder",2);//保存结果值 countDownLatch.countDown();//标记已经完成一个任务 System.out.println("统计订单数量完毕"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { System.out.println("正在总销售额"); Thread.sleep(3000);//任务执行需要3秒 map.put("countmoney",3);//保存结果值 countDownLatch.countDown();//标记已经完成一个任务 System.out.println("统计销售额完毕"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { System.out.println("正在商品销量"); Thread.sleep(3000);//任务执行需要3秒 map.put("countGoods",4);//保存结果值 countDownLatch.countDown();//标记已经完成一个任务 System.out.println("统计商品销量完毕"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); try { //主线程等待所有统计指标执行完毕 countDownLatch.await(); long endTime=System.currentTimeMillis(); //记录结束时间 System.out.println("------统计指标全部完成--------"); System.out.println("统计结果为:"+map.toString()); System.out.println("任务总执行时间为"+(endTime-startTime)/1000+"秒"); } catch (InterruptedException e) { e.printStackTrace(); } } }

执行结果

正在统计新增用户数量 正在总销售额 正在统计订单数量 正在商品销量 统计销售额完毕 统计商品销量完毕 统计订单数量完毕 ------统计指标全部完成-------- 统计结果为:{countmoney=3, countOrder=2, userNumber=1, countGoods=4} 任务总执行时间为3秒 统计新增用户数量完毕

CountDownLatch实现原理

1、创建计数器

当我们调用CountDownLatch countDownLatch=new CountDownLatch(4) 时候,此时会创建一个AQS的同步队列,并把创建CountDownLatch 传进来的计数器赋值给AQS队列的 state,所以state的值也代表CountDownLatch所剩余的计数次数;

public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);//创建同步队列,并设置初始计数器值 }

2、阻塞线程

当我们调用countDownLatch.wait()的时候,会创建一个节点,加入到AQS阻塞队列,并同时把当前线程挂起

public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);

判断计数器是技术完毕,未完毕则把当前线程加入阻塞队列

public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //锁重入次数大于0 则新建节点加入阻塞队列,挂起当前线程 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }

构建阻塞队列的双向链表,挂起当前线程

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //新建节点加入阻塞队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //获得当前节点pre节点 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//返回锁的state if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //重组双向链表,清空无效节点,挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

3、计数器递减

当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放锁的方式,对state进行减1操作,当state=0的时候证明计数器已经递减完毕,此时会将AQS阻塞队列里的节点线程全部唤醒

public void countDown() { //递减锁重入次数,当state=0时唤醒所有阻塞线程 sync.releaseShared(1); } public final boolean releaseShared(int arg) { //递减锁的重入次数 if (tryReleaseShared(arg)) { doReleaseShared();//唤醒队列所有阻塞的节点 return true; } return false; } private void doReleaseShared() { //唤醒所有阻塞队列里面的线程 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {//节点是否在等待唤醒状态 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始 continue; unparkSuccessor(h);//成功则唤醒线程 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
最新回复(0)