Java同步器框架AQS与锁实现原理
JUC是Java中并发控制的核心包,其中AQS是并发的基础核心类,类名全称AbstractQueuedSynchronizer,其父类是AbstractOwnableSynchronizer(AOS),
用于存取获得独占锁的线程。Java中的ReentrantLock、CountDownLatch、Semaphore等都是基于AQS基础上实现的,
当然Java中还提供了64
位的同步器框架AbstractQueuedLongSynchronizer,它的state状态字段是long型,一般应用中32
位的AQS就足够用。
在了解AQS之前,首先需要简单了解一下LockSupport这个类,AQS通过调用LockSupport挂起或者唤醒线程,而LockSupport则调用UNSAFE。LockSupport的两个 核心方法是park以及unpark,unpark方法是给线程产生一个许可,并且最多一个许可,多次调用许可并不会增加。park方法是消费一个许可,当调用线程没有许可可用时, 则会被阻塞,默认情况下线程都是没有许可,直接调用park方法,会被阻塞。当然线程可以在调用park方法之前先通过调用unpark方法获取许可,这样线程就不会被阻塞。
public class ThreadStudy {
public static void main(String[] args) {
LockSupport.unpark(Thread.currentThread());
LockSupport.park();
System.out.println("ok");
}
}
上面例子中的main线程是不会被阻塞的,因为调用park方法的时候当前线程已经拥有了一个许可,每调用一次park会消费一个许可,所以多次调用park方法必然会阻塞线程, 因为许可至多只有一个,下面代码中的main线程会被阻塞。
public class ThreadStudy {
public static void main(String[] args) {
LockSupport.unpark(Thread.currentThread());
LockSupport.unpark(Thread.currentThread());
LockSupport.park();
LockSupport.park();
System.out.println("ok");
}
}
LockSupport给线程发放许可的unpark方法
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
LockSupport中的park方法相对比较多,park、parkUntil以及parkNanos
public static void park() {
UNSAFE.park(false, 0L);
}
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}
public static void parkUntil(long deadline) {
UNSAFE.park(true, deadline);
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null);
}
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}
后面三个park方法签名中多一个blocker参数,该参数会在线程阻塞之前,写入到线程对象中的parkBlocker字段上。 线程:我被阻塞了,通过parkBlocker你就知道我是被谁阻塞的?线程调用park方法被阻塞之后,可以由如下几种方式唤醒:
- 其他线程通过调用LockSupport.unpark唤醒被阻塞的线程
- 其他线程通过调用被阻塞线程的interrupt方法中断了被阻塞的线程
- 指定的阻塞时间或者截止时间已过(只针对parkNanos或者parkUntil)
由此可见,LockSupport的park方法是可以响应中断的,下面代码中的main线程是不会被阻塞的,因为线程在调用park方法之前 已经调用interrupt中断了,park方法会立即返回。
public class ThreadStudy {
public static void main(String[] args) {
Thread.currentThread().interrupt();
LockSupport.park();
System.out.println("ok");
}
}
在了解LockSupport的基本用法之后再去分析AQS就容易很多了,AQS支持维护两种类型的队列,分别是同步队列Sync以及条件等待队列Condition, 一个线程要进入Condition队列,首先它必须已经获得锁,获得锁之后因为条件不满足再调用await方法则进入了Condition队列,在进入 Condition队列之后线程会完全释放已经获取的锁,如果释放失败,则当前线程所在的节点状态则被设置成了CANCELLED状态,稍后将会被踢出Condition队列。
两个队列的作用不一样,其数据结构也是不一样的,Sync队列是一个双向链表,其head字段指向的头结点是空节点,所谓的空节点并不是null,而是获得了锁的Node节点, 节点中的Thread对象被设置成null了。Condition队列是一个单向列表,其firstWaiter字段用于指向非空的头结点。两个队列的基本数据节点都是Node对象,构造不同队列 节点时相关的参数使用情况不一样,但是两个队列都是从队列尾部tail插入后来的请求节点。
AQS中的节点对象是由其内部一个静态类Node提供实现的
static final class Node {
/** 共享锁模式 */
static final Node SHARED = new Node();
/** 互斥锁模式 */
static final Node EXCLUSIVE = null;
/** 取消状态 */
static final int CANCELLED = 1;
/**
* 通知状态,表示其后继节点需要被唤醒,也就是说某个线程在进入同步队列
* 等待获取锁过程中,一定要将其前置节点状态设置为SIGNAL
*/
static final int SIGNAL = -1;
/** 条件等待队列中使用的状态,表示线程正在等待条件 */
static final int CONDITION = -2;
/**
* 共享锁模式中表示需要无条件向后传播
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
/**
* 仅在同步队列中使用
*/
volatile Node prev;
/**
* 仅在同步队列中使用
*/
volatile Node next;
/**
* 等待锁的线程对象
*/
volatile Thread thread;
/**
* 条件队列中使用的字段,表示某个节点的后继节点
*/
Node nextWaiter;
//判断当前节点中的线程请求的是否是共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}
//获取同步队列中节点的前置节点,某个节点前置节点一定不会为空,队列初始化时会构造头结点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) { // 构造Sync队列节点时使用的构造方法
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // 构造Condition队列节点时使用的构造方法
this.waitStatus = waitStatus;
this.thread = thread;
}
}
Sync队列通过使用Node对象的prev
、next
字段构造成了双向链表,而Condition队列通过使用Node对象的nextWaiter
字段构造成了单向链表。
同步队列中的节点状态默认是0
,其构造方法并未设置状态,而是设置了锁的模式,条件队列中节点的默认状态是CONDITION
,即-2
。
暂时先不管Condition条件等待这部分实现,首先来分析AQS锁的获取与释放。在看AQS源码时最好结合其某个实现类比如ReentrantLock类来分析, 当然相关的设计模式还是需要有基本的了解,AQS中频繁使用的就是模板模式,即父类实现了一些通用的方法,将某个核心方法下放委托给子类自己去实现, 这种设计模式的优点就是灵活,在父类的基础上可以扩展出更多不同功能的实现类。AQS中锁的获取与释放并未实现,由实现类自己全权决定锁的获取与释放 逻辑,锁获取与释放(成功与失败)后的逻辑处理则由AQS统一调度处理,AQS交由子类实现的几个核心方法:
- tryAcquire(int) 获取独占锁
- tryRelease(int) 释放独占锁
- tryAcquireShared(int) 获取共享锁
- tryReleaseShared(int) 释放共享锁
- isHeldExclusively 当前线程是否获取独占锁
不同的实现类根据其功能实现不同的方法,比如ReentrantLock实现了tryAcquire
、tryRelease
、isHeldExclusively
,而ReentrantReadWriteLock
实现了上面五个方法,CountDownLatch与Semaphore则实现了tryAcquireShared
、tryReleaseShared
。
AQS中有三个成员变量,其中一个是state
,所有的实现类都是通过操控AQS的state
来判定当前线程是否成功获取锁或者释放锁,线程竞争锁的手段一般都是通过
调用AQS的compareAndSetState方法来实现。
/**
* 指向同步等待队列中的头部节点,已成功获取锁的线程节点,状态一定不是CANCELLED
*/
private transient volatile Node head;
/**
* 指向同步等待队列中的尾部节点,一般是新加入的节点,延迟初始化,当有节点加入队列时才会初始化
*/
private transient volatile Node tail;
/**
* 同步器状态
*/
private volatile int state;
/**
* 获取同步器状态
*/
protected final int getState() {
return state;
}
/**
* 设置同步器状态,在构造实现类对象时调用,设置state初始状态值,如CountDownLatch/Semaphore等
*/
protected final void setState(int newState) {
state = newState;
}
/**
* 竞争锁的核心方法
*/
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
独占锁的实现
竞争独占锁通过调用AQS的acquire(int arg)方法实现
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法是由子类实现,比如ReentrantLock就实现了此方法,其只需返回boolean类型结果值告诉AQS结果,如果获取锁成功返回true, AQS什么也不用做,直接return了,如果tryAcquire返回false,AQS则开始构造线程节点对象,加入到同步等待队列中。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter方法的作用就是将当前调用线程包装成Node节点对象,然后通过一次CAS快速将节点加入到队列尾部,如果操作成功直接返回,如果 快速添加到队列尾部失败,表示当前有线程正在竞争。队列未初始化或者节点快速加入队列失败,都会进入到enq方法,通过for无限循环CAS操作, 确保节点成功加入到队列中。JUC中很多方法都是这种逻辑,先简单快速执行一次,如果达到预期目标,快速返回,否则进入无限循环直至操作成功, 有点类似我们的do while风格。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//队列未初始化,则通过CAS操作设置head为一个空节点,此时head、tail指向同一个空节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//将队列tail节点设置为新加入节点的前置节点,再通过CAS将新加入的节点设置为tail节点,
//CAS操作成功之后,再设置节点的后置节点,这就是Node节点中prev一定不为空原因
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法主要作用是初始化同步等待队列,同时确保等待的线程节点能成功加入到队列中,采用方式就是无限循环CAS重试,直至成功。
注意,enq
方法执行成功之后返回的不是刚加入的节点,而是其前置节点。当然在addWaiter这个方法中没有使用到enq方法的返回
值,但是在Condition队列中某个线程节点因为被signal通知,通过调用enq转移到同步等待队列时用到了,务必注意。
addWaiter方法执行成功之后,线程已经被加入到同步等待队列了,接下来不能不管这个线程啊,这个线程当前还处于活动状态中,AQS得对其
设置一些规则才能进行调度,包括是否阻塞、什么时候获取锁等,这些操作则由AQS的acquireQueued
方法实现。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued是AQS调度的核心方法,主要由for循环逻辑处理,返回结果表示当前线程是否发生了中断,如果锁竞争不激烈,线程一进入到同步等待队列, 其前置节点可能刚好是头结点(成功获取锁的线程),此时直接尝试调用子类实现的tryAcquire获取锁。如果获取成功,则调用setHead方法将当前线程 节点设置为头结点,然后直接返回,这样就少了一次阻塞唤醒线程操作。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
AQS中的头结点是成功获取锁的节点,也是空节点,AQS就是通过setHead方法清空节点中的线程对象引用。再回到acquireQueued方法,即使当前线程的 前置节点是头结点,也有可能获取锁失败,原因如下:
- 前置节点的线程还未处理完成,并未释放锁
- 前置节点的线程已成功释放锁,当前同步器处于非公平模式,后来的线程刚好通过CAS操作成功获取了锁,导致当前线程竞争锁失败
- 子类实现的tryAcquire竞争锁时发生了异常,则直接进入了finally代码块
前两种原因竞争锁失败之后,处理逻辑一致,都将进入到AQS的shouldParkAfterFailedAcquire方法,这个方法主要用于判断当前线程是否可以被阻塞,该方法是 在acquireQueued方法中的for无限循环中调用的,每一次只会执行一个if-else分支逻辑。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 之前的操作已经前置节点状态设置成SIGNAL,此时直接返回表示可以阻塞线程了。
*/
return true;
if (ws > 0) {
/*
* 当前线程的前置节点已经被取消,则向前一直寻找一个不是取消状态的节点作为当前线程的前置节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 线程被阻塞之前一定要将前置节点的状态设置为SIGNAL,
* 这是AQS中的调度规则,这里的逻辑一般肯定在最上面的if逻辑前执行了
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果前置节点的状态大于0
,则表示CANCELLED(1)
状态,一般是由于调用tryAcquire发生了异常,被AQS设置成了CANCELLED
状态。如果前置节点是取消状态,则一直找前
置节点的前置,直到找到一个不是取消状态的节点作为当前线程的前置节点,然后再将当前线程的前置节点状态设置成SIGNAL(-1)
,设置成功之后,AQS下一步则会进入阻塞
当前线程逻辑。这好比我们排队买东西,跟前一个人说当轮到你的时候,你叫我一声,如果前面的人不同意,你只能找前面的前面那个人,直到前面有一个人同意叫你了,这样
你就可以安心的睡觉去了。
历经几次for的处理,shouldParkAfterFailedAcquire方法肯定会返回true,返回true之后才会进入parkAndCheckInterrupt方法,这个方法才是真正阻塞当前线程的方法。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
代码执行到上面的LockSupport.park(this)这一行,线程则被阻塞了,只能等其他线程调用LockSupport.unpark方法唤醒当前线程或者调用当前线程对象的interrupt方法 中断当前线程。前面说过,LockSupport.park方法会响应中断,如果当前线程被中断了,该方法立即返回结果true,如果是被其他线程唤醒,则返回结果是false。 从parkAndCheckInterrupt返回之后,又回到acquireQueued方法的for循环中开始尝试竞争锁了。AQS中的acquireQueued方法是不响应中断操作的,只是用一个变量 interrupted记录当前线程是否曾经被中断过,AQS中还有响应中断的doAcquireInterruptibly方法,该方法通过parkAndCheckInterrupt方法返回结果决定是否抛出 中断异常InterruptedException,是否响应中断这取决于实现类调用AQS哪个方法。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AQS中提供了三个竞争独占锁的方法,这三个方法的不同之处在于竞争锁失败之后的部分逻辑,大部分核心处理逻辑是一样的。
- acquireQueued 不支持中断,也不支持最大等待时间
- doAcquireInterruptibly 支持中断,会抛出中断异常,但是不支持等待时间
- doAcquireNanos 支持中断,也支持最大等待时间,等待时间截止了,该方法立即返回
doAcquireNanos方法多了等待时间的相关逻辑,竞争锁失败之后,需要判断最大等待时间是否已超过,等待时间截止,立刻返回。
这里有一个自旋的逻辑,如果当前线程剩下的等待时间还不足spinForTimeoutThreshold
,即1000纳秒,则进行片刻自旋,不进行阻塞,稍后立即返回。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
再来分析下AQS中调用实现类的tryAcquire方法发生异常的情况,发生异常之后则进入finally代码块中的if逻辑,执行cancelAcquire方法。
private void cancelAcquire(Node node) {
if (node == null)
return;
// 发生异常了,清除节点中的线程对象
node.thread = null;
// 跳过取消的节点,向前查找,直到找到一个非取消状态的正常节点作为当前节点的前置节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 这个pred.next一定也是取消状态的节点,这里暂存起来,主要是下面逻辑中compareAndSetNext会用到
Node predNext = pred.next;
// 将当前节点的状态设置为取消状态
node.waitStatus = Node.CANCELLED;
// 当前节点刚好是队列尾节点,先直接CAS尝试操作设置新的tail节点,CAS操作失败表示有其他线程修改了tail
if (node == tail && compareAndSetTail(node, pred)) {
// tail节点设置成功之后,将tail节点的后置节点置空,因为这个tail节点的后置节点已经被取消了
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 唤醒node后面的线程,唤醒的线程未必能马上成功获取锁,防止遗漏线程去竞争锁
unparkSuccessor(node);
}
node.next = node; // 这个节点已经完全踢出了等待列队,加速GC
}
}
再分析下上面方法中的那个长的if判断逻辑,拆出来就是要同时满足这三个条件
- pred != head
如果前置节点是头节点(拥有锁的线程),此时应该无条件去唤醒后面的线程去竞争锁了,直接执行else分支中的unparkSuccessor唤醒后面的有效线程 - (ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))
这个条件很重要,因为后面的正常节点已经将其前置节点状态设置为SIGNAL,并且已经被AQS阻塞了,此时这些正常节点的前置节点又被取消了,其并不感知。 - pred.thread != null
为什么有这个判断,看上面方法cancelAcquire的前几行代码可知,pred线程可能也刚好被取消,刚将Node节点的线程清除,还未设置节点状态为CANCEL。
unparkSuccessor是AQS用于唤醒后继节点的方法,该方法在两个地方会被调用,调用点之一就是上面的cancelAcquire,另外一个调用点就是线程释放锁成功 之后会唤醒其后继节点,首先看下AQS中线程释放锁的方法release。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
/*
* 如果头结点的状态是0,有可能是后面的线程未设置状态或者没有后置节点,
* 或者已经执行过唤醒操作,因为unparkSuccessor在执行之前会将节点的状态又重新设置为0
*/
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease是实现类的方法,该方法返回true表示线程释放锁成功,之后的处理逻辑由AQS处理,AQS首先判断头结点的状态不为0
才会执行unparkSuccessor方法,这就是
AQS为什么在cancelAcquire以及shouldParkAfterFailedAcquire方法中会将线程的前置节点状态从默认值0
设置为SIGNAL(-1)
,如果没有设置前置节点状态为
SIGNAL,当前置线程释放锁时,则不会及时唤醒跟在它后面的线程去竞争锁,如果跟在它后面的这个线程不响应中断以及未设置超时,有可能很长时间不会被唤醒了,实际
上它还是有机会被唤醒的,那就是当其他线程释放锁成功之后调用了unparkSuccessor,而这个线程的后置线程已经被取消,unparkSuccessor方法只能从队列尾部依次向前
查找一个状态未取消的线程进行唤醒。
private void unparkSuccessor(Node node) {
//将状态再次设置成0,表示已经执行唤醒后继节点的操作
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 后置线程已经被取消,只能从队列尾部依次向前查找一个不是取消状态的线程,
* 所以说,某个线程没有及时将其前置节点状态设置为SIGNAL,还是有机会被唤醒的
* 但是AQS一定会设置这个状态
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}
共享锁的实现
AQS中共享锁与独占锁的实现大部分逻辑类似,AQS中也提供了三个对应的竞争共享锁的方法
- doAcquireShared 不响应中断,不支持最大等待时间
- doAcquireSharedInterruptibly 支持中断,发生中断时会抛出InterruptedException异常
- doAcquireSharedNanos 支持中断,支持最大等待时间
AQS中留给子类实现的tryAcquireShared方法返回值不再是boolean,而是int类型,大于等于0表示竞争共享锁成功,当且仅当大于0时表示需要无条件向后传播 唤醒其他竞争共享锁的线程,小于0表示竞争锁失败。上面这三个方法与竞争独占锁的三个方法功能类似,这里只简单分析doAcquireShared方法, 其大部分逻辑与acquireQueued相似,区别就是在线程竞争锁成功之后的部分逻辑处理。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
从doAcquireShared代码中看出,线程竞争锁成功之后,调用的是setHeadAndPropagate,而竞争独占锁成功之后调用的是setHead。 线程竞争共享锁与独占锁最大的区别就是线程竞争共享锁成功之后,会唤醒其后面竞争共享锁的线程,因为多个线程可以同时成功获取共享锁。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* propagate大于0,表示需要无条件向后传播,唤醒其后面的竞争共享锁的线程,如果后面是竞争独占锁的线程
* 是不需要唤醒的,即使唤醒竞争独占锁的线程,此时它也不可能成功,因为共享锁与独占锁互斥(实现类遵循)。
* propagate等于0时,只需判断当前线程的状态或者其头结点的状态是否小于0,只要有一个共享锁线程竞争成功,
* 则表示其他线程也可以去竞争共享锁了,独占锁的线程是不需要唤醒的
*
* 只要满足条件则调用doReleaseShared方法,这里调用这个方法并不是释放锁的意思,而是通过这个
* doReleaseShared方法的逻辑去唤醒后面竞争共享锁的线程
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
下面再分析一下AQS中释放共享锁的方法releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared是AQS留给子类实现释放锁的方法,只需返回boolean类型告诉AQS成功与否。线程释放共享锁成功之后,AQS则调用doReleaseShared方法唤醒后面的线程。 分析到这里,应该对共享锁与独占锁的唤醒触发点很清楚了,独占锁可能有两个地方会触发唤醒线程操作,而共享锁比独占锁多了一个触发点,就是上面的setHeadAndPropagate方法。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//当前线程执行doReleaseShared方法时,头结点没有变化,表示没有新节点
if (h == head)
break;
}
}
AQS中释放共享锁成功之后的处理逻辑比独占锁复杂一些,在独占锁模式中,只需简单的判断头结点的状态是否小于0
,小于0则可以调用unparkSuccessor去唤醒后面的线程了。
但是在共享锁模式中,尤其是ReentrantReadWriteLock的实现中,同步等待队列中包含竞争共享锁的线程,也包含竞争独占锁的线程。在doReleaseShared方法中通过
if (h != null && h != tail)判断同步队列中是否有等待线程,接着判断头结点的状态是否是SIGNAL,通过CAS将头结点状态由SIGNAL(-1
)设置为0
,如果设置失败表示
其他线程已经触发了唤醒头结点后置线程的操作,状态已变更,本次循环直接跳过,但是AQS当前是共享锁模式,为了支持共享锁的传播,上面方法中有了第二个CAS操作,
即使已经触发头结点后置线程的唤醒操作,仍然把头结点的状态由0
设置成PROPAGATE(-2
),这个CAS的作用体现在哪里? 答案:线程竞争共享锁成功之后会调用setHeadAndPropagate方法,
这个方法中的if判断逻辑则会利用头结点的状态保证共享锁的传播。
条件队列的实现
条件等待队列是由AQS的一个非静态内部类ConditionObject实现的,这个类实现了Condition接口的所有方法,Condition接口定义的方法如下:
public interface Condition {
void await() throws InterruptedException;
// 不支持中断的等待
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
Condition中的await
、signal
与Object类中的wait
、notify
方法类似,但是AQS支持维护多个条件等待队列,比起Object仅有的一个队列,AQS的并发处理效率更高,
在应用代码中要注意线程虚假唤醒等问题,可通过增加while条件判断。
synchronized (obj) {
while (<condition does not hold>)
obj.wait();
... // 执行业务逻辑
}
Condition队列是一个单向链表,比起AQS的同步队列,相对简单,两个成员变量firstWaiter与lastWaiter将多个节点构造成单向链表结构。
public class ConditionObject implements Condition, java.io.Serializable {
// 队列中的头结点
private transient Node firstWaiter;
// 队列中的尾节点
private transient Node lastWaiter;
/**
* 构造方法
*/
public ConditionObject() { }
......
}
选择ConditionObject的await
与singal
方法进行分析,其他方法实现逻辑雷同,不再关注。前面已经提过,线程只有在成功持有锁之后才能通过调用await
方法进入条件等待队列中。
public final void await() throws InterruptedException {
// await方法响应中断,如果线程被中断,立即抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入到条件队列尾部,同时负责检查并清理队列中CANCELLED状态的线程
Node node = addConditionWaiter();
// 线程加入队列成功之后,当前线程会完全释放锁,释放失败则当前线程节点被设置为取消状态CANCELLED
int savedState = fullyRelease(node);
int interruptMode = 0;
// 要退出这个循环,要么线程被中断,要么其他线程调用sigin或者signalAll方法,
// 触发当前线程已经从条件等待队列转移到了同步等待队列中
while (!isOnSyncQueue(node)) {
// 线程待在条件队列中,那就必须阻塞,等待其他线程发起条件通知
LockSupport.park(this);
/* 如果线程发生中断interruptMode值一定不是0,直接跳出这个while循环
* interruptMode=-1,表示线程在条件等待队列中的时候时候被中断了,
* 而当前await方法又响应中断,所以后面会直接抛出异常
*
* interruptMode=1,表示线程是在接收到条件通知之后(其他线程调用了signal或者signalAll方法)
* 之后又进入到同步队列时或者通过enq方法应进入到同步队列时被中断了,此时线程已经不在条件等
* 待队列了,不会跑出中断异常,而是记录中断标识,后面根据这个标识再次中断线程(补一次中断)
*
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 执行到这线程肯定已经从条件等待队列重新加入到了同步等待队列
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清理条件队列中已经被取消的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
/*
* 根据上面interruptMode的值决定抛出中断异常,还是再次中断线程
* interruptMode=-1 直接抛出异常
*
* interruptMode=1 再次中断线程,因为前面checkInterruptWhileWaiting方法调用Thread.interrupted()
* 方法时已经清除了线程的中断标志
*/
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
下面依次简单分析下await方法中调用的各个方法,addConditionWaiter方法逻辑相对比较简单,主要就是将当前线程加入条件等待队列中, 同时检查并清理下非等待状态的节点。
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果当前队列中的尾节点不是CONDITION状态,则会触发一次遍历队列清除所有CANCELLED状态节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
// 构造节点,注意该构造方法,与同步队列使用的构造方法不同
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
unlinkCancelledWaiters遍历整个链表,剔除所有CANCELLED状态节点,基本的链表删除操作。
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
线程成功加入条件等待队列之后,则开始调用fullyRelease方法完全释放锁,同时用savedState变量记录下状态,下一次调用acquireQueued方法时需传递。 线程调用fullyRelease方法完全释放锁时可能失败,失败之后节点就变成CANCELLED状态了,这就是方法unlinkCancelledWaiters执行的意义。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// release方法由AQS的子类实现,其考虑到release方法执行时发生异常的情况
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 未成功释放锁立即抛出异常进入finally语句块
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
线程加入条件队列的目录就是等待通知,在释放锁之后就可以阻塞了,但是阻塞之前需要根据条件进行判断当前线程是否已经被转移到同步等待队列,因为当前线程刚加入条件队列中还未被阻塞之前,
其他某个持有锁的线程刚好调用了Condition的signal
或者signalAll
方法,结果会导致当前线程可能在被阻塞之前已经转移到了同步等待队列中,
isOnSyncQueue则是用于判断线程是否处于同步等待队列中。
final boolean isOnSyncQueue(Node node) {
/*
* 状态是CONDITION的节点一定不在同步队列中,因为线程转移到同步队列中时,
* 节点状态必定通过CAS操作由CONDITION设置成0。
*
* 前面已经提到,同步队列在调用CAS之前会设置节点的prev,CAS成功之后设置节点的next,
* 如果prev为空,此时一定不在同步队列,但是有可能在转移到同步队列路上,还未开始设置prev而已。
*
*/
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// next不为空,节点一定在同步队列中,因为条件队列中使用的字段是Node对象的nextWaiter,不会使用next
if (node.next != null)
return true;
/*
* next为空或者prev不为空时不能确定节点是否在同步队列,可能在进入同步队列的操作开始时刻,
* 所以再通过调用AQS的findNodeFromTail方法遍历一次
*/
return findNodeFromTail(node);
}
findNodeFromTail是AQS的方法,不是Condition的方法,从队列尾部节点开始遍历链表,查找当前线程节点。
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
isOnSyncQueue方法返回false之后,线程调用LockSupport.park方法立即被阻塞,可能当前线程下一刻就进入了同步等待队列,这其实也并不非常重要,
因为其他线程调用signal
或者signalAll
方法将节点从条件等待队列转移到同步等待队列的时候,会调用LockSupport.unpark又唤醒当前线程,此时线程
再执行await方法中的while(!isOnSyncQueue(node))代码时就跳出了while循环了。
文章前面也提到LockSupport.park方法是可以响应中断的,线程在条件队列中等待的时候,或者当前线程刚好在其他线程调用了signal
或者signalAll
方法即将转移到同步队列的时候,调用者就中断了当前线程。因为Condition的await
方法要求相应中断,如果线程在条件队列等待过程中
被中断,按照约定是需要抛出中断异常的,否则记录当前线程的中断标志,具体的中断处理逻辑交由AQS的相关竞争锁方法决定。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
THROW_IE与REINTERRUPT是ConditionObject类中定义的两个final的静态常量,其值分别为-1
与1
。
checkInterruptWhileWaiting方法会返回三种结果
0
当前线程没有被中断-1
当前线程在条件队列等待时被中断1
当前线程已经被其他线程调用signal
或者signalAll
方法触发通知,至于这个操作发生在中断之前还是中断之后,还是同时发生,并不可知,也不重要
线程被中断之后才有机会执行下面的transferAfterCancelledWait方法,Condition对我们释放的寓意是线程中断了是因为线程调用者不想等待条件了,也就是 不想让线程在条件队列中阻塞,应该让其重新回到同步等待队列中去。
final boolean transferAfterCancelledWait(Node node) {
// CAS设置节点状态为0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//调用AQS的enq方法保证当前线程加入成功进入同步等待队列中
enq(node);
return true;
}
// CAS操作失败,表示有其它线程竞争,当前线程通过Thread.yield()与其他线程竞争获取cpu时间片,
// 让其他线程有机会先执行,加速当前线程进入到同步队列的步伐
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
当前线程被其它线程唤醒并进入到同步队列或者线程被中断,才能跳出await方法的while循环,如果线程不是在等待条件时被中断,则调用AQS的acquireQueued重新
竞争锁,接着清理条件队列中CANCELLED状态的节点,最后根据interruptMode(-1
, 1
)的值执行相关操作。
Condition的signal
或者signalAll
方法是通知条件队列中的线程,signal
方法只会选择条件队列中的首个节点进行通知唤醒,而signalAll
遍历整个队列,
依次通知唤醒所有线程去竞争锁,当然signal
执行失败之后,会重试唤醒头结点的下一个节点,保证一定成功唤醒一个队列中的线程。调用signal
的方法一定是当前正
持有锁的线程才能进行调用,也就说调用Condition的方法,一定要在lock.tryLock执行成功之后执行。
public final void signal() {
// 线程未持有锁,调用signal方法立即抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 选择队列中第一个节点进行通知
if (first != null)
doSignal(first);
}
doSignal方法通过do while语句保证一定成功通知队列中某个线程,从头到尾选择一个,直到操作成功。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal才是Condition的通知核心逻辑实现
final boolean transferForSignal(Node node) {
/*
* 从条件队列转移到同步队列,首先就是重置节点状态
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 保证指定的线程成功加入到同步队列中,返回的节点是当前线程节点的前置接节点
*/
Node p = enq(node);
int ws = p.waitStatus;
// 如果前置线程被取消或者设置其状态为SIGNAL失败,马上唤醒当前线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}