JUC的同步结构

tech2026-06-04  1

信号量使用代码

public class Main { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量,只允许 3个线程同时访问 Semaphore semaphore = new Semaphore(4); for (int i=0;i<10;i++){ final long num = i; executorService.submit(new Runnable() { @Override public void run() { try { //获取许可 semaphore.acquire(); //执行 System.out.println("Accessing: " + num); Thread.sleep(1000); // 模拟随机执行时长 //释放 System.out.println("Release..." + num);semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } executorService.shutdown(); }

信号量控制每次最多能有几个线程同时访问资源,它类似一个计数器,acquire减少,release增加。

信号量部分代码分析: 每个线程到来,acquire对状态量增加,当状态量为0时,再过来的线程进入CLH队列。当有线程执行release方法时,对CLH队列中的线程尝试唤醒。

门栓使用代码

import java.util.Arrays; import java.util.Comparator; import java.util.Scanner; import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; public class Main { volatile static boolean flag = false; volatile static AtomicInteger p = new AtomicInteger(0); static CountDownLatch cd = new CountDownLatch(3); public static void main(String[] args) throws InterruptedException { Scanner sc = new Scanner(System.in); for(int i=0;i<3;i++) { new Thread(()->{ try { System.out.println("beg"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } cd.countDown(); System.out.println("Thread is: "+ Thread.currentThread().getName()); }).start(); } // Thread.sleep(1000); cd.await(); System.out.println("ok we all here"); } }

关键代码分析: countDownLatch()对门栓数量减少,用await进行多线程同步操作,当门栓数量减为0时触发事件,

public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } //如果state不为0,那么就返回false,不会执行 doReleaseShared(); 方法,当state为0,也就是最后一个线程到了触发唤醒事件,执行doReleaseShared() 唤醒。 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } //唤醒操作 private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } // await是将到来的线程放到CLH队列里 等待条件满足 state==0进行唤醒操作
最新回复(0)