一、前置知识
公平锁和非公平锁
- 公平锁:锁被释放以后,先申请的线程先得到锁。性能较差一些,因为公平锁为了保证时间上的绝对顺序,上下文切换更频繁
- 非公平锁:锁被释放以后,后申请的线程可能会先获取到锁,是随机或者按照其他优先级排序的。性能更好,但可能会导致某些线程永远无法获取到锁
可重入锁
也叫做递归锁,指的是线程可以再次获取自己的内部锁,比如一个线程获取到了对象锁,此时这个对象锁还没有释放,当其想再次获取这个对象锁的时候还是可以获取的,如果不可重入的话,会导致阻塞。
自旋思想
当线程请求锁时,如果锁已经被其他线程持有,那么该线程会不断地重试获取锁,而不是被挂起等待,这种不断尝试获取锁的行为称为自旋
LockSupport
- 一个工具类,用于线程的阻塞和唤醒操作,类似于wait()和notify()方法,但是更加灵活和可控
- 提供了
park()
和unpark()
两个静态方法用于线程阻塞和唤醒操作。 - 优点在于可以在任意时刻阻塞和唤醒线程而不需要事先获取锁或监视器对象。
数据结构之双向链表
双向链表(Doubly Linked List)是一种常见的数据结构,它是由一系列结点(Node)组成的,每个结点包含三个部分:数据域、前驱指针和后继指针。其中,数据域存储结点的数据,前驱指针指向前一个结点,后继指针指向后一个结点。通过这种方式,双向链表可以实现双向遍历和插入、删除操作。
设计模式之模板设计模式
- 模板设计模式是一种行为型设计模式,定义了一种算法的框架,并将某些步骤延迟到子类中事先,这种设计模式的主要目的是允许子类在不改变算法结构的情况下重新定义算法中的某些步骤。
- 优点是能够提高代码复用性和可维护性。
二、概述
Java中的大部分同步类(Lock
、Semaphore
、ReentrantLock
等)都是基于 AbstractQueuedSynchronizer
(简称为AQS)实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。本文会从应用层逐渐深入到原理层,并通过 ReentrantLock
的基本特性和 ReentrantLock
与AQS的关联,来深入解读AQS相关独占锁的知识点。
三、ReentrantLock
1. ReentrantLock特性概览
ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。为了帮助大家更好地理解ReentrantLock的特性,我们先将ReentrantLock跟常用的Synchronized进行比较,其特性如下
ReentrantLock | Synchronized | |
---|---|---|
锁实现机制 | 依赖AQS | 监视器模式 |
灵活性 | 支持响应中断、超时、尝试获取锁 | 不灵活 |
释放形式 | 必须显示调用unlock0释放锁 | 自动释放监视器 |
锁类型 | 公平锁&非公平锁 | 非公平锁 |
条件队列 | 可关联多个条件队列 | 关联一个条件队列 |
可重入性 | 可重入 | 可重入 |
下面通过伪代码,进行更加直观的比较:
// **************************Synchronized的使用方式**************************
// 1.用于代码块
synchronized (this) {}
// 2.用于对象
synchronized (object) {}
// 3.用于方法
public synchronized void test () {}
// 4.可重入
for (int i = 0; i < 100; i++) {
synchronized (this) {}
}
// **************************ReentrantLock的使用方式**************************
public void test () throw Exception {
// 1.初始化选择公平锁、非公平锁
ReentrantLock lock = new ReentrantLock(true);
// 2.可用于代码块
lock.lock();
try {
try {
// 3.支持多种加锁方式,比较灵活; 具有可重入特性
if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
} finally {
// 4.手动释放锁
lock.unlock()
}
} finally {
lock.unlock();
}
}
2. ReentrantLock与AQS的关联
通过上文我们已经了解,ReentrantLock支持公平锁和非公平锁(关于公平锁和非公平锁的原理分析,可参考《不可不说的Java“锁”事》),并且ReentrantLock的底层就是由AQS来实现的。那么ReentrantLock是如何通过公平锁和非公平锁与AQS关联起来呢? 我们着重从这两者的加锁过程来理解一下它们与AQS之间的关系(加锁过程中与AQS的关联比较明显,解锁流程后续会介绍)。
非公平锁源码中的加锁流程如下:
// java.util.concurrent.locks.ReentrantLock#NonfairSync
// 非公平锁
static final class NonfairSync extends Sync {
...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}
上述代码解析如下:
- 首先通过CAS的方式,将 state(同步状态)从0 设置成 1, 如果成功,则将当前线程设置成独占线程
- 如果state(同步状态)修改失败,程序走else逻辑,通过
acquire()
方法进行后续处理。
acquire()
是CAS的核心方法,有 FairSync
和 UnfairSync
两个子类实现它,它们的逻辑基本类似,目的都是通过将争抢锁的线程组成队列,基于 state(同步状态)的变化,阻塞和唤醒线程,从而实现锁的获取和释放。
四、AQS原理
1. AQS整体框架
首先,我们通过下面的架构图来整体了解一下AQS框架:
- 上图中有颜色的为Method,无颜色的为Attribution。
- 总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。
- 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。
2. AQS原理概览
AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
主要原理图如下:
2.1 AQS数据结构
先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。
Node源码如下(重要):
/**
* Wait queue node class.
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
*/
static final class Node {
/** 共享模式下的节点 */
static final Node SHARED = new Node();
/** 独占模式下的节点 */
static final Node EXCLUSIVE = null;
/** 线程取消执行状态:1 */
static final int CANCELLED = 1;
/** 线程准备就绪,等待资源释放后执行 */
static final int SIGNAL = -1;
/** 线程等待执行条件触发 */
static final int CONDITION = -2;
/**
* 此状态在共享模式下才会用到
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: 为-1,表示线程已经准备好了,就等资源释放了
* CANCELLED: 为 1,表示线程获取锁的请求已经取消了
* CONDITION: 为-2,表示节点在等待队列中,节点线程等待唤醒
* PROPAGATE: 为-3,当前线程处在SHARED情况下,该字段才会使用
* 0: 当一个Node被初始化的时候的默认值
*/
volatile int waitStatus;
/**
* node节点的前节点
*/
volatile Node prev;
/**
* node节点的后节点
*/
volatile Node next;
/**
* 当前节点运行的线程
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special value SHARED.
*/
Node nextWaiter;
/**
* 共享模式下返回true
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回当前节点的前节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
解释一下几个方法和属性值的含义:
方法和属性值 | 含义 |
---|---|
waitStatus | 当前节点在队列中的状态 |
thread | 表示处于该节点的线程 |
prev | 前驱指针 |
predecessor | 返回前驱节点,没有的话抛出npe |
nextWaiter | 指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍) |
next | 后继指针 |
线程两种锁的模式:
模式 | 含义 |
---|---|
SHARED | 表示线程以共享的模式等待锁 |
EXCLUSIVE | 表示线程正在以独占的方式等待锁 |
waitStatus
有下面几个枚举值:
枚举 | 含义 |
---|---|
0 | 当一个Node被初始化的时候的默认值 |
CANCELLED | 为1,表示线程获取锁的请求已经取消了 |
CONDITION | 为-2,表示节点在等待队列中,节点线程等待唤醒 |
PROPAGATE | 为-3,当前线程处在SHARED情况下,该字段才会使用 |
SIGNAL | 为-1,表示线程已经准备好了,就等资源释放了 |
2.2 同步状态State
在了解数据结构后,接下来了解一下AQS的同步状态——State。AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* The synchronization state.
*/
private volatile int state;
下面提供了几个访问这个字段的方法:
方法名 | 描述 |
---|---|
protected final int getState() | 获取State的值 |
protected final void setState(int newState) | 设置State的值 |
protected final boolean compareAndSetState(int expect, int update) | 使用CAS方式更新State |
这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。
对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是AQS架构图中的第一层:API层。
2.3 AQS其它关键属性
head : 表示CLH队列中的头节点
private transient volatile Node head;
tail :表示CLH队列中的尾节点
private transient volatile Node tail;
3. AQS重要方法与ReentrantLock的关联
从架构图中可以得知,AQS提供了大量用于自定义同步器实现的Protected方法。自定义同步器实现的相关方法也只是为了通过修改State字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法(ReentrantLock需要实现的方法如下,并不是全部):
方法名 | 描述 |
---|---|
protected boolean isHeldExclusively() | 该线程是否正在独占资源。只有用到Condition才需要去实现它。 |
protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 |
protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 |
protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
protected boolean tryReleaseShared(int arg) | 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。 |
一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease
、tryAcquireShared-tryReleaseShared
中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
。ReentrantLock
是独占锁,所以实现了tryAcquire-tryRelease
。
为了帮助大家理解ReentrantLock和AQS之间方法的交互过程,以非公平锁为例,我们将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。
加锁:
通过
ReentrantLock
的加锁方法Lock
进行加锁操作。会调用到内部类
Sync
的Lock
方法,由于Sync#lock
是抽象方法,根据ReentrantLock
初始化选择的公平锁和非公平锁,执行相关内部类的Lock
方法,本质上都会执行AQS的Acquire
方法。(以非公平锁NonfairSync
为例)java.util.concurrent.locks.ReentrantLock#lock
=>java.util.concurrent.locks.ReentrantLock.Sync#lock
=>java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
=>java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
AQS的
Acquire
方法会执行tryAcquire
方法,但是由于tryAcquire
需要自定义同步器实现,因此执行了ReentrantLock
中的tryAcquire
方法,由于ReentrantLock
是通过公平锁和非公平锁内部类实现的tryAcquire
方法,因此会根据锁类型不同,执行不同的tryAcquire
。(这里再次以非公平锁为例)java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire
=>java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
=>java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
tryAcquire
是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock
自定义同步器无关。java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
=>java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
=>java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
=>java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
以上为AQS加锁过程的核心逻辑和核心方法。
解锁:
通过
ReentrantLock
的解锁方法Unlock
进行解锁。Unlock
会调用内部类Sync
的Release
方法,该方法继承于AQS。java.util.concurrent.locks.ReentrantLock#unlock
=>java.util.concurrent.locks.AbstractQueuedSynchronizer#release
Release中会调用
tryRelease
方法,tryRelease
需要自定义同步器实现,tryRelease
只在ReentrantLock
中的Sync
实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease
=>java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
释放成功后,所有处理由AQS框架完成,与自定义同步器无关。
通过上面的描述,大概可以总结出ReentrantLock
加锁解锁时API层核心方法的映射关系。
五、AQS加锁/解锁源码解析
下面以 ReentrantLock
的实现为例,通过一个具体的案例,详细解释AQS是如何实现线程的抢占、创建队列(FIFO)、入队、以及获取到锁后出队的过程。
1. 模拟场景
假设有5个线程,分别是 线程A
、线程B
、线程C
、线程D
、线程E
同时去争抢一个资源,然后执行相应的逻辑,线程A先抢到锁,其它线程只能 等待A
执行结束后再抢锁。这里使用 ReentrantLock非公平锁实现。
模拟代码如下:
public class AQSDemo {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();//非公平锁
// A线程先抢到锁,执行业务
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("----come in A");
//暂停1分钟线程
try {
TimeUnit.MILLISECONDS.sleep(1000 * 60);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
reentrantLock.unlock();
}
}, "A").start();
// B线程等待,进入AQS队列(FIFO),等待着A运行结束,尝试去抢占锁。
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("----come in B");
} finally {
reentrantLock.unlock();
}
}, "B").start();
// C线程等待,进入AQS队列(FIFO),等待着A运行结束,此时前面是B线程
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("----come in C");
} finally {
reentrantLock.unlock();
}
}, "C").start();
// D线程等待,进入AQS队列(FIFO),等待着A运行结束,此时前面是C线程
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("----come in C");
} finally {
reentrantLock.unlock();
}
}, "D").start();
// E线程等待,进入AQS队列(FIFO),等待着A运行结束,此时前面是D线程
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("----come in C");
} finally {
reentrantLock.unlock();
}
}, "E").start();
}
}
2. 资源初始化
最开始,没有线程争抢锁时,如下图,此时 同步状态为0(表示资源空闲),所没有被线程抢占。
3. 无竞争获取资源
此时 线程A 调用了 lock() 方法抢占了锁,得到了资源,如下图:
此时,同步状态被修改为1,表示资源已被占用。由于此时还有其它线程来争抢资源,所以CLH队列依然是空的。
线程A的代码执行如下图:
// java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
final void lock() {
// 使用CAS将同步状态从0修改为1,如果成功表示成功抢占了锁
if (compareAndSetState(0, 1))
// 抢占锁成功,设置当前线程为独占线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 以独占模式争抢锁,成功则返回,失败则线程进入CLH队列
acquire(1);
}
4. 线程加入等待队列
4.1 加入队列的时机
此时 线程B
也调用了 lock()
方法争抢锁,但由于 线程A
没有退出执行,依然占有的着锁,此时 state(同步状态)为1,所以 线程B 使用CAS修改state 操作是失败的,只能走 else 分支,进入 acquire(1) 方法。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg)
获取锁逻辑, 获取成功则直接返回,获取失败后,会执行框架AQS的后续逻辑
addWaiter(Node mode)
当
tryAcquire(arg)
获取锁失败,就会调用addWaiter
加入到等待队列中去,并返回Node对象(Node为执行线程经过封装后的对象)acquireQueued(final Node node, int arg)
把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)
下面基于非公平锁实现,详细解读这三个方法 tryAcquire(arg)
、addWaiter(Node mode)
、acquireQueued(final Node node, int arg)
4.1.1 tryAcquire
// java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这是AQS父类的默认实现,当子类没有重新实现它时,抛出 UnsupportedOperationException,tryAcquire 有公平锁、非公平锁等不同的实现逻辑,这里我们先看 ReentrantLock
内部类 NonfairSync
的实现
// java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
继续往下
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取同步状态
int c = getState();
if (c == 0) { // 如果同步状态为0,表示资源空闲,没有被其它线程占用
// CAS修改同步状态,成功的话则设置当前线程为独占线程,并返回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果同步状态不是0,但是当前线程就是资源的独占线程,这就是表示锁重入,此时更新同步状态,并返回true
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 同步状态既不是空闲(0),当前线程也不是资源的独占线程,表示当前线程此次抢占锁失败,返回false。
return false;
}
由于此时锁被 线程A
占用,线程B
执行到 nonfairTryAcquire 方法时,同步状态即不是0,线程B
也不是独占线程,所以这里返回 false
。
4.2 如何加入队列
在 线程B
执行 tryAcquire
获取锁失败后,会执行 addWaiter(Node.EXCLUSIVE)
加入等待队列,具体实现方法如下:
// java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
private Node addWaiter(Node mode) {
// 通过当前的线程和锁模式新建一个节点
Node node = new Node(Thread.currentThread(), mode);
// Pred指针指向尾节点Tail
Node pred = tail;
if (pred != null) {
// 将New中Node的Prev指针指向Pred
node.prev = pred;
// CAS设置Tail节点为新建的节点
if (compareAndSetTail(pred, node)) {
// pred节点的next指针指向新的node节点
pred.next = node;
// 返回新建的node节点
return node;
}
}
// node节点加入CLH队列
enq(node);
// 返回node节点
return node;
}
由于 线程B
在争抢锁时,此时 CLH队列为空,即 head、tail 都为 null,那么上面代码中的 pred 也为null,此时需要创建CLH队列,并将 线程B
加入到队列中,这时就要进入 enq(node) 方法
// java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {
// 自旋操作
for (;;) {
Node t = tail;
// 如果队列没有被初始化过,则先初始化一个虚拟节点(new Node())
if (t == null) { // Must initialize
// 初始化虚节点,并将它设置成头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 队列不为空时,当前线程所在节点node的prev指针指向尾节点
node.prev = t;
// CAS设置当前节点为尾节点
if (compareAndSetTail(t, node)) {
// 之前的尾节点的next指针指向当前节点node
t.next = node;
// 返回之前的尾节点
return t;
}
}
}
}
如果队列没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点(虚节点)。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter
就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。
另外也需要注意,enq() 方法返回的 node对象(t) 并不是 addWaiter 方法返回的node对象。
此时 线程B
在经过 enq方法的自旋处理后,进入了CLH队列,如下图:
这时 线程B 就入队了,作为尾节点插入到队列中,头节点是一个虚节点。如果再有线程要获取锁,依次在队列中往后排队即可。
如果线程争抢锁是一个公平锁,那么还会增加下面一段逻辑代码:hasQueuedPredecessors()
方法
hasQueuedPredecessors
是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回False,说明当前线程可以争取共享资源;如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
// 尾节点
Node t = tail; // Read fields in reverse initialization order
// 头节点
Node h = head;
// 指向头节点的下一个节点
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
看到这里,我们理解一下 h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
为什么要判断的头结点的下一个节点?第一个节点储存的数据是什么?
双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。当
h != t
时: 如果(s = h.next) == null
,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,需要返回True(这块具体见下边代码分析)。 如果(s = h.next) != null,说明此时队列中至少有一个有效节点。如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。
这时,我们再回顾一下节点的入队操作,enq()
方法
// java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {
// 自旋操作
for (;;) {
Node t = tail;
// 如果队列没有被初始化过,则先初始化一个虚拟节点(new Node())
if (t == null) { // Must initialize
// 初始化虚节点,并将它设置成头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 队列不为空时,当前线程所在节点node的prev指针指向尾节点
node.prev = t;
// CAS设置当前节点为尾节点
if (compareAndSetTail(t, node)) {
// 之前的尾节点的next指针指向当前节点node
t.next = node;
// 返回之前的尾节点
return t;
}
}
}
}
节点入队不是原子操作,所以会出现短暂的 head != tail
,此时Tail指向 null
,而Head指向虚节点
。这种情况下也需要将相关线程加入队列中。所以这块代码是为了解决极端情况下的并发问题。
4.3 等待队列中线程出队列时机
回到最初的源码:
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上文解释了addWaiter
方法,这个方法其实就是把对应的线程以Node的数据结构形式加入到双端队列里,返回的是一个包含该线程的Node。而这个Node会作为参数,进入到acquireQueued
方法中。acquireQueued
方法可以对排队中的线程进行“获锁”操作。
总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued
会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
下面我们从“何时出队列?”和“如何出队列?”两个方向来分析一下acquireQueued源码:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
注:setHead方法是把当前节点置为虚节点,但并没有修改 waitStatus
,因为它是一直需要用的数据。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取头结点的节点状态
int ws = pred.waitStatus;
// 说明头结点处于唤醒状态
if (ws == Node.SIGNAL)
return true;
// 通过枚举值我们知道waitStatus>0是取消状态
if (ws > 0) {
do {
// 循环向前查找取消节点,把取消节点从队列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置前任节点等待状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire
方法实现业务:
- 判断前节点(pred)waitStatus 是否为 SIGNAL(待唤醒)状态,如果是,直接返回true
- 如果前节点
waitStatus > 0
(已取消状态),则从前节点开始一直往前找,找到一个waitStatus
不大于0的节点,作为当前节点的前节点 - 如果前节点
waitStatus = 0
(默认状态),则将前节点的waitStatus
设置成SIGNAL
parkAndCheckInterrupt
主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {
// 挂起当前线程
LockSupport.park(this);
// 当前线程重新执行后,判断线程是否被中断过,返回中断标记,并清除中断标记(如果返回为true,表示线程被中断过,由于interrupted 方法会清除中断标识,后面会通过 selfInterrupt()补充还原中断标识)
return Thread.interrupted();
}
上述方法的流程图如下:
从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire流程):
由上述我们可以知道,此时经过 acquireQueued 方法的自旋处理后,头节点(虚节点) 的waitStatus设置成 SIGNAL(-1), 而 线程B 被挂起,等待唤醒。
此时队列如下:
如果此时还有其它线程来争抢锁,例如:线程C
、线程D
、线程E
、线程F
,也加入到队列中,如图下:
尾节点 (NodeE) 的前节点waitStatus 都为 -1。
4.4 CANCELLED状态节点生成
如果线程在抢锁的过程中出现异常,或者线程被中断,此时 acquireQueued
方法会走到 finally
分支,failed
标识依然为 false
// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
...
}
...
}
} finally {
if (failed) // failed为false
// 执行线程的取消操作,从队列中出队
cancelAcquire(node);
}
}
通过cancelAcquire
方法,将Node的状态标记为CANCELLED
。接下来,我们逐行来分析这个方法的原理:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过取消状态的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断前驱节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
当前的流程:
- 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。
- 根据当前节点的位置,考虑以下三种情况:
- 当前节点是尾节点。
- 当前节点是Head的后继节点。
- 当前节点不是Head的后继节点,也不是尾节点。
这里看一下unparkSuccessor
方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void unparkSuccessor(Node node) {
// 获取头结点waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一个节点
Node s = node.next;
// 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果当前节点的下个节点不为空,而且状态<=0,就把下一个节点unpark
if (s != null)
LockSupport.unpark(s.thread);
}
根据上述第二条,我们来分析每一种情况的流程。
- 当前节点是尾节点。
当前节点是Head的后继节点。
当前节点不是Head的后继节点,也不是尾节点
以前面的线程队列为例,画流程示意图
当前队列中的线程如下:
此时 线程D 被中断执行,需要退出队列,退出后队列如下:
通过上面的流程,我们对于CANCELLED节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作呢?什么情况下会对Prev指针进行操作?
执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。 shouldParkAfterFailedAcquire方法中,会执行下面的代码,其实就是在处理Prev指针。shouldParkAfterFailedAcquire是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
5. 如何解锁
前面我们说了,多个线程抢占资源,线程A
抢到了锁,线程B
、线程C
、线程D
、线程E
没有抢到锁,进入CLH队列中的加锁情况,也说了 线程D
中断执行,退出队列的情况。
现在我们来说,线程A
执行结束,对AQS释放锁流程进行分析。由于ReentrantLock在解锁的时候,并不区分公平锁和非公平锁,所以我们直接看解锁的源码:
// java.util.concurrent.locks.ReentrantLock
public void unlock() {
sync.release(1);
}
可以看到,本质释放锁的地方,是通过框架来完成的。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
// 尝试释放锁,成功返回true,失败则返回false
if (tryRelease(arg)) {
// 队列头节点
Node h = head;
if (h != null && h.waitStatus != 0)
// 更新头节点的waitStatus,并唤醒头节点的后继节点
unparkSuccessor(h);
return true;
}
return false;
}
在ReentrantLock里面的公平锁和非公平锁的父类Sync定义了可重入锁的释放锁机制。
// java.util.concurrent.locks.ReentrantLock.Sync
// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
这里反过来再看一下 java.util.concurrent.locks.AbstractQueuedSynchronizer#release 释放锁的代码
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
// 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
if (tryRelease(arg)) {
// 获取头结点
Node h = head;
// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这里的判断条件为什么是h != null && h.waitStatus != 0?
h == null
Head还没初始化。初始情况下,head == null
,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。- 如:
线程A
执行结束,其它线程还没有争抢锁的情况
- 如:
h != null && waitStatus == 0
表明后继节点对应的线程仍在运行中,不需要唤醒。- 如:
线程A
执行结束,其它线程刚刚开始争抢锁,CLH队列正在初始化
- 如:
h != null && waitStatus < 0
表明后继节点可能被阻塞了,需要唤醒。- 如:线程A 执行结束,此时CLH队列已经有多个线程排队等待唤醒
再看一下unparkSuccessor方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void unparkSuccessor(Node node) {
// 获取头结点waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一个节点
Node s = node.next;
// 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果当前节点的下个节点不为空,而且状态<=0,就把下一个节点unpark
if (s != null)
LockSupport.unpark(s.thread);
}
对应到我们的CLH队列示例图,就是 虚节点
的 waitStatus
设置成了0,然后将 线程B
unpark。
为什么要从后往前找第一个非Cancelled的节点呢?原因如下。
之前的 addWaiter
方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
private Node addWaiter(Node mode) {
// 通过当前的线程和锁模式新建一个节点
Node node = new Node(Thread.currentThread(), mode);
// Pred指针指向尾节点Tail
Node pred = tail;
if (pred != null) {
// 将New中Node的Prev指针指向Pred
node.prev = pred;
// CAS设置Tail节点为新建的节点
if (compareAndSetTail(pred, node)) {
// pred节点的next指针指向新的node节点
pred.next = node;
// 返回新建的node节点
return node;
}
}
// node节点加入CLH队列
enq(node);
// 返回node节点
return node;
}
我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred;
compareAndSetTail(pred, node)
这两个地方可以看作Tail入队的原子操作,但是此时 pred.next = node;
还没执行,如果这个时候执行了unparkSuccessor方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node。
综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和CANCELLED节点产生过程中断开Next指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。
6. 中断恢复后的执行流程
线程B
被唤醒后,会执行return Thread.interrupted();
,这个函数返回的是当前执行线程的中断状态,并清除。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
再回到 acquireQueued
代码,当 parkAndCheckInterrupt
返回True或者False的时候,interrupted的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前interrupted返回。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node
setHead(node);
p.next = null; // help GC
failed = false;
// 返回线程的中断标识
return interrupted;
}
// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 中断标识设置为true
interrupted = true;
}
} finally {
if (failed)
// 取消线程执行
cancelAcquire(node);
}
}
如果acquireQueued为True,就会执行selfInterrupt方法。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
- 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次(重新恢复线程的中断标识)。
- 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
六、AQS应用
1. ReentrantLock的可重入应用
ReentrantLock的可重入性是AQS很好的应用之一,在了解完上述知识点以后,我们很容易得知ReentrantLock实现可重入的方法。在ReentrantLock里面,不管是公平锁还是非公平锁,都有一段逻辑。
公平锁:
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; }
非公平锁:
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire if (c == 0) { if (compareAndSetState(0, acquires)){ setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; }
从上面这两段都可以看到,有一个 同步状态State
来控制整体可重入的情况。State
是Volatile
修饰的,用于保证一定的可见性和有序性。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private volatile int state;
接下来看State这个字段主要的过程:
- State初始化的时候为0,表示没有任何线程持有锁。
- 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
- 解锁也是对这个字段-1,一直到0,此线程对锁释放。
2. JUC中的应用场景
除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:
同步工具 | 同步工具与AQS的关联 |
---|---|
ReentrantLock | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。 |
Semaphore | 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。 |
CountDownLatch | 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。 |
ReentrantReadWriteLock | 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。 |
ThreadPoolExecutor | Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。 |
3. 自定义同步工具
了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具。
public class MyLock {
private static class Sync extends AbstractQueuedSynchronizer {
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
protected boolean tryRelease(int arg) {
return compareAndSetState(1, 0);
}
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
private Sync sync = new Sync();
/**
* 加锁
*/
public void lock() {
sync.acquire(1);
}
/**
* 解锁
*/
public void unlock() {
sync.release(1);
}
}
测试代码:
public class TestMyLock {
static int count = 0;
static MyLock myLock = new MyLock();
public static void main (String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
public void run () {
try {
myLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
myLock.unlock();
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
运行结果:
20000
上述代码每次运行结果都会是20000。通过简单的几行代码就能实现同步功能,这就是AQS的强大之处。
本文主要参考以下链接
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 george_95@126.com