分布式锁——自动续期
# 自动续期场景
我们拿可重入锁的例子来讲,当我们一段需要加锁的代码中,需要调用另一个方法,而另一个方法也需要加锁解锁,这时,如果主线程执行的时间比较长,那么留给线程内其他方法的时间就会很少,这样的话方法没执行完,锁的时间到了被释放掉了,那么锁的机制就出问题了。
这时就需要一种机制,能够检测到我们的锁与未执行任务之间的状态,分析并且选择续期。
# 自动续期技术选择
我们选择采用Timer定时任务+Lua脚本来实现自动续期
定时任务:
- JUC定时任务 线程池去承载的,不太好确定每一个定时任务
- Spring定时任务 在业务量大的项目中,会有很多不同的定时任务,不建议选择
- ...
# 为什么不选JUC定时任务?
我们创建一个JUC定时任务:
public static void main(String[] args) {
//不建议使用该方法创建线程池,因为会导致内存溢出
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
System.out.println("定时任务的初始时间:" + System.currentTimeMillis());
scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println("定时任务的执行时间:" + System.currentTimeMillis());
}, 5, 10, TimeUnit.SECONDS);
}
2
3
4
5
6
7
8
打印结果:
定时任务的初始时间:1665382518131
定时任务的执行时间:1665382523220
定时任务的执行时间:1665382533226
定时任务的执行时间:1665382543225
定时任务的执行时间:1665382553219
定时任务的执行时间:1665382563220
定时任务的执行时间:1665382573218
定时任务的执行时间:1665382583218
2
3
4
5
6
7
8
除了第一次隔了5秒执行,其他的每隔10秒执行一次。
但是我们无法直接取消这个定时任务,我们只能销毁掉线程池:
//取消这个定时任务,ScheduledExecutorService没有提供方法,我们只能通过shutdown销毁线程池
scheduledExecutorService.shutdown();
2
# Java的Timer定时器
Java的util提供了一个Timer类型的定时器,Timer定时器使用起来非常简单,并且也可以取消定时任务。
public static void main(String[] args) {
/**
* delay:延迟时间
* period:间隔时间
*/
System.out.println("定时任务初始时间" + System.currentTimeMillis());
new Timer().schedule(new TimerTask() {
@Override
public void run() {
System.out.println("定时任务执行时间" + System.currentTimeMillis());
}
}, 5000, 10000);
}
2
3
4
5
6
7
8
9
10
11
12
13
task – task to be scheduled.
delay – delay in milliseconds before task is to be executed.
period – time in milliseconds between successive task executions.
打印结果:
定时任务初始时间1665383250207
定时任务执行时间1665383255222
定时任务执行时间1665383265232
定时任务执行时间1665383275242
2
3
4
# Lua脚本实现
在自动续期里面,Lua脚本主要负责:
- 判断自己的锁是否存在(hexists),如果存在则续期(重置过期时间)
if redis.call('hexists', KEYS[1], ARGV[1])==1
then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
2
3
4
5
6
# 封装自动续期代码方法
在自定义分布式锁类(DistributedRedisLock)里面封装一个我们的自动续期方法,代码如下。
这里为什么我们只设置了延迟时间,没有设置间隔时间,这是因为在判断Lua脚本返回值以及关闭定时器之间存在一些复杂性,不好去执行,所以为了灵活,我们直接让定时器变为一次性执行,加入续期成功了,重调自身方法,再次执行定时器即可。
/**
* 自动续期方法
* 大致逻辑:
* 隔三分之一时间,执行lua脚本,一次性执行
* 锁存在,续期
* 锁不存在,取消定时任务
*/
private void renewExpire() {
//导入Lua脚本
String script = "if redis.call('hexists', KEYS[1], ARGV[1])==1 " +
"then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getThreadId(), String.valueOf(expire))) {
renewExpire();
}
}
}, expire * 1000 / 3);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
然后我们在加锁的方法里,在加锁成功后,调用定时器方法:
/**
* 我们需要重点编写tryLock有参方法
*
* @param time the maximum time to wait for the lock
* @param unit the time unit of the {@code time} argument
* @return
* @throws InterruptedException
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//若没有传入时间,则使用默认时间
if (time != -1) {
this.expire = unit.toSeconds(time);
}
//Lua脚本字符串导入
String script = "if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 " +
"then " +
" redis.call('hincrby',KEYS[1],ARGV[1],1) " +
" redis.call('expire',KEYS[1],ARGV[2]) " +
" return 1 " +
"else " +
" return 0 " +
"end";
while (!redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getThreadId(), String.valueOf(expire))) {
Thread.sleep(50);
}
//加锁成功,执行定时任务
renewExpire();
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 代码问题
以上代码的思路是正确的,但是通过测试我们发现并没有对锁进行续期。
出现这个问题,关键点在于线程id问题,回到我们的定时器代码,我们在执行Lua脚本的时候,传入的唯一标识是我们自定义的getThreadId()方法,就是uuid加上当前线程的id,这就是出问题的地方。
因为在这种定时任务中,它会另外开辟一个线程池去执行这个定时任务,所以在定时器的线程里获取到的线程id不是主线程的线程id。
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getThreadId(), String.valueOf(expire))) {
renewExpire();
}
}
}, expire * 1000 / 3);
2
3
4
5
6
7
8
# 解决线程id不一致导致的问题
通过上述分析,我们知道问题出在获取线程id的方法上,为了确保子线程用的也是主线程的id,我们需要把主线程的id放入上一级中,以供下级使用。
/**
* 获取线程id并且拼接uuid
*
* @return
*/
String getThreadId() {
return uuid + ":" + Thread.currentThread().getId();
}
2
3
4
5
6
7
8
我们直接在构造方法中,直接获取到线程id,然后拼接赋值给uuid变量,这样我们直接使用uuid就行了,并且效果是一样的。
/**
* 自定义构造方法,为了使用工厂方法外部注入redisTemplate,还有外部传入锁名称
*
* @param redisTemplate
* @param lockName
* @param uuid
*/
public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {
this.redisTemplate = redisTemplate;
this.lockName = lockName;
this.uuid = uuid + ":" + Thread.currentThread().getId();
}
2
3
4
5
6
7
8
9
10
11
12
再把定时任务中的获取id方法替换掉,之前的加锁解锁也需要替换。
/**
* 自动续期方法
* 大致逻辑:
* 每隔三分之一时间,执行lua脚本
* 锁存在,续期
* 锁不存在,取消定时任务
*/
private void renewExpire() {
//导入Lua脚本
String script = "if redis.call('hexists', KEYS[1], ARGV[1])==1 " +
"then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {
renewExpire();
}
}
}, expire * 1000 / 3);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24