分布式锁
早睡蛋
# 为什么使用分布式锁
在使用Redis解决并发问题时,不同的锁有着不同的问题:
- JVM本地锁仅在单机单例情况下才能生效
- Redis乐观锁性能较低
分布式锁特点:
- 跨进程
- 跨服务
- 跨服务器
# 分布式锁应用场景
- 超卖现象
- 缓存击穿 一个热点key过期,导致MySQL服务器宕机
# Redis实现分布式锁——分析
分布式锁实现方案:
- 基于Redis实现(√)
- 基于zookeeper/etcd实现
- 基于MySQL实现
特征:
独占排他使用(Redis中的setNX命令)
防止死锁的发生
解决:给锁添加过期时间(expire) 不可重入:可重入性
保证原子性操作 获取锁和锁过期之间:set K V ex TIME nx 判断锁和释放锁之间:Lua脚本
防误删 解决:先判断,在删除
可重入性
自动续期
在Redis集群下,主从不一致导致锁机制失效 解决:RedLock红锁算法
操作:
- 加锁:setNX
- 解锁 :del
- 重试:递归、循环
# Redis实现一个简易的分布式锁
以下代码是初步实现,存在很大的问题,在重试的递归调用中,会存在同一次操作入了多次栈,这会导致锁释放后同一次的操作执行了多次的代码逻辑,从而多减了几次库存。
public void deduct() {
//加锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
if (!lock) {
try {
//重试
Thread.sleep(50);
deduct();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
try {
//1.查询库存信息
String stock = redisTemplate.opsForValue().get("stock");
//2.判断库存是否充足
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
//3.扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
//解锁
redisTemplate.delete("lock");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
解决方案,使用if else判断即可:
public void deduct() {
//加锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
if (!lock) {
try {
//重试
Thread.sleep(50);
deduct();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
try {
//1.查询库存信息
String stock = redisTemplate.opsForValue().get("stock");
//2.判断库存是否充足
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
//3.扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
//解锁
redisTemplate.delete("lock");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 循环重试替换递归重试
递归重试可能会出现栈溢出问题,所以改为循环会好一点
public void deduct() {
//加锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
while (!lock) {
try {
//重试
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//1.查询库存信息
String stock = redisTemplate.opsForValue().get("stock");
//2.判断库存是否充足
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
//3.扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
//解锁
redisTemplate.delete("lock");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 添加锁过期时间防止死锁
场景:
当我们有很多请求并发进行,其中一个请求在加了锁之后,Redis服务器突然宕机了,这时就不会进行释放锁操作,导致其他请求都被阻塞了。
加锁命令:
127.0.0.1:6379> expire stock 20
(integer) 1
127.0.0.1:6379> ttl stock
(integer) 16
127.0.0.1:6379> ttl stock
(integer) 11
127.0.0.1:6379> ttl stock
(integer) 1
127.0.0.1:6379> ttl stock
(integer) -2
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
代码逻辑:
//加锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
while (!lock) {
try {
//重试
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//设置过期时间
redisTemplate.expire("lock", 3, TimeUnit.SECONDS);
//1.查询库存信息
String stock = redisTemplate.opsForValue().get("stock");
//2.判断库存是否充足
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
//3.扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
//解锁
redisTemplate.delete("lock");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
但是以上代码还是存在问题,因为有时候在第一次加锁,然后还没来得及给锁添加过期时间前服务就宕机了。
解决方案:
使用原子操作命令:set K V ex TIME nx
我们对加锁代码进行修改:
//加锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111", 3, TimeUnit.SECONDS);
1
2
2
# 关于While方法体出现的sleep interrupted异常
若是以这种形式执行代码,可能会出现sleep interrupted的问题,在并发量高的情况下,请求会被堵塞,因为是因为主方法执行的时间少于方法内线程的执行时间。
//加锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111", 3, TimeUnit.SECONDS);
while (!lock) {
try {
//重试
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
解决方案:
将赋值的逻辑直接写入While条件中
while (!redisTemplate.opsForValue().setIfAbsent("lock", "111", 3, TimeUnit.SECONDS)) {
try {
//重试
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 通过UUID防误删
误删的场景:
- 一般是某一些请求的时间超出了锁的存活时间,导致这些长请求释放的锁并不是自己的,而是其他请求的,这就是误删的原理。只要有一个请求释放了不属于自己的锁,就会导致后续的所有请求都会处于“裸奔”状态。
- 还有一种情况是,我们在加锁的时候加的不对,然后最后又直接进行释放锁的操作,就会导致误删。例如ReentrantLock的加锁例子。
自动续期:
若有的请求的执行时长超过加锁时设置的存活时长,则触发锁的续期
误删解决方案:
使用UUID判断当前锁是否是自己的,再进行释放
public void deduct() {
String uuid = UUID.randomUUID().toString();
//加锁
while (!redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS)) {
try {
//重试
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//1.查询库存信息
String stock = redisTemplate.opsForValue().get("stock");
//2.判断库存是否充足
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
//3.扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
//先判断是否是自己的锁,再进行解锁
if (StringUtils.equals(redisTemplate.opsForValue().get("lock"), uuid)) {
redisTemplate.delete("lock");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29