在JDK 14版本,Doug Lea对AQS的实现进行了一次大的变更,但基本的数据结构还是基于CLH的同步队列+单链表的等待队列
相比之前的版本,新版的AQS最明显的变化是:尽量延后线程节点进入队列的时间
具体代码改动见github
前言
本文注意以ReentrantLock为例,分析AQS加锁与释放锁的过程,以及线程在同步队列和等待队列之间的转换
预备知识
- CLH无锁队列的基本原理
- 自旋等待的代码实现
- LockSupport常用方法的功能
- Unsafe与VarHandle的使用
- 可重入锁的概念
- 位运算符的使用
术语
为了区分tryLock和lock,try代表只执行一次,下文中我称之为惰性操作,而lock是在死循环中自选等待,直到抢到锁,我称之为贪婪操作
ReentrantLock加锁过程
以ReentrantLock.lock()为例,分析加锁的过程,其调用内部类Sync.lock()方法。Sync属于抽象类,本身继承了抽象类AbstractQueuedSynchronizer(以下简称AQS),而提到抽象类,大多和模板方法设计模式有关
因此Sync.lock()和AQS.acquire中为调用的主流程,而2个子类FairSync和NonfairSync实现了各自tryAcquire时的差异化逻辑
在没有认识到AQS的同步队列之前,此处先不讲述hasQueuedPredecessors等方法,以下先以非公平锁为例讲解
Sync.lock()方法较为简单,前言中提到新版AQS尽量延后线程节点进入队列的时间,initialTryLock正是一种体现,以非公平锁为例,快速看下initialTryLock的流程
- 如果AQS中的state同步变量为0,并且设置成1成功,就把当前线程设置为独占锁的持有者,返回true
- 如果当前线程已经是独占锁的持有线程了,表示当前是锁的重入过程,state+1
- 否则获取锁失败
1 | final void lock() { |
虽然这个方法很简单,但是有几点说明
- 这是个惰性操作,只是尝试一次获取锁
- 为什么直接调用了setState而不是compareAndSetState,虽然原因很简单,getExclusiveOwnerThread() == current表示当前线程获取到了这把独占锁,别的线程是无法修改的,所以可以直接设置。
但是什么时候可以不通过cas,而是直接赋值是我们需要关心的,后面的源码还有很多这类操作,所以在直接赋值时要反推,是否代表当前线程已经获取到了锁,或者说操作是线程安全的
initialTryLock失败后,回到acquire(1)方法,其内部实现如下
- 先调用tryAcquire尝试加锁(惰性操作),然后调用AQS的acquire方法
- tryAcquire只是简单尝试加锁,但是为什么不考虑可重入性呢?
原因是initialTryLock已经判断过当前线程持有锁的情况,所以这里可以不考虑可重入
而在后面也会多次调用该方法,但是是在线程明确没有获取锁的情况下,所以也不需要考虑可重入
1 | acquire(1) |
acquire核心流程
参数说明
首先看方法形参说明和实参列表的对应关系
形参名 | 说明 | 实参 |
---|---|---|
node | 线程节点或条件节点(传入为 null,仅考虑线程节点) | null |
arg | 需要获取的参数(通常为加锁次数或资源数) | 1 |
shared | 共享模式(true)或独占模式(false) | false |
interruptible | 是否响应中断(true 会返回负数表示中断) | false |
timed | 是否定时阻塞(true 启用超时机制) | false |
time | 定时阻塞的到期时间(单位:纳秒,timed 为 true 时生效) | 0L |
acquire(null, arg, false, false, false, 0L),即以独占模式,不可中断,不定时阻塞进行加锁
返回值说明
返回值 | 说明 |
---|---|
正数 | 加锁成功 |
0 | 加锁超时 |
负数 | 加锁过程中被中断 |
流程图
下图是整个acquire流程,在后面的讲解中,配合此图来看(看不清单击两次)
类图
字段说明
重点对AQS和Node的字段进行说明,方法在下文讲解中说明
字段 | 类型/值 | 描述 |
---|---|---|
AQS 成员变量 | ||
head |
volatile Node | CLH 双链表队列的头节点 |
tail |
volatile Node | CLH 双链表队列的尾节点 |
state |
volatile int | 同步变量 |
Node 内部类 | ||
prev |
volatile Node | 前驱节点 |
next |
volatile Node | 后继节点 |
waiter |
Thread | 当前节点关联的等待线程 |
status |
volatile int | 节点状态,取值如下: |
status 状态 | ||
0 |
初始状态 | 空闲状态,可进入同步队列或转为取消状态 |
WAITING(1) |
等待状态 | 表示节点正在等待获取锁 |
CANCELLED(0x80000000) |
负数(取消状态) | 表示节点已取消获取锁(通常是线程中断或超时导致) |
COND(2) |
条件等待 | 表示在等待队列中 |
代码解析
acquire的代码很长,我们需要逐步分解,先看定义的几个局部变量
完整代码见github或本地ide查看
1 | Thread current = Thread.currentThread(); |
此处重点说明first的含义,主要和head做区分。first节点(下文称为首节点),即head(头节点)的后驱节点,有以下特点:
1 | head.next == first |
流程简述
接下来是死循环内获取锁的逻辑,代码很长,首先我们看下它翻译后的注释
注: 以下的队列表示同步队列,入队指插入同步队列的队尾
1 | 循环执行: |
以上翻译不完全直译原文,有部分自己的补充
执行细节
获取锁的执行过程包裹在for(;;)中,包含了8个大的(else)if,结合上文的流程描述和流程图,下面从判断逻辑和执行逻辑分步骤分析这8个if
第一个If
1 | if (!first && (pred = (node == null) ? null : node.prev) != null && |
if的判断逻辑: if条件比较复杂,包含了赋值语句和比较,我们简化为:这是在判断node已经入队的情况下,不是首节点,并且前驱不为空的情况
1 | // 简化版: |
if的执行逻辑是:
- 如果node的前驱节点处于取消状态(status<0),清理队列中所有取消状态的节点。cleanQueue方法分析作为补充,此时可以先不关心
- 如果前驱的前驱为null,告诉jvm处于自旋等待状态,此时说明node是首节点
这两步都进行了continue进入下一轮循环,说明这是在node进入队列获取锁之前做的稳定性措施
回顾传入的参数:acquire(null, arg, false, false, false, 0L),第一个参数为node,明显不满足if条件,所以第一次循环if肯定不会执行的
第二个if
第一个if在首次不执行的情况下,第二个if才是真正的流程入口,源码如下:
1 | if (first || pred == null) { |
if判断逻辑:
- 如果node是首节点,或者node的前驱节点为null
从第一个if看,node肯定为null,pred也必定为null,可以理解为线程还没有入队
思考:pred==null有没有可能是线程已经入队成为node,但是node的前驱为null呢?
不会,我的理解是node不可能在前驱节点为null的情况下还来获取锁,pred为null表示node是头节点,而头节点表示已经获取到了锁,还未释放的状态(或者头节点是dummy),node没有必要再去获取锁,下面的代码也会验证我的想法
执行逻辑:
- 如果传入的shared为true,调用tryAcquireShared方法,否则调用tryAcquire方法
异常则取消获取锁,cancelAcquire方法分析见return cancelAcquire - 如果获取锁成功,但没入队的情况,直接返回1,表示加锁成功
- 如果通过入队的方式成为了首节点,并且获取锁成功,则进行双链表的维护,让原来的head节点出队
如果是共享锁,则不需要等待释放,直接唤醒下一个节点
如果收到了中断信号,重置中断标志位
返回整数1,表示加锁成功,
双链表维护:
1 | node.prev = null; |
- node前驱指向null
- head指向node,即node成为新head
- 前驱.next本来指向node,改为指向null,即老head出队
- node获取锁之后,取消对当前线程的指向,方便jvm gc
上述双链表节点变化示意图如下,值得注意的是原来的head节点出队,node2成为新的head节点,node3成为新的first节点
思考:为什么双链表的变更不用考虑线程安全为题?
因为此时线程已经加锁成功,没有其他线程能够改变head节点和node.pred节点
第三个if
这里主要是初始化队列,失败则调用acquireOnOOME
判断逻辑:tail为null
1 | Node t; |
执行逻辑:
tryInitializeHead主要是为了初始化dummy节点,并且让head和tail都指向dummy节点,最后返回tail节点
- 如果tail不为null,说明不需要初始化,直接返回tail
- 如果tail为null但是head不为null,说明head当前是中间态,未被释放,通过自旋等待head指向正确
- 初始化一个独占节点,cas设置head指向独占节点
1 | private Node tryInitializeHead() { |
思考:如果有2个线程同时调用tryInitializeHead,thradA执行到h==null,切换到threadB,此时B也认为h==null
threadB让h指向创建的dummy1节点,此时切换到threadA,A会再创建一个dummy2节点,让h指向dummy2节点,是否会出现线程安全问题?
不会,虽然threadA创建了多余的dummy2节点,但是在cas设置head=h时,会首先判断head是否为null
不管threadA还是threadB哪个先成功执行cas,另一个线程都会进入到下一轮循环,认为tail!=null, 并且return,和步骤1形成闭环
那么多余创建的dummy节点算不算浪费呢?
不会,多余的dummy节点会被gc回收,考虑以下代码
node改变指向后,第一个Node对象没有被引用,会被gc回收
1 | Node node = new Node(); // 第一个对象 |
acquireOnOOME
acquireOnOOME前文已提到该方法,调用tryAcquire系列方法,自旋重试直到成功获取锁
代码逻辑比较简单,我给2个辅助说明
- U.park(false, nanos); false表示相对时间,传入纳秒,阻塞当前线程,直到时间结束被唤醒
- n<<x 表示 n * 2^x
1 | private int acquireOnOOME(boolean shared, int arg) { |
第四个if
仅仅是初始化node,为入队做准备
1 | else if (node == null) { // allocate; retry before enqueue |
第五个if
判断逻辑:pred==null在之前多次出现,表示node还没有入队
执行逻辑:
- 封装node
- node.prev = tail
- 设置node为新的tail
如果成功,原来tail.next = node
如果失败,设置node.prev = null, 其实是一个回滚操作,这样下一轮循环pred还是为null,重新尝试,可以说是一个巧妙的办法
1 | else if (pred == null) { // try to enqueue |
第六个if
如果是首节点,并且自旋次数!=0, 说明是要进行spins次尝试获取锁,而不是立刻进行park(阻塞)
前面我也提到了,新版的AQS会尽最大努力,延缓入队和线程阻塞的时机,从而减少线程上下文切换的开销
1 | else if (first && spins != 0) { |
注意:此时node的status还是0
第七个if
如果自旋次数也用完了,那么就是真的要被阻塞挂起了,设置node.status为WAITING
1 | else if (node.status == 0) { |
第八个if
最后就是真的要挂起线程了,执行逻辑:
- 先重置自旋次数
- 判断是否设置了超时阻塞
是:计算超时时间,并设置超时阻塞
否:直接阻塞 - else就比较有趣了,他表示虽然设置了超时时间,但是已经过期了,直接break,走return逻辑
- 从park处被唤醒继续执行,首先去除status标志,表示空闲,随时准备进入等待
- 如果线程被中断,并且acquire传入的参数interruptible=true,表示可以响应中断,直接,走return逻辑
1 | else { |
关于自旋次数spins与postSpins
它们的作用是让线程在park之前先自旋spins次,尝试获取锁,次数用完就被park阻塞挂起,之后被其他线程unpark或parkNanos时间到期后,又再次自旋spins次进行获取锁,也就是在不断增大的spins和park之间来回切换
只不过这个spins每次减为0之后,都会增大,你大致可以理解为2^n -1
具体计算规则如下:
首先它们都是byte类型,即8bit,范围为-128~127,也就是说spins在不断增大时,也是会溢出的,127溢出会变为-128
再执行spins–还有意义吗,有的,兄弟,有的,-128-1又再次溢出为127,所以spins其实是在循环中不断溢出的
有兴趣的同学可以尝试下下面的代码
1 | byte spins = 0, postSpins = 0; |
return cancelAcquire
那么return cancelAcquire的实现是怎样呢
- 如果node不为null,表示已入队
取消对线程的引用,设置为取消状态,清理队列中取消状态的节点 - 中断处理,如果已经中断且可以响应中断,设置为取消状态
否则仅仅重置中断标志位
返回0,也是acquire方法的返回,表示取消获取锁
1 | private int cancelAcquire(Node node, boolean interrupted, |
cleanQueue方法分析
首先吐槽下这个方法的变量命名,全是p,q,s,n,完全不知所谓,所以我根据它们本身的含义,重新命名
1 | private void cleanQueue() { |
执行逻辑
将同步队列从后往前遍历,定义四个指针变量:
- current表示当前节点
- next表示当前节点的下一个节点
- prev表示当前节点的上一个节点
- prevNext表示prev下一个节点,正常情况下就是current,但是可能被其他线程修改,所以定义prevNext
第一个if表示已经遍历到了队首,即清理完成
1 | if (current == null || (prev = current.prev) == null) |
第二个if的条件就很复杂,所以再次改写
1 | if (next == null ? tail != current : (next.prev != current || next.status < 0)) |
它是在判断当前节点的前驱和后继状态是否不正常,如果不正常,直接break,进入下一次循环
节点状态不正常有三种情况
- next == null表示当前节点为队尾,tail应该==current,否则不正常
- next的前驱是current,即next.prev == current,否则不正常
- next.status < 0表示后继节点被取消了,表示不正常
第三个if表示当前节点是取消状态, 那么就把当前节点从队列中移除,就是普通的双链表节点删除操作
1 | if (current.status < 0) { |
- 如果当前节点是尾节点,那么直接把前驱节点设置为新的尾节点, 即casTail(current, prev)
- 如果当前节点不是尾节点,那么把next的prev指向current的prev,即next.casPrev(current, prev))
并且保证当前节点的prev没有变,即current.prev == prev - prev.next之前指向current,现在指向next,即prev.casNext(current, next)
- 如果prev是head节点,即prev.prev == null,唤醒下一个阻塞等待的节点
思考:
如果casTail或者casPrev失败会有问题吗?
首先看失败的结果,直接break进入下一轮循环,失败的原因无非是有其他线程修改了链表结构,因此重新从tail开始遍历,这就是一种贪婪模式,一直循环直到成功
为什么current.prev和current.next没有指向null?current会被gc回收吗
基于gc可达性分析,没有任何节点指向current,因此current不可达而被回收
第四个if更多的是一种修复,如果前驱的next没有指向current,但是current的prev依然指向前驱,这一点从上面的图3也可以看出
那么修复前驱的next指向current
以下是修复过程:
- 前驱的next不是current,说明需要修复
- prev不是head,current.prev还是指向pred,进行修复,否则说明prev和current不相连,跳过本次循环
- 修复prev.next指向current,如果prev指向head,那么唤醒下一个节点
1 | if ((prevNext = prev.next) != current) { |
ReentrantLock释放锁锁过程
实际上unlock方法非常简单
1 | public void unlock() { |
看下tryRelease方法如何实现
- 获取当前同步变量,减去释放的计数
- 验证当前持有线程正确性,如果不是当前线程,抛出异常
- 如果同步变量为0了,说明释放的计数=重入数了, 清除独占锁持有线程变量
- 设置回同步变量
全程不需要考虑线程安全问题,应该其他线程无法修改CLH队列节点和state同步变量
1 | protected final boolean tryRelease(int releases) { |
这里我需要澄清"唤醒"这个很有歧义的词,"唤醒"通常对应Object.notify(),或是Condition.signal()方法,但是在ReentrantLock.unlock释放锁时,线程是持有锁的。
而notify或signal是在线程持有锁是调用wait/await主动释放了锁,所以这里的"唤醒"是对应加锁过程线程从park方法的阻塞中被唤醒(unpark)
执行逻辑:
从head节点开始,从前往后遍历CLH队列status不为0的节点,清除它的等待状态,唤醒等待的线程
1 | private static void signalNext(Node h) { |
思考:为什么是status!=0,0为什么不行,如果线程被取消了怎么办?如果是COND条件等待状态怎么办?
- 等于0表示线程节点还没入队,甚至可能还没成为节点,总之不在队列中
- 取消状态的分析就比较麻烦了,先回顾什么情况下会被取消
1.加锁的第二个if出现异常 2.调用parkNanos出现时间过期 3.线程被中断
第1种情况又分为未入队和入队成为首节点,未入队好说,signalNext遍历不到
其他情况则是在入队情况下,需要cancelAcquire保证通过cleanQueue来清理取消了的节点 - 条件等待状态需要先释放锁,线程从同步队列进入等待队列,所以同步队列节点不可能存在COND状态
公平锁
以上是关于非公平锁的分析,公平锁的加锁/释放锁只有细微的区别,下面做一下分析
加锁
公平锁的加锁和非公平锁的加锁基本一致,唯一不同的是,公平锁的加锁会先判断队列是否为空,主要体现在initialTryLock和tryAcquire方法中
initialTryLock区别
这里代码和公平锁大致一样,只是在发现同步变量为0,加锁之前调用hasQueuedThreads判断队列里是否有线程
ps: 说实话写的有点啰嗦,应该把公共代码提取到父类的模板方法里
1 | final boolean initialTryLock() { |
hasQueuedThreads实现
实现比较简单,从同步队列的tail开始遍历,如果节点状态大于等于0,则说明有线程在等待,返回true
说明下status>0表示线程在等待,等于0表示线程从park唤醒,重新开始获取锁
1 | public final boolean hasQueuedThreads() { |
tryAcquire区别
同样吐槽啰嗦,为什么不放到Sync里
1 | protected final boolean tryAcquire(int acquires) { |
hasQueuedPredecessors
首先这个方法可读性很差,我们先改写下
1 | public final boolean hasQueuedPredecessors() { |
定义了3个变量:headPtr指向head节点,firstNode指向首节点,firstThread表示首节点的等待线程
if判断: 如果队列不为空,则判断以下三个条件之一
- 首节点为空,表示没有等待中的节点
- 首节点的等待线程为空,有可能节点处于取消状态,见return-cancelAcquire
- 首节点的prev应该指向head,但如果指向了null,说明首节点已经获取到了锁,是让head节点出队的中间态,见第二个if的执行逻辑
以上条件均表示firstThread = firstNode.waiter不成立,需要重新获取,也就是调用getFirstQueuedThread
最后返回值表示首节点的等待线程存在,并且不是当前线程,也就是说公平锁在发现队列有等待线程的时候,不会继续获取锁
getFirstQueuedThread很简单,从头到尾遍历,获取第一个等待线程不为空的节点,并返回等待线程
ps:不知道为什么有一模一样的判断条件,只能说啰嗦且不合理
1 | public final Thread getFirstQueuedThread() { |
至于公平锁的释放,和非公平锁一样