ReentrantLock可重入锁底层分析
# 不可重入导致死锁问题
假设有一段业务代码A,A一开始会获取锁,上锁之后执行流程,这是流程里面要调用方法B,而方法B里面也要获取锁。这是因为在一次请求中,这把锁的名字都是一样的,导致B不可能会拿到锁,所以B会一直重试。这是A也不会释放锁,因为A要等B执行完,这就导致了死锁问题。
# ReentrantLock构造方法到底层
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
默认无参构造是使用非公平锁,公平锁还是非公平锁,它们都继承自Sync类
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {...}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {...}
2
3
4
5
6
7
8
9
Sync是ReentrantLock里面的一个内部类,
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {...}
2
3
4
5
6
而Sync又继承自AbstractQueuedSynchronizer(AQS),AQS是JUC中所有锁的基石
关于AQS,他的底层主要有两部分构成,一个是state(int类型,默认值为0),一个是队列(FIFO类型的队列)
/**
* The synchronization state.
*/
private volatile int state;
2
3
4
# ReentrantLock锁的执行流程
非公平锁:
首先一个请求进来,进入AQS,通过CAS对操作state进行操作,判断state的值是否为0,如果是0,则变为1,说明抢锁成功。反之,若CAS操作失败了,就会被扔到队列中,进行排队。但是非公平锁并不会确保先进来的请求能先抢到锁,因为会不断的有其他请求来插队。
公平锁:
和非公平锁的执行顺序有点相反,公平锁的请求先进来队列,对队列进行判断,若队列中没有其他请求,则才对state进行操作。
# 非公平锁的加锁分析
我们对lock()方法源码进行查阅,它是调用了sync的lock()方法
/**
* Acquires the lock.
*
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*
* <p>If the current thread already holds the lock then the hold
* count is incremented by one and the method returns immediately.
*
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until the lock has been acquired,
* at which time the lock hold count is set to one.
*/
public void lock() {
sync.lock();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
再去查看非公平锁的类,查看执行的逻辑,可以看到,一上来就是CAS操作,期望值是0,目标值是1。若CAS操作成功了,则通过setExclusiveOwnerThread
设置当前线程为排他线程。
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
compareAndSetState
方法源码如下,其中unsafe会使用指针对内存中的变量直接进行操作(因为在Java中,只能对对象进行操作,这里是JDK提供的一个后门),unsafe内提供了大量硬件级别的CAS原子性操作(比较并交换)。
注意:unsafe是提供给JDK进行操作的,我们在代码内是不建议直接使用unsafe的。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2
3
4
回到上面的代码,如果获取锁不成功,则执行acquire()方法。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
2
3
4
5
6
acquire()方法如下,这里使用了&&符号,表明只有&&左边为true是=时,右边才会继续执行。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
5
tryAcquire()方法的作用是尝试获取锁,代码如下:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
2
3
其实,很多的锁方法,AQS都已经帮我们实现了,我们可以通过AQS实现独占锁(排他)和共享锁。
- 实现独占锁(排他锁)我们需要重写AQS的
tryAcquire
和tryRelease
方法 - 实现共享锁我们需要重写
tryAcquireShared
和tryReleaseShared
方法
ReentrantLock就是独占排他锁,所以需要重写tryAcquire
和tryRelease
方法
因为Sync继承了AQS,我们查询Sync类,发现只实现了tryRelease方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
那么tryAcquire方法的重写在哪?其实就在非公平锁类里面
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
2
3
我们继续查阅NonfairSync.tryAcquire()方法内的nonfairTryAcquire方法,它是Sync内的方法,其中这里的入参acquires为1,至于为什么为1,可以重新按照上述加锁的顺序进行查阅。
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取state
int c = getState();
if (c == 0) {
//进行CAS操作,若成功,则setExclusiveOwnerThread为当前线程,做到排他
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果c不等于0,则判断当前线程是否是有锁线程,如果是,则进行重入,给线程加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
这里返回true或者false的意义,是用来在AQS的acquire方法内进行使用的,代码如下。如果加锁成功,返回true,则!tryAcquire(arg)的值为false,后面都不用执行了,如果加锁失败,!tryAcquire(arg)就为true,则继续执行后面的acquireQueued(入队列)和selfInterrupt方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
5
我们重点分析acquireQueued中的入参类型addWaiter方法,代码如下,其中tail是尾节点。整个方法的执行意思就是,判断尾节点若不为空,则将当前线程加入到尾巴上去。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
如果为尾节点为空,则执行enq方法。这时的场景是什么?就是有两个请求进来,第一个请求拿到锁了,第二个请求只能入队,这时的队伍是空的。enq内做了一个自旋操作,然后判断若没有尾节点,则compareAndSetHead创建一个新的节点,这个节点没有线程,让尾节点等于这个新的头节点,然后再次进入死循环,这是尾节点已经不为空了。然后将当前线程节点追加到这个以尾节点为首的队列中去,然后执行设置尾节点的方法,最后把当前的node节点设置成尾巴就行了。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
所以可重入锁的加锁流程为:
- ReentrantLock.lock()
- NonfairSync.lock()
- AQS.acquire()
- NonfairSync.tryAcquire()
- Sync.nonfairTryAcquire()
大致流程:
- CAS获取锁,如果没有线程占用锁(state==0),加锁成功并且记录当前线程是有锁线程(其实这里如果没有线程占用锁,还会再执行一次)。
- 如果state不为0,说明锁已经被占用,则判断是否是当前线程占用了锁,如果是则重入(state++)。
- 都不满足,则加锁失败,返回false,入队等待。
# 非公平锁解锁分析
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
我们可以看到,ReentrantLock中的unlock方法调用的是Sync子类NonfairSync的release方法,但NonfairSync中并没有release方法,并入Sync中也没有。所以release方法在AQS中。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
AQS的release方法中,调用了tryRelease方法,代码如下。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
2
3
我们可以看到AQS的tryRelease并没有具体的实现方法,因为前面说到,使用独占锁或者共享锁都需要重写对应的方法。所以AQS的tryRelease由Sync来重写实现。具体实现代码如下:
protected final boolean tryRelease(int releases) {
//这里的releases值为1
int c = getState() - releases;
//判断要释放的是否是拥有锁的线程,如果释放的锁不属于自己,则报错
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//这里需要结合本篇文章开头讲的例子进行思考,因为方法内可能有嵌套方法,上的锁不止一把,所以需要判断是否都释放完了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
可重入锁的解锁流程:
- ReentrantLock.unlock(1)
- AQS.release(1)
- Sync.tryRelease(1)
大致流程:
- 判断当前线程是否有锁(是否是自己的锁),如果不是则抛出异常
- 对state的值减一后进行判断,判断是否为0,如果为0,则解锁成功返回true
- 如果减一后不为0,则返回false