分布式可重入锁

# 分布式可重入锁初步分析

分析:

在Redis内放置线程唯一标识,每加锁一次Value值加一,每解锁一次Value值减一,当减为零时把锁删除掉。

思路:

  • 我们可以在uuid后再拼接一个数字,用来记录重入的次数,但是这样会比较麻烦。
  • 使用Redis中的Hash数据模型+Lua脚本,以lock作为最外层的key,以uuid作为Map的K,以重入次数作为Map的V。

# Redis数据模型Hash使用

127.0.0.1:6379> hset user name jack
(integer) 1
127.0.0.1:6379> hset user age 20
(integer) 1
1
2
3
4

以上命令设置了一个user对象,对象中存放了name和age,分别为jack和20.

image-20221009201148177

# 可重入锁——加锁的Lua脚本

流程分析:

  1. 判断锁是否存在(通过EXISTS命令),不存在则直接获取锁(hset key field value命令)
  2. 如果锁存在,则判断是否为自己的锁(HEXISTS),如果是自己的锁,则可以重入(hincrby key field increment)
  3. 如果锁存在但是又不是自己的锁,则进行重试(递归或者循环)
if redis.call('exists','lock')==0 
then 
  	redis.call('hset','lock',uuid,1) 
		redis.call('expire','lock',30) 
		return 1 
elseif redis.call('hexists','lock',uuid)==1 
then 
	redis.call('hincrby','lock',1) 
	redis.call('expire','lock',30) 
	return 1 
else 
	return 0 
end
1
2
3
4
5
6
7
8
9
10
11
12
13

以上Lua脚本有着一些重复的地方,其实我们在没有lock键的时候,也可以执行hincrby命令的,所以优化代码,最终代码为:

if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 
then 
  	redis.call('hincrby',KEYS[1],ARGV[1],1) 
		redis.call('expire',KEYS[1],30) 
		return 1 
else 
	return 0 
end
1
2
3
4
5
6
7
8

# 可重入锁——解锁的Lua脚本

流程分析:

  1. 判断自己的锁是否存在(HEXISTS),如果不存在则返回nil
  2. 如果自己的锁存在,则减一(hincrby -1),判断减一后的值是否为0,如果为0则释放锁(del),并返回1
  3. 如果减一后的值不为0,返回0

Lua脚本代码:

if redis.call('hexists',KEYS[1],ARGV[1])==0 
then 
	return nil 
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)==0 
then 
	return redis.call('del',KEYS[1]) 
else 
	return 0 
end
1
2
3
4
5
6
7
8
9

# 可重入锁Lua脚本——代码实现

因为加锁和解锁的代码嵌入到业务代码中很不优雅,看起来也很乱,所以这里把加锁和解锁封装到工具类中。

我们创建DistributedRedisLock类,让这个类实现Lock接口,这里的Lock接口是ReentrantLock类实现的那个Lock接口。

image-20221009212323745

/**
 * @Author 早睡蛋
 * @Date 2022/10/9 21:20:52
 * @Desc:
 *      三个加锁方法和一个解锁方法,重点实现
 */
public class DistributedRedisLock implements Lock {
    @Override
    public void lock() {}

    @Override
    public boolean tryLock() {}

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {}

    @Override
    public Condition newCondition() {
        return null;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# DistributedRedisLock自定义加/解锁类代码

/**
 * @Author 早睡蛋
 * @Date 2022/10/9 21:20:52
 * @Desc: 三个加锁方法和一个解锁方法,重点实现
 */
public class DistributedRedisLock implements Lock {

    /**
     * 这里的redisTemplate没有注入,是因为本类没有交给Spring进行管理,我们通过构造方法进行外部注入
     */
    private StringRedisTemplate redisTemplate;

    /**
     * lockName是锁的名称,通过该属性,顶层业务中可以设置自定义的锁名称
     */
    private String lockName;

    /**
     * uuid是锁的唯一标识,因为加锁和解锁在本类中进行,所以创建uuid也在创建构造方法内进行
     */
    private String uuid;

    /**
     * 锁的过期时间,默认是30秒
     */
    private long expire = 30;

    /**
     * 自定义构造方法,为了使用工厂方法外部注入redisTemplate,还有外部传入锁名称
     *
     * @param redisTemplate
     * @param lockName
     */
    public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = UUID.randomUUID().toString();
    }

    /**
     * lock方法,直接调用我们写好的tryLock无参方法即可
     */
    @Override
    public void lock() {
        tryLock();
    }

    /**
     * 无参方法,直接调用有参的就可以了
     *
     * @return
     */
    @Override
    public boolean tryLock() {
        try {
            return tryLock(-1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //加锁失败就返回false
        return false;
    }

    /**
     * 我们需要重点编写tryLock有参方法
     *
     * @param time the maximum time to wait for the lock
     * @param unit the time unit of the {@code time} argument
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        //若没有传入时间,则使用默认时间
        if (time != -1) {
            this.expire = unit.toSeconds(time);
        }
        //Lua脚本字符串导入
        String script = "if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 " +
                "then " +
                "   redis.call('hincrby',KEYS[1],ARGV[1],1) " +
                "   redis.call('expire',KEYS[1],ARGV[2]) " +
                "   return 1 " +
                "else " +
                "   return 0 " +
                "end";
        while (!redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName),uuid, String.valueOf(expire))) {
            Thread.sleep(50);
        }
        return true;
    }

    /**
     * 解锁方法
     */
    @Override
    public void unlock() {
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 " +
                "then " +
                "   return nil " +
                "elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)==0 " +
                "then " +
                "   return redis.call('del',KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        Long flag = redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);
        if (flag == null) {
            throw new IllegalMonitorStateException("this lock is not belong to you!");
        }

    }

    @Override
    public Condition newCondition() {
        return null;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

# 构造工厂方法DistributedLockClient

/**
 * @Author 早睡蛋
 * @Date 2022/10/9 21:38:13
 * @Desc: 分布式锁客户端
 */
@Component
public class DistributedLockClient {

    /**
     * 为什么需要在次注入redisTemplate?
     * 因为该类才是被Spring管理的,才能进行注入
     * 我们使用DistributedRedisLock的构造方法实例化DistributedRedisLock时传入redisTemplate与锁名称
     */
    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 获取Redis的分布式锁对象
     *
     * @return
     */
    public DistributedRedisLock getRedisLock(String lockName) {
        return new DistributedRedisLock(redisTemplate, lockName);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 业务层代码

		@Autowired
    private StringRedisTemplate redisTemplate;
		
		/**
     * 注入工厂对象,目的是创建DistributedRedisLock实例
     */
    @Autowired
    private DistributedLockClient distributedLockClient;

    public void deduct() {
        //获取Redis分布式锁,加锁
        DistributedRedisLock redisLock = distributedLockClient.getRedisLock("lock");
        redisLock.lock();
        try {
            //1.查询库存信息
            String stock = redisTemplate.opsForValue().get("stock");
            //2.判断库存是否充足
            if (stock != null && stock.length() != 0) {
                Integer st = Integer.valueOf(stock);
                if (st > 0) {
                    //3.扣减库存
                    redisTemplate.opsForValue().set("stock", String.valueOf(--st));
                }
            }
        } finally {
            redisLock.unlock();
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 可重入测试

public void deduct() {
    //获取Redis分布式锁,加锁
    DistributedRedisLock redisLock = distributedLockClient.getRedisLock("lock");
    redisLock.lock();
    try {
        //1.查询库存信息
        String stock = redisTemplate.opsForValue().get("stock");
        //2.判断库存是否充足
        if (stock != null && stock.length() != 0) {
            Integer st = Integer.valueOf(stock);
            if (st > 0) {
                //3.扣减库存
                redisTemplate.opsForValue().set("stock", String.valueOf(--st));
            }
        }
        
        test();
        
    } finally {
        redisLock.unlock();
    }
}
/**
     * 可重入测试切入方法
     */
public void test() {
    DistributedRedisLock lock1 = distributedLockClient.getRedisLock("lock");
    lock1.lock();
    System.out.println("可重入锁.....");
    lock1.unlock();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

通过断点测试,上述代码出现问题,原因是UUID出现了问题。我们是在DistributedRedisLock类里面创造uuid的,但是DistributedRedisLock没有交给Spring管理,是一个多例对象,所以每执行一次上锁获取到的uuid都不一样。

因此我们要解决因为uuid不一致导致的锁不一致问题。

为什么不能直接用线程id代替uuid?这是因为在集群下,每一个后台服务的线程id会重复。

# 修改代码——uuid与线程id确保一致性

因为工厂方法是交给Spring托管的,所以是单例的,我们在工厂方法内通过无参构造创建uuid,这样就确保每一个服务都是唯一的。然后我们在加锁方法里,获取当前线程id,拼接到uuid后面,这样就确保了每一个服务内的每一个线程都具有唯一标识。

首先修改DistributedLockClient类:

/**
 * @Author 早睡蛋
 * @Date 2022/10/9 21:38:13
 * @Desc: 分布式锁客户端
 */
@Component
public class DistributedLockClient {

    /**
     * 为什么需要在次注入redisTemplate?
     * 因为该类才是被Spring管理的,才能进行注入
     * 我们使用DistributedRedisLock的构造方法实例化DistributedRedisLock时传入redisTemplate与锁名称
     */
    @Autowired
    private StringRedisTemplate redisTemplate;

    private String uuid;
  
		//注解注入会优先执行无参构造方法,这样就确保了每一个服务初始化后uuid是一样的
    public DistributedLockClient() {
        this.uuid = UUID.randomUUID().toString();
    }

    /**
     * 获取Redis的分布式锁对象
     * 传入唯一标识uuid
     *
     * @return
     */
    public DistributedRedisLock getRedisLock(String lockName) {
        return new DistributedRedisLock(redisTemplate, lockName, uuid);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

修改DistributedRedisLock类:

/**
     * 我们需要重点编写tryLock有参方法
     *
     * @param time the maximum time to wait for the lock
     * @param unit the time unit of the {@code time} argument
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        //若没有传入时间,则使用默认时间
        if (time != -1) {
            this.expire = unit.toSeconds(time);
        }
        //Lua脚本字符串导入
        String script = "if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 " +
                "then " +
                "   redis.call('hincrby',KEYS[1],ARGV[1],1) " +
                "   redis.call('expire',KEYS[1],ARGV[2]) " +
                "   return 1 " +
                "else " +
                "   return 0 " +
                "end";
        while (!redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getThreadId(), String.valueOf(expire))) {
            Thread.sleep(50);
        }
        return true;
    }

    /**
     * 解锁方法
     */
    @Override
    public void unlock() {
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 " +
                "then " +
                "   return nil " +
                "elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)==0 " +
                "then " +
                "   return redis.call('del',KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        Long flag = redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), getThreadId());
        if (flag == null) {
            throw new IllegalMonitorStateException("this lock is not belong to you!");
        }

    }

    /**
     * 获取线程id并且拼接uuid
     *
     * @return
     */
    String getThreadId() {
        return uuid + ":" + Thread.currentThread().getId();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58