Java为我们提供了一系列的可用于并发编程的同步组件,利用这些可以简化我们的开发,而要了解这些组件,AQS是一个无法绕开的话题。
AQS全称AbstractQueuedSynchronizer,翻译过来就是抽象队列化的同步器,我们必须要了解它,吃透它。一方面,它为我们常见的各种并发工具如(CountDownLatch,ReentrantLock等)提供了技术支持,另一方面,掌握了这个,基本上就掌握了同步组件的各类实现原理。虽然这个类有2000多行,但除去注释应该不超过1000行,还在可接受范围内。
本文尝试通过分析源码的方式对AQS进行学习,并最后给出一个应用分析。
二、介绍
这一部分我们参考这个类的定义,来介绍AQS的作用和应用场景。先来看介绍(下面为对原类介绍的一些翻译):
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState(), setState(int) and compareAndSetState(int, int) is tracked with respect to synchronization.
本类是一个基于先进先出(FIFO)等待队列来实现阻塞锁和关联同步器(如信号量,事件等)的框架,通过一个int型原子变量表示状态,来为大多数同步器提供基础支持。子类必须实现那些用于改变该变量值,以及定义该变量的作用,以用于那些表示对象的获取和释放。基于此,本类的其它方法会实现所有的查询和阻塞机制。子类也可以维护其它状态,但只有通过getState,setState和compareAndSetState这几个原子性维护状态变量的方法才会被认为是同步操作。
Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. Class AbstractQueuedSynchronizer does not implement any synchronization interface. Instead it defines methods such as acquireInterruptibly(int) that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods.
子类应该以非public的内部辅助类的形式来定义,用作实现包含该内部类所需要的一些同步机制。AbstractQueuedSynchronizer 不实现任何同步接口,而是定义了一些辅助方法,比如acquireInterruptibly(int) ,可以在具体同步器在实现其锁定和释放的公共方法时被调用。
This class supports either or both a default exclusive mode and a shared mode. When acquired in exclusive mode, attempted acquires by other threads cannot succeed. Shared mode acquires by multiple threads may (but need not) succeed. This class does not "understand" these differences except in the mechanical sense that when a shared mode acquire succeeds, the next waiting thread (if one exists) must also determine whether it can acquire as well. Threads waiting in the different modes share the same FIFO queue. Usually, implementation subclasses support only one of these modes, but both can come into play for example in a ReadWriteLock. Subclasses that support only exclusive or only shared modes need not define the methods supporting the unused mode.
本类提供独占模式或共享模式,两者也可同时提供。当一个线程在独占模式下获取到锁之后,其它线程再尝试获取将会失败。而多个线程在共享模式下获取锁则可能会成功(但不是必须的)。本类并不“懂得”当一个线程在共享模式下是否获得锁有什么差区,除非指定场景,下一个等待的线程(如果存在)必须确定其是否可以获得锁。在不同模式等待的线程共享同样的FIFO队列。通常来说,子类的实现只支持其中一种模式,但两者是可以同时支持的,比如在ReadWriteLock里。如果一个子类只支持独占或者只支持共享模式,那它没有必要再提供没有用到的另一种模式的相关方法的实现。
This class defines a nested AbstractQueuedSynchronizer.ConditionObject class that can be used as a Condition implementation by subclasses supporting exclusive mode for which method isHeldExclusively() reports whether synchronization is exclusively held with respect to the current thread, method release(int) invoked with the current getState() value fully releases this object, and acquire(int), given this saved state value, eventually restores this object to its previous acquired state. No AbstractQueuedSynchronizer method otherwise creates such a condition, so if this constraint cannot be met, do not use it. The behavior of AbstractQueuedSynchronizer.ConditionObject depends of course on the semantics of its synchronizer implementation.
本类定义了一个内部的类AbstractQueuedSynchronizer.ConditionObject,可以被子类用于支持独占模式的条件实现,方法isHeldExclusively() 用于判断独占锁是否被当前线程占有,被 getState()调用的方法release(int) 完全释放了这个对象。acquire(int), 在给定待保存的值之后,将这个对象恢复到其之前的状态。AbstractQueuedSynchronizer没有方法创建这个对象,所以如果用不到这个约束的话,就不要用它。ConditionObject 的行为完全依赖于具体同步器的实现语义。
This class provides inspection, instrumentation, and monitoring methods for the internal queue, as well as similar methods for condition objects. These can be exported as desired into classes using an AbstractQueuedSynchronizer for their synchronization mechanics.
本类提供对内部队列和条件对象的监测与监控方法。这些方法可以暴露给使用AQS的类,以协助它们实现同步语义。
Serialization of this class stores only the underlying atomic integer maintaining state, so deserialized objects have empty thread queues. Typical subclasses requiring serializability will define a readObject method that restores this to a known initial state upon deserialization.
本类序列化之后只会保存内部的state变量的值,所以反序列化后的对象的线程队列将是空的。需要序列化的子类通常会定义readObject方法来将这个存储为一个已知初始化状态,以便反序列化使用。
------------------------------
从上面的介绍,大致知道了这个类的作用,用法和一些要注意的问题,下面我们继续看其推荐的用法示例。
To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState(), setState(int) and/or compareAndSetState(int, int):
如果要把本类作为一个基础的同步器的话,需要重新定义以下方法,对于同步状态的查询和修改则可以用getState,setState(int), compareAndSetState(int, int):
tryAcquire(int) 尝试获取独占锁
tryRelease(int) 尝试释放独占锁
tryAcquireShared(int) 尝试获取共享锁
tryReleaseShared(int) 尝试释放共享锁
isHeldExclusively() 判断当前线程是否持有共享锁
Each of these methods by default throws UnsupportedOperationException. Implementations of these methods must be internally thread-safe, and should in general be short and not block. Defining these methods is the only supported means of using this class. All other methods are declared final because they cannot be independently varied.
每个方法定义时都会抛出UnsupportedOperationException,这些方法的实现需要是线程安全的,执行时间上应该短,不能被阻塞。定义这些方法的实现是使用这个类的唯一方式,其它方法都被定义成了final,因为他们不能被独立改变。
You may also find the inherited methods from AbstractOwnableSynchronizer useful to keep track of the thread owning an exclusive synchronizer. You are encouraged to use them -- this enables monitoring and diagnostic tools to assist users in determining which threads hold locks.
您也可以从AbstractOwnableSynchronizer 继承的方法里找到一些用于追踪持有独占锁的线程的方法。这些方法是被推荐使用的--可以让监控和统计工具帮助用户发现哪些线程持有锁。
Even though this class is based on an internal FIFO queue, it does not automatically enforce FIFO acquisition policies. The core of exclusive synchronization takes the form:
虽然本类基于内部的FIFO队列,但它并没有增强FIFO机制,核心的独占机制表现形式如下(共享模式与之类似):
Because checks in acquire are invoked before enqueuing, a newly acquiring thread may barge ahead of others that are blocked and queued. However, you can, if desired, define tryAcquire and/or tryAcquireShared to disable barging by internally invoking one or more of the inspection methods, thereby providing a fair FIFO acquisition order. In particular, most fair synchronizers can define tryAcquire to return false if hasQueuedPredecessors() (a method specifically designed to be used by fair synchronizers) returns true. Other variations are possible.
由于检查能否获得锁是在入队之前,那么一个新的请求锁的线程可能会在其它阻塞并排队的线程之前获得锁。不过如果有需要的话,可以在定义tryAcquire及tryAcquireShared时,通过调用内部的一个或多方法来禁止这种行为,通过提供公平的FIFO获取顺序来保证。在实现上,大多数公平同步器可以当hasQueuedPredecessors方法(一个为公平同步器提供的方法)返回true时,让tryAcquire返回false.其它方式也是可以的。
Throughput and scalability are generally highest for the default barging (also known as greedy, renouncement, and convoy-avoidance) strategy. While this is not guaranteed to be fair or starvation-free, earlier queued threads are allowed to recontend before later queued threads, and each recontention has an unbiased chance to succeed against incoming threads. Also, while acquires do not "spin" in the usual sense, they may perform multiple invocations of tryAcquire interspersed with other computations before blocking. This gives most of the benefits of spins when exclusive synchronization is only briefly held, without most of the liabilities when it isn't. If so desired, you can augment this by preceding calls to acquire methods with "fast-path" checks, possibly prechecking hasContended() and/or hasQueuedThreads() to only do so if the synchronizer is likely not to be contended.
默认的抢占(又称为greedy,renouncement和convoy-avoidance)具备最好的扩展性和吞吐量,这种策略不能保证公平或无饥饿,但早入队的线程允许在后面的线程之前重新入队,每一个重入的线程都有和后续的线程同样的机会去获得锁。同时,由于获取锁在通常情况下不是自旋模式,可以在阻塞前,和其它线程的竞争间隙里发起更多的tryAcquire调用。这个表示大多数独占锁时的自旋只是短暂的挂起,当其不自旋时则不需要任何负担。如果需要如此,可能通过预调用带快速路径检查的获取锁的方法。仅在同步器不存在竞争的情况下,可以通过调用 hasContented() 或 hasQueueedThreads() 来进行预检查。(这一段翻译太难了,大概意思就是说默认的非公平策略的一些优势吧)
This class provides an efficient and scalable basis for synchronization in part by specializing its range of use to synchronizers that can rely on int state, acquire, and release parameters, and an internal FIFO wait queue. When this does not suffice, you can build synchronizers from a lower level using atomic classes, your own custom Queue classes, and LockSupport blocking support.
本类通过定制化基于int 类型的状态,获取和释放锁的参数,以及一个内部的等待队列,来提供了一个高效和灵活的同步器基础实现。如果不能满足要求,你可能从底层通过使用原子类,自己的队列类,以及LockSupport的阻塞支持来构造自己的同步器。
Here is a non-reentrant mutual exclusion lock class that uses the value zero to represent the unlocked state, and one to represent the locked state. While a non-reentrant lock does not strictly require recording of the current owner thread, this class does so anyway to make usage easier to monitor. It also supports conditions and exposes one of the instrumentation methods:
下面是一个独占的不可重入锁的实现,用0表示非锁定的状态,1表示锁定状态。不可重入锁并没有严格要求记录当前拥有锁的线程。这个例子还是记录了以便让监控更简单。同样这个类也提供了conditons,并通过一些方法进行了暴露。
import java.io.IOException; import java.io.ObjectInputStream; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 一个非可重入锁的实现 * @author zhaochunhui * */ public class AQSLock implements Lock, java.io.Serializable { // 定义一个内部的不可变类,这个类完成了主要的工作,我们只需要进行引用即可。 private final Sysn sync = new Sysn(); /** * 判断是否有锁 * * @return */ public boolean isLocked() { return sync.isHeldExclusively(); } /** * 判断是否有线程等待 * @return */ public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } private static class Sysn extends AbstractQueuedSynchronizer { // 判断是否处理锁定状态 protected boolean isHeldExclusively() { return getState() == 1; } // 加锁 public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 释放锁 protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } // 提供一个条件 Condition newCondition() { return new ConditionObject(); } // 序列化 private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } }