信号量控制每次最多能有几个线程同时访问资源,它类似一个计数器,acquire减少,release增加。
信号量部分代码分析: 每个线程到来,acquire对状态量增加,当状态量为0时,再过来的线程进入CLH队列。当有线程执行release方法时,对CLH队列中的线程尝试唤醒。
关键代码分析: countDownLatch()对门栓数量减少,用await进行多线程同步操作,当门栓数量减为0时触发事件,
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } //如果state不为0,那么就返回false,不会执行 doReleaseShared(); 方法,当state为0,也就是最后一个线程到了触发唤醒事件,执行doReleaseShared() 唤醒。 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } //唤醒操作 private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } // await是将到来的线程放到CLH队列里 等待条件满足 state==0进行唤醒操作