队列同步器 AQS原理解析—独占式获取同步状态

tech2022-07-15  187

在队列同步器 AbstractQueuedSynchronizer 的简单使用一篇我们简单介绍了AbstractQueuedSynchronizer的使用,本篇我们结合上一篇的示例介绍AQS的原理实现。在上一篇我们一直提到的一个词是等待队列,如果获取锁失败,则将当前线程放到等待队列,那么AbstractQueuedSynchronizer中的等待队列到底是什么呢?在之前我们先介绍同步队列。

同步队列是一个一个FIFO双向队列,用来来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。Node为AbstractQueuedSynchronizer内的一个内部类,定义如下:

static final class Node { static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** * 等待状态 * SIGNAL:值为-1,后继节点的线程处于等待状态,而当前节点的线程如果释放可同步状态或者 * 被取消,将会通知后继节点,使得后继节点的线程得以运行。 * CANCELLED: 值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中 * 取消等待,节点进入该状态将不会变化 * CONDITION: 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对 * Condition调用了signal()方法之后,该节点将会从等待队列转移到同步队列中,加入到同步队 * 列中的获取 * PROPAGATE: 值为-3表示下一次共享式同步状态获取将会无条件的传播下去 * */ volatile int waitStatus; /** * 前驱节点,当前节点加入同步队列时被设置,在尾部添加 */ volatile Node prev; /** * 后继节点 */ volatile Node next; /** * 获取同步状态的线程 */ volatile Thread thread; /** * 等待队列中的后继节点,如果当前节点是共享的那么该字段为SHARED常量,也就是说 * 节点类型(独占和共享)和等待队列中的后继节点共用一个字段 */ Node nextWaiter; ...... }

节点是构成同步队列(等待队列)的基础,同步器拥有首节点和尾节点,如果没有获取到同步状态,线程将会成为节点加入该队列的尾部,同步队列的基本结构如下图所示:

下面我们结合代码分析同步器是如何更改同步状态以及将线程加入到同步队列和等待队列的。因为同步器获取同步状态分为独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。因此我们也从这两方面去分析。本篇博客主要介绍独占锁同步状态的获取与释放。再次之前我们需要了解同步器的模板方法。不熟悉的可以参考《队列同步器 AbstractQueuedSynchronizer 的简单使用》一篇博客。在我们获取同步状态是会调用模板方法如下代码所示:

//独占式获取锁 public final void acquire(int arg) { //调用重写方法获取同步状态,如果没有获取到同步状态,将当前线程加入到等待队列 if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

上面的方法中调用重写方法tryAquire(arg)获取同步状态,如果获取失败,则通过 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)操作将线程加入到等待队列,下面我们主要分析加入到等待队列的过程。首先是AddWaiter方法,代码如下:

private Node addWaiter(Node mode) { //将当前线程封装到Node中 Node node = new Node(Thread.currentThread(), mode); //将节点添加到尾节点 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; //如果尾节点为空,设置头结点 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) //头结点赋值为尾节点 tail = head; } else { 否则将节点设置尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

上面代码通过使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)这部分代码逻辑在acquireQueued方法中代码如下所示:

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,如下图所示,为独占获取同步状态的流程图:

在上图中前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获取同步状态的自旋过程。当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,如果对于锁这种并发组件而言,代表着当前线程获取了锁。当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。代码如下所示:

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

在AbstractQueuedSynchronizer中主要是同步队列的添加,阅读源码时,了解Node类中的属性以及AbstractQueuedSynchronizer中的属性的含义,AbstractQueuedSynchronizer主要字段如下所示:

//头结点,指向第一个等待节点 private transient volatile Node head; //尾节点,指向最后一个等待节点 private transient volatile Node tail; //同步状态 private volatile int state;

 

最新回复(0)