Redisson——Semaphore信号量

# 信号量

Redisson的信号量和JUC中的信号量没有太大的差别,只是JUC的信号量是单机的,Redisson的信号量是分布式的。

Redisson信号量的作用:

  • 限流

# JUC信号量限流

public static void main(String[] args) {
        //模拟场景:3个停车位,6辆车
        //3个资源
        Semaphore semaphore = new Semaphore(3);
        //6辆车
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到了停车位");
                    TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                    System.out.println(Thread.currentThread().getName() + "停了一会,开走了!");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, i + "号车").start();
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0号车抢到了停车位
2号车抢到了停车位
1号车抢到了停车位
1号车停了一会,开走了!
4号车抢到了停车位
4号车停了一会,开走了!
3号车抢到了停车位
2号车停了一会,开走了!
0号车停了一会,开走了!
5号车抢到了停车位
3号车停了一会,开走了!
5号车停了一会,开走了!
1
2
3
4
5
6
7
8
9
10
11
12

# 分布式信号量

假如我们在多服务下仍然使用JUC的信息量去限流,会失效。即使我们把Semaphore初始化到外部,也解决不了服务与服务之间的并发问题。

我们需要使用Redisson的信号量:

public void testSemaphore() {
        RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
        semaphore.trySetPermits(3);
        try {
            semaphore.acquire();
            redisTemplate.opsForList().rightPush("log", "10086获取了资源,开始处理业务逻辑。" + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            redisTemplate.opsForList().rightPush("log", "10086处理完了业务逻辑,释放资源===========" + Thread.currentThread().getName());
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13

# 可过期信号量

基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore (opens new window))是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。它提供了异步(Async) (opens new window)反射式(Reactive) (opens new window)RxJava2标准 (opens new window)的接口。

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
1
2
3
4
5
6