ReentrantLock可重入锁底层分析

# 不可重入导致死锁问题

假设有一段业务代码A,A一开始会获取锁,上锁之后执行流程,这是流程里面要调用方法B,而方法B里面也要获取锁。这是因为在一次请求中,这把锁的名字都是一样的,导致B不可能会拿到锁,所以B会一直重试。这是A也不会释放锁,因为A要等B执行完,这就导致了死锁问题。

# ReentrantLock构造方法到底层

    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

默认无参构造是使用非公平锁,公平锁还是非公平锁,它们都继承自Sync类

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {...}

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {...}
1
2
3
4
5
6
7
8
9

Sync是ReentrantLock里面的一个内部类,

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {...}
1
2
3
4
5
6

而Sync又继承自AbstractQueuedSynchronizer(AQS),AQS是JUC中所有锁的基石

image-20221009165922284

关于AQS,他的底层主要有两部分构成,一个是state(int类型,默认值为0),一个是队列(FIFO类型的队列)

    /**
     * The synchronization state.
     */
    private volatile int state;
1
2
3
4

# ReentrantLock锁的执行流程

非公平锁:

首先一个请求进来,进入AQS,通过CAS对操作state进行操作,判断state的值是否为0,如果是0,则变为1,说明抢锁成功。反之,若CAS操作失败了,就会被扔到队列中,进行排队。但是非公平锁并不会确保先进来的请求能先抢到锁,因为会不断的有其他请求来插队。

公平锁:

和非公平锁的执行顺序有点相反,公平锁的请求先进来队列,对队列进行判断,若队列中没有其他请求,则才对state进行操作。

# 非公平锁的加锁分析

我们对lock()方法源码进行查阅,它是调用了sync的lock()方法

    /**
     * Acquires the lock.
     *
     * <p>Acquires the lock if it is not held by another thread and returns
     * immediately, setting the lock hold count to one.
     *
     * <p>If the current thread already holds the lock then the hold
     * count is incremented by one and the method returns immediately.
     *
     * <p>If the lock is held by another thread then the
     * current thread becomes disabled for thread scheduling
     * purposes and lies dormant until the lock has been acquired,
     * at which time the lock hold count is set to one.
     */
    public void lock() {
        sync.lock();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

再去查看非公平锁的类,查看执行的逻辑,可以看到,一上来就是CAS操作,期望值是0,目标值是1。若CAS操作成功了,则通过setExclusiveOwnerThread设置当前线程为排他线程。

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

compareAndSetState方法源码如下,其中unsafe会使用指针对内存中的变量直接进行操作(因为在Java中,只能对对象进行操作,这里是JDK提供的一个后门),unsafe内提供了大量硬件级别的CAS原子性操作(比较并交换)。

注意:unsafe是提供给JDK进行操作的,我们在代码内是不建议直接使用unsafe的。

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
1
2
3
4

回到上面的代码,如果获取锁不成功,则执行acquire()方法。

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
1
2
3
4
5
6

acquire()方法如下,这里使用了&&符号,表明只有&&左边为true是=时,右边才会继续执行。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
1
2
3
4
5

tryAcquire()方法的作用是尝试获取锁,代码如下:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
1
2
3

其实,很多的锁方法,AQS都已经帮我们实现了,我们可以通过AQS实现独占锁(排他)和共享锁。

  • 实现独占锁(排他锁)我们需要重写AQS的tryAcquiretryRelease方法
  • 实现共享锁我们需要重写tryAcquireSharedtryReleaseShared方法

ReentrantLock就是独占排他锁,所以需要重写tryAcquiretryRelease方法

因为Sync继承了AQS,我们查询Sync类,发现只实现了tryRelease方法:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
1
2
3
4
5
6
7
8
9
10
11
12

那么tryAcquire方法的重写在哪?其实就在非公平锁类里面

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
1
2
3

我们继续查阅NonfairSync.tryAcquire()方法内的nonfairTryAcquire方法,它是Sync内的方法,其中这里的入参acquires为1,至于为什么为1,可以重新按照上述加锁的顺序进行查阅。

final boolean nonfairTryAcquire(int acquires) {
  					//获取当前线程
            final Thread current = Thread.currentThread();
  					//获取state
            int c = getState();
            if (c == 0) {
              	//进行CAS操作,若成功,则setExclusiveOwnerThread为当前线程,做到排他
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
  					//如果c不等于0,则判断当前线程是否是有锁线程,如果是,则进行重入,给线程加1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

这里返回true或者false的意义,是用来在AQS的acquire方法内进行使用的,代码如下。如果加锁成功,返回true,则!tryAcquire(arg)的值为false,后面都不用执行了,如果加锁失败,!tryAcquire(arg)就为true,则继续执行后面的acquireQueued(入队列)和selfInterrupt方法。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
1
2
3
4
5

我们重点分析acquireQueued中的入参类型addWaiter方法,代码如下,其中tail是尾节点。整个方法的执行意思就是,判断尾节点若不为空,则将当前线程加入到尾巴上去。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14

如果为尾节点为空,则执行enq方法。这时的场景是什么?就是有两个请求进来,第一个请求拿到锁了,第二个请求只能入队,这时的队伍是空的。enq内做了一个自旋操作,然后判断若没有尾节点,则compareAndSetHead创建一个新的节点,这个节点没有线程,让尾节点等于这个新的头节点,然后再次进入死循环,这是尾节点已经不为空了。然后将当前线程节点追加到这个以尾节点为首的队列中去,然后执行设置尾节点的方法,最后把当前的node节点设置成尾巴就行了。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

所以可重入锁的加锁流程为:

  1. ReentrantLock.lock()
  2. NonfairSync.lock()
  3. AQS.acquire()
  4. NonfairSync.tryAcquire()
  5. Sync.nonfairTryAcquire()

大致流程:

  1. CAS获取锁,如果没有线程占用锁(state==0),加锁成功并且记录当前线程是有锁线程(其实这里如果没有线程占用锁,还会再执行一次)。
  2. 如果state不为0,说明锁已经被占用,则判断是否是当前线程占用了锁,如果是则重入(state++)。
  3. 都不满足,则加锁失败,返回false,入队等待。

# 非公平锁解锁分析

    /**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    public void unlock() {
        sync.release(1);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14

我们可以看到,ReentrantLock中的unlock方法调用的是Sync子类NonfairSync的release方法,但NonfairSync中并没有release方法,并入Sync中也没有。所以release方法在AQS中。

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
1
2
3
4
5
6
7
8
9

AQS的release方法中,调用了tryRelease方法,代码如下。

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
1
2
3

我们可以看到AQS的tryRelease并没有具体的实现方法,因为前面说到,使用独占锁或者共享锁都需要重写对应的方法。所以AQS的tryRelease由Sync来重写实现。具体实现代码如下:

protected final boolean tryRelease(int releases) {
  					//这里的releases值为1
            int c = getState() - releases;
  					//判断要释放的是否是拥有锁的线程,如果释放的锁不属于自己,则报错
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
  					//这里需要结合本篇文章开头讲的例子进行思考,因为方法内可能有嵌套方法,上的锁不止一把,所以需要判断是否都释放完了
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

可重入锁的解锁流程:

  1. ReentrantLock.unlock(1)
  2. AQS.release(1)
  3. Sync.tryRelease(1)

大致流程:

  1. 判断当前线程是否有锁(是否是自己的锁),如果不是则抛出异常
  2. 对state的值减一后进行判断,判断是否为0,如果为0,则解锁成功返回true
  3. 如果减一后不为0,则返回false