学习自:【黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目】https://www.bilibili.com/video/BV1cr4y1671t?p=67&vd_source=9fed3cefc266aa5b3895aaab6e6214f5

本文是基于上述视频教程和视频配套文档的文字总结,包含了以下几方面内容:

  • 基于Redis实现验证码登录
  • Redis缓存、缓存更新策略、缓存穿透、雪崩、击穿问题
  • 典型的秒杀场景、超卖问题及解决方案
  • Redis分布式锁、误删问题、原子性问题、lua脚本
  • Redisson分布式锁,包含其可重入、可重试、自动续期以及MutiLock的原理以及源码的分析

基于Redis实现验证码登录

基于session实现登录

过去我们使用基于session的验证码登录。缺点是session在集群环境下不能使用,因为多台tomcat服务器并不能共享session数据。由此产生了基于redis的登录流程。为了读者理解,先简单介绍一下基于session的登录。

登录的流程

  • 发送验证码

用户输入手机号发起请求,服务端校验手机号格式是否正确,然后生成验证码保存到服务端自己的session中,同时发送给用户。

  • 短信验证码登录、注册

用户输入手机号和验证码,服务端从session中取出验证码进行比对,通过后从数据库中查找用户信息。如果用户不存在,直接进行注册。然后将用户信息写入自己的session,返回成功信息给用户。

  • 校验登录状态

用户每次请求的时候,都会携带cookie,服务端从cookie中取出JsessionID,根据JessionID查找自己服务器上的session。如果没有session信息,则进行拦截;如果有session信息,将用户信息写入ThreadLocal中,然后放行。

session共享问题

通过以上分析,我们发现,服务器每次在保存session的时候,只会保存到自己服务器的环境中,并不会保存到其他tomcat服务器上。那么,如果在tomcat服务器集群环境下,用户第一次请求1号tomcat服务器,1号服务器将session保存到自己服务器上。当用户第二次请求的时候,他的请求被nginx分配到了2号tomcat服务器上,2号服务器上并没有用户的session信息,则会进行拦截。问题就出现了。

如何解决呢?早期的方案是,一台tomcat服务器在保存session信息时,同时同步给其他所有tomcat服务器。

但是这种方案有两大问题:

  1. 每台tocat服务器都有完整session信息,服务器压力过大
  2. session拷贝数据时,可能会出现延时

而又因为redis本身就是共享的,又是基于内存的,同时也是key、value结构,所以可以采用redis替代session的解决方案。

Redis替代Session

我们要考虑redis的key的结构和key具体的设计细节。

  1. key的结构

采用string数据类型。string:key为string,value使用json字符串保存,比较直观。

key value
heima:user:1 {name:”jack”,age:21}

采用hash数据类型。可以将对象中的每个字段单独存储,可以针对单个字段做CRUD,并且占用内存更少。

key value
filed value
heima:user:1 name jack
age 21
  1. key的具体细节
  • 唯一性
  • 方便性

一般采用随机生成的token作为key就够了。

  1. 访问流程

当用户提交手机号和验证码后,后台校验手机号和验证码,通过后,根据手机号查询用户信息,存在的话将用户信息保存到redis中,并生成token作为redis的key,并且将token返回给用户。当用户携带token访问后台的时候,后台取出用户携带的token,根据token访问对应的redis数据,判断用户信息是否存在,如果存在则写入ThreadLocal,然后放行。

  1. 拦截器的配置与优化

在配置拦截器的时候,要手动刷新token的过期时间。但是这样还不够,因为如果用户一直访问拦截器拦截之外的路径,例如主页面的店铺信息,就不会进行token的刷新,token到期后用户就会被剔除。

解决办法:在加一层拦截器,拦截所有的路径,在这个拦截器中获取用户信息并刷新token过期时间。在第二个拦截前中,只判断ThreadLocal中是否包含用户信息,没有则拦截,有的话则放行。

温馨小贴士

tomcat的运行原理

tomcat启动之后,就会有一个监听线程时刻监听8080端口。当用户向tomcat运行的端口发起请求,监听线程就会和用户端创建socket连接。socket都是成对出现的,用户通过socket向tomcat发送数据,tomcat也通过socket向用户返回数据。当tomcat端的socket接收到数据后,监听线程会从tomcat的工作线程池中取出一个线程执行用户的请求。该线程基于用户的请求,访问controller、service、dao层,并且访问对应的DB,用户执行完请求之后,再统一返回,再找到tomcat的socket连接,将数据返回给用户端的socket。

关于threadlocal

在ThreadLocal的源码中,无论是get方法,还是set方法,ThreadLocal都会先获取当前用户的线程,然后从当前线程中取出ThreadLocalMap类型的map成员变量。只要线程不一样,取出的map变量就不一样,通过这种方式就实现了线程的隔离。

因此,基于以上我们可知,用户发出的每次请求都在tomcat取出的一个线程中完成,而且用户的每次请求都是独立的,每次请求都会取出一个线程,请求完之后线程进行回收。而ThreadLocal恰好也是每次都从当前线程中取出变量,将数据和线程做了捆绑。因此,当用户访问我们的工程时,就可以通过ThreadLocal来做到线程隔离,通过ThreadLocal做到每个线程操作自己的数据,每个线程都有自己的变量副本。

查询中的缓存

基本概念

缓存(Cache),就是数据交换的缓冲区,一般从数据库中获取,存储于本地代码。

缓存存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘,缓存可以大大降低用户访问并发量带来的服务器读写压力。

但是缓存也会增加代码复杂度和运营的成本。

缓存的好处:降低后端负载、提高读写效率,降低响应时间;

缓存的成本:数据一致性成本;代码维护成本;运维成本。

在springboot框架中,我们一般采用Redis缓存来降低数据库压力。

缓存的作用模型和思路

加入缓存的代码如下:

缓存更新策略

缓存更新是redis为了节省内存而设计出来的。我们知道,内存数据宝贵,当我们向redis插入太多数据的时候,内存占用增加,此时就需要更新redis缓存来降低内存的占用。有三种策略。

内存淘汰:redis自动进行。当数据占用内存达到redis设置的maxmemory时,自动淘汰一些不太重要的数据。

默认淘汰策略为noeviction,其他策略有:

  • allkeys-lru:从所有 key 中使用 LRU 算法进行淘汰。
  • volatile-lru:从设置了过期时间的 key 中使用 LRU 算法进行淘汰。
  • allkeys-random:从所有 key 中随机淘汰数据。
  • volatile-random:从设置了过期时间的 key 中随机淘汰。
  • volatile-ttl:在设置了过期时间的 key 中,根据 key 的过期时间进行淘汰,越早过期的越优先被淘汰。

超时剔除:我们给redis数据设置了ttl时间后,redis会自动剔除ttl到期的数据。

主动更新:手动干预。在编写业务逻辑时,在修改数据库的同时更新缓存。

主动更新策略又包括三种具体的方案:

  1. Cache Aside Pattern:由缓存的调用者,在更新数据库的同时,更新缓存。又称之为双写。
  2. Read/Write Through Pattern:缓存与数据库整合成一个服务,由服务来维持一致性,调用者无需关心缓存一致性问题。
  3. Write Behind Caching Pattern:调用者只操作缓存,其他线程异步的将缓存持久化到数据库。

在业务场景为高一致性需求的情况下,我们一般选用主动更新策略,并以超时作为兜底方案。

采用哪种策略?

如果出现数据库和缓存不一致的问题,选用主动更新的哪种策略呢?

综合考虑我们使用方案一Cache Aside Pattern方案。

但是在操作数据库和缓存的同时,需要考虑几个问题。

  • 是更新缓存,还是删除缓存?

删除缓存。如果每次操作数据库后,我们都进行缓存的更新,如果中间没有人查询缓存,那么这个缓存更新实际上只有最后一次的更新有效,这样就会多出无效的更新缓存操作。所有我们可以把缓存删除,等有人再来查询时,加载缓存中的数据。

  • 如何保证缓存和数据库的操作同时成功或者失败?

单体系统利用事务操作。分布式系统利用TCC等分布式事务方案。

  • 先操作缓存还是先操作数据库?

先操作数据库,再操作缓存。

原因如下:先操作数据库,再删除缓存造成的数据不一致概率更低。

如果是先删除缓存,再操作数据库的情况。假设缓存和数据库的初始值都为10。此时线程1进来,进行更新数据的操作。线程1先删除缓存10,再更新数据库的值为20。再这个间隙中,线程2恰好进来,进行查询的操作。线程2查询缓存,未命中,然后查询数据库得到10,并且将查询到的数据10写入缓存。在线程2写入缓存完成之后,线程1才完成对数据库的更新,数据库中的值变为20。这就造成了数据库和缓存中的数据的不一致性问题。因为线程2查询缓存、写入缓存的速度比线程1更新数据库的动作要快很多,因此数据不一致问题发生的概率很大。

如果是先操作数据库,再删除缓存的情况。必须要在缓存为空的前提下才会发生数据不一致性问题。假设数据库和缓存的值都为10。线程1进来,进行查询操作,查询缓存未命中,则去查数据库,得到值为10,然后写入缓存。在线程1查询数据库和写入缓存的间隙中,此时线程2进来,进行数据更新的操作。线程2更新数据库的值为20,然后删除缓存,线程2操作完成之后,此时线程1才执行写入缓存10的操作。此时数据库的值为20,缓存的值为10,造成了数据的不一致性问题。而线程1写入缓存10这个操作的速度比线程2更新数据库再删除缓存的速度快很多,因此这种情况下数据不一致问题发生的概率很低。

代码示例

根据以上的缓存更新策略。完成以下的需求:

  • 根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间

  • 根据id修改店铺时,先修改数据库,再删除缓存

代码如下:

缓存更新策略的最佳实践方案

我们来小结一下。根据不同的需求,采取不同的缓存更新策略。

  1. 低一致性需求:使用Redis自带的内存淘汰机制
  2. 高一致性需求:主动更新,并以超时剔除作为兜底方案
    • 读操作:缓存命中则直接返回缓存未命中则查询数据库,并写入缓存,设定超时时间
    • 写操作:先写数据库,然后再删除缓存要确保数据库与缓存操作的原子性

缓存穿透问题

缓存穿透是指用户请求数据时,数据库和缓存中都没有数据的存在,这样缓存永远不会生效,请求直接达到数据库中。

常见的缓存穿透解决方案有两种。

  1. 缓存空对象

    • 优点:实现简单,维护方便

    • 缺点

      • 有额外的内存消耗

      • 可能造成数据短期的不一致

  2. 布隆过滤器

    • 优点:没有额外的内存消耗,没有多余的key
    • 缺点:
      • 实现复杂
      • 会有误判率(可能存在哈希冲突)

缓存空对象分析:

就是当用户请求数据库不存在的数据时,将空值写入redis,并设置过期时间,避免对数据库的超负荷访问

布隆过滤分析:

布隆过滤实际上一个庞大的二进制数组,采用哈希思想来判断请求数据是否在redis中存在。如果存在,则放行,访问redis,哪怕redis中的数据过期了,数据库中也存在,则访问数据库,写入redis,然后返回。如果不存在,则直接拒绝访问。(相当于在用户请求redis中加了一层,用户先访问布隆过滤器,再访问redis)

图解如下:

小结:

缓存穿透的原因?(略)

缓存穿透的解决方案?

  • 缓存null值
  • 布隆过滤
  • 增加id的复杂性,避免被猜测出id的规律
  • 做好数据的基础格式校验
  • 加强用户权限校验
  • 做好热点参数的限流

缓存雪崩问题

缓存雪崩是指在同一时间段大量的缓存key同时时效或者redis服务宕机,导致大量的请求打到数据库,带来巨大压力。

解决方案:

  • 给不同的key添加随机的ttl值
  • 利用redis集群提高服务的可用性
  • 添加多级缓存
  • 给缓存业务添加降级或者限流策略

缓存击穿问题

缓存击穿问题也叫热点key问题,就是一个被高并发访问缓存重建业务较复杂的key突然失效了,无数的请求瞬间给数据库带来巨大压力。

常见的解决方案有两种:

  • 互斥锁
  • 逻辑过期

互斥锁方案

当线程1未命中缓存之后,获取互斥锁,然后进行查询数据库以及缓存的重建。这段时间中,线程2也来查询,缓存未命中,也尝试获取互斥锁,但是获取互斥锁失败,休眠一会再重试。直到线程1完成缓存的重建之后释放锁,线程2才能来访问。此时,线程2缓存命中,直接返回数据。

逻辑过期方案

方案分析:我们之所以会出现缓存击穿的问题,就是因为我们对key设置了过期时间,假设我们不设置过期时间,其实就不会有缓存击穿问题。但是数据会一直占用内存,我们可以设置逻辑过期时间来解决。

逻辑过期时间方案分析:

我们写入缓存数据的时候,在redis的value中设置expire逻辑过期时间。假设线程1请求过来,查询缓存,判断逻辑过期时间是否过期,如果发现已经过期,则尝试获取互斥锁,开启一个独立线程2,由独立线程去完成缓存的重建。线程1直接返回过期的数据。此时线程3也过来请求,发现缓存中的逻辑过期时间已经过期,则尝试获取互斥锁,获取锁失败(因为此时线程2还正在重建缓存,没有释放锁),直接返回过期数据。直到独立线程2重建缓存完成之后释放锁。此时后面的线程再来访问得到的就是新数据了。

注意:逻辑过期时间并不直接作用于redis,不同于设置ttl

两个方案的对比

互斥锁方案:因为实现了互斥性,所以保证了数据的一致,且实现简单。也没有其他的事情需要做,没有额外的内存消耗。缺点在于可能会发生死锁,且只能串行执行,性能有所影响。

逻辑过期时间方案:线程读取数据的过程不需要等待,性能好。额外维护逻辑过期时间,有额外的内存消耗。有一个独立线程进行缓存重建,但在重构数据完成前,其他线程只能返回旧数据,出现了数据的不一致性。实现起来麻烦。

优缺点如下图所示。

代码实现:利用互斥锁解决缓存击穿问题

实现互斥锁的代码:

核心思想是利用redis的setnx方法来表示获取锁。如果返回true,则代码这个key不存在,表示获取锁成功。如果返回false,说明已经有人对这个key进行了设置,获取锁失败。

1
2
3
4
5
6
7
8
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

private void unlock(String key) {
stringRedisTemplate.delete(key);
}

主要业务流程的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public Shop queryWithMutex(Long id)  {
String key = CACHE_SHOP_KEY + id;
// 1、从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get("key");
// 2、判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 存在,直接返回
return JSONUtil.toBean(shopJson, Shop.class);
}
//判断命中的值是否是空值
if (shopJson != null) {
//返回一个错误信息
return null;
}
// 4.实现缓存重构
//4.1 获取互斥锁
String lockKey = "lock:shop:" + id;
Shop shop = null;
try {
boolean isLock = tryLock(lockKey);
// 4.2 判断否获取成功
if(!isLock){
//4.3 失败,则休眠重试
Thread.sleep(50);
return queryWithMutex(id);
}
//4.4 成功,根据id查询数据库
shop = getById(id);
// 5.不存在,返回错误
if(shop == null){
//将空值写入redis
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
//返回错误信息
return null;
}
//6.写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_NULL_TTL,TimeUnit.MINUTES);

}catch (Exception e){
throw new RuntimeException(e);
}
finally {
//7.释放互斥锁
unlock(lockKey);
}
return shop;
}

代码实现:利用逻辑过期方案解决缓存击穿问题

需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题

具体的业务流程图如下:

注意:因为我们没有给缓存设置ttl,理论上所有请求的数据在redis中都存在,如果不存在,则认为是该数据是真的不存在。(例如商家上线了一个活动,用户访问活动之外的商品。或者活动已经过期,缓存数据已经被清理,用户还去访问之前活动中的商品)

步骤一:封装逻辑过期时间的数据

现在要在redis存储的数据中带上逻辑过期时间,那么你可以修改原来的实体类加上过期时间。但是这种做法对原来代码有入侵性,不好。因此,我们新建一个实体类,封装原来数据和过期时间。

1
2
3
4
5
@Data
public class RedisData {
private LocalDateTime expireTime; // 逻辑过期时间
private Object data;
}

步骤二:业务代码

#ShopServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire( Long id ) {
String key = CACHE_SHOP_KEY + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(json)) {
// 3.存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未过期,直接返回店铺信息
return shop;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock){
CACHE_REBUILD_EXECUTOR.submit( ()->{

try{
//重建缓存
this.saveShop2Redis(id,20L);
}catch (Exception e){
throw new RuntimeException(e);
}finally {
unlock(lockKey);
}
});
}
// 6.4.返回过期的商铺信息
return shop;
}

/**
* 重建缓存
* @param id 商铺id
* @param expireSeconds 逻辑过期时间
*/
public void saveShop2Redis(Long id, Long expireSeconds) throws InterruptedException {
// 1.从数据库查询店铺信息
Shop shop = getById(id);

// 模拟缓存重建的延时
Thread.sleep(2000);

// 2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));

// 3.写入redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,JSONUtil.toJsonStr(redisData));
}

步骤三:

因为在该方案中,要保证redis中包含了所有的热点数据。因此,需要先进行缓存的预热。在单元测试中进行缓存预热。

#src/test/java/com/hmdp/HmDianPingApplicationTests.java

1
2
3
4
5
6
7
8
9
10
/**
* 对所有的商铺数据进行缓存预热
*/
@Test
void testSaveShop2Redis() throws InterruptedException {
for (long i = 1; i < 15; i++) {
Shop shop = shopService.getById(i);
cacheClient.setWithLogicalExpire(CACHE_SHOP_KEY + i,shop,10L, TimeUnit.SECONDS);
}
}

然后就可以重启服务,进行测试。

封装redis的工具类

每次解决缓存问题都需要自己一个一个编写,太麻烦了。因此,我们基于StringRedisTemplate封装一个缓存工具类,方便以后我们的调用。缓存工具类满足以下要求:

  • 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
  • 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓

存击穿问题

  • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
  • 方法5:根据指定的key查询缓存,并反序列化为指定类型,需要利用互斥锁解决缓存击穿问题

封装的工具类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@Slf4j
@Component
public class CacheClient {

private final StringRedisTemplate stringRedisTemplate;

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

public void set(String key, Object value, Long time, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
}

public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
// 设置逻辑过期
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
// 写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}

public <R,ID> R queryWithPassThrough(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(json)) {
// 3.存在,直接返回
return JSONUtil.toBean(json, type);
}
// 判断命中的是否是空值
if (json != null) {
// 返回一个错误信息
return null;
}

// 4.不存在,根据id查询数据库
R r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.存在,写入redis
this.set(key, r, time, unit);
return r;
}

public <R, ID> R queryWithLogicalExpire(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(json)) {
// 3.存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未过期,直接返回店铺信息
return r;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock){
// 6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 查询数据库
R newR = dbFallback.apply(id);
// 重建缓存
this.setWithLogicalExpire(key, newR, time, unit);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
// 释放锁
unlock(lockKey);
}
});
}
// 6.4.返回过期的商铺信息
return r;
}

public <R, ID> R queryWithMutex(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3.存在,直接返回
return JSONUtil.toBean(shopJson, type);
}
// 判断命中的是否是空值
if (shopJson != null) {
// 返回一个错误信息
return null;
}

// 4.实现缓存重建
// 4.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
R r = null;
try {
boolean isLock = tryLock(lockKey);
// 4.2.判断是否获取成功
if (!isLock) {
// 4.3.获取锁失败,休眠并重试
Thread.sleep(50);
return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
}
// 4.4.获取锁成功,根据id查询数据库
r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.存在,写入redis
this.set(key, r, time, unit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 7.释放锁
unlock(lockKey);
}
// 8.返回
return r;
}

private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

private void unlock(String key) {
stringRedisTemplate.delete(key);
}
}

在ShopServiceImpl中,利用工具类中的方法解决缓存穿透和缓存击穿问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Resource
private CacheClient cacheClient;

@Override
public Result queryById(Long id) {
// 解决缓存穿透
Shop shop = cacheClient
.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);

// 互斥锁解决缓存击穿
// Shop shop = cacheClient
// .queryWithMutex(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);

// 逻辑过期解决缓存击穿
// Shop shop = cacheClient
// .queryWithLogicalExpire(CACHE_SHOP_KEY, id, Shop.class, this::getById, 20L, TimeUnit.SECONDS);

if (shop == null) {
return Result.fail("店铺不存在!");
}
// 7.返回
return Result.ok(shop);
}

优惠券的秒杀

全局唯一ID

全局唯一ID的必要性?

以店铺发布优惠券,用户抢购优惠券,将信息保存到订单表中这个业务场景为例。

如果订单表还使用数据库自增的ID,就存在一些问题:

  • id的规律太明显了
  • 受单表数据量的限制

通常情况下,msyql单表数据量不能超过500w,超出之后就要分库分表,但是拆分之后,这些表逻辑上来说是一张表,id不能一样,于是我们就需要保证id的唯一性

全局ID生成器,是一种在分布式系统下用来生成全局ID的工具,一般要满足以下特性:

  • 唯一性
  • 高可用
  • 高性能
  • 递增性
  • 安全性

如果直接使用redis自增的数值,id的规律太明显了,安全性低,因此要拼接一些其他信息:

ID的组成部分:

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,2^31秒≈68年
  • 序列号:32bit,秒内的计数器,支持每秒生成2^32个不同的id

Redis实现全局唯一Id

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Component
public class RedisIdWorker {
/**
* 开始时间戳
*/
private static final long BEGIN_TIMESTAMP = 1640995200L;
/**
* 序列号的位数
*/
private static final int COUNT_BITS = 32;

private StringRedisTemplate stringRedisTemplate;

public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

public long nextId(String keyPrefix) {
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;

// 2.生成序列号
// 2.1.获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 生成序列号对应的key
String orderKey = "icr:" + keyPrefix + ":" + date;
// 2.2.自增长
long count = stringRedisTemplate.opsForValue().increment(orderKey);
// 3.拼接并返回
return timestamp << COUNT_BITS | count;
}

// 生成开始时间戳
// public static void main(String[] args) {
// LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
// long second = time.toEpochSecond(ZoneOffset.UTC);
// System.out.println(second);
// }
}

测试类代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private ExecutorService es = Executors.newFixedThreadPool(500);

@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);

Runnable task = () -> {
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("order");
System.out.println("id = " + id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("time = " + (end - begin));
}

小贴士:关于countDownLatch

countDownLatch名为信号枪:主要的作用是同步协调多线程的等待与唤醒问题(允许一个或多个线程等待直到其他线程执行的一组操作完成的同步辅助)

给定计数初始化CountDownLatch,所调用的await方法阻塞,直到当前计数达到0。每调用一次countDown方法,计数减一。当计数为0的时候,所有等待的线程被释放和任何后续调用的await立刻返回。

秒杀下单和库存超卖问题

秒杀下单是基本的业务逻辑,比较简单。秒杀下单的流程图如下:

代码实现如下:

#VoucherOrderServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
//5,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2.用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 6.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

return Result.ok(orderId);

}

这里我们就要注意了,在判断是否超卖的代码中,我们是这样写的:

1
2
3
4
5
6
7
8
9
10
11
12
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
//5,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}

仔细观察,会发现它存在问题。假设一个场景,线程1来查询库存,判断库存充足之后,准备扣减库存,在线程1还没来及扣减库存时,线程2进来了。线程2也查询库存,此时因为线程1还没有来得及扣减库存,那么线程2也发现库存充足,也去扣减库存。两个线程同时扣减库存,就会出现库存超卖的问题。

超卖问题是典型的线程安全问题,针对这一问题的常见解决方案是加锁。而对于加锁,我们有两种方案:

悲观锁 乐观锁
认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。
例如Synchronized、Lock都属于悲观锁
认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其他线程对数据做了修改。
如果没有修改则认为是安全的,自己才更新数据。
如果已经被其他线程修改则说明发生了线程安全问题,则重试或者抛出异常。
  • 悲观锁:悲观锁中又可以细分为公平锁、非公平锁、可重入锁等

  • 乐观锁:会有一个版本号,每次操作数据的时候,判断版本号是不是比上次记录的大了1,如果是则执行修改操作。如果不是,说明数据已经被修改过了。

  • 乐观锁的常见实现方式有两种

    • 版本号法

    • CAS法

      • 它涉及三个操作数:

        V:变量当前的内存值

        E:变量的预期值

        U:新值

        CAS的操作过程如下:

        1. 读取变量的当前值V
        2. 如果变量的当前值V等于预期值E,则将变量的值更新为U
        3. 如果不等,则不进行任何操作,并返回当前值

使用乐观锁解决超卖问题

方案一

VoucherOrderServiceImpl在扣减库存时改为:

1
2
3
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") //set stock = stock -1
.eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?

核心是,在修改库存的时候判断现在的库存和上次查询出的库存是否一致。如果一致,说明没有人在中间修改过库存,是线程安全的。但是这样会有个问题:如果100个线程同时来查询库存并进行扣减,第1个线程首先进行扣减库存的操作,因为它是第一个进来的,没有人修改库存,则线程1成功扣减库存。那么后面99个线程来扣减库存的时候,都会发现现在的库存和它们上次查询出的库存不一致,扣减库存会失败。这样一来,成功扣减库存的概率太低。

方案二

只需要改成stock > 0,即可解决以上的问题。

1
2
3
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update().gt("stock",0); //where id = ? and stock > 0

优惠券秒杀-一人一单

优惠券的目的是为了引流,应该确保一人只能抢购一单,目前的情况是可以无限制购买,因此要修改业务逻辑,确保一人只能抢购某种优惠券的一个。流程图如下:

#VoucherOrderServiceImpl的初始代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
// 5.一人一单逻辑
// 5.1.用户id
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

//6,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);

voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

return Result.ok(orderId);

}

我们注意到,又出现了并发安全问题。当多个线程并发过来,查询数据库,发现订单都不存在,都会去创建订单。所以还是需要加锁,而乐观锁比较适合更新数据,现在是插入数据,我们使用悲观锁。

我们先把从一人一单到创建订单的逻辑抽离出一个方法createVocuherOrder。为了确保线程安全,在方法上加了一把sychronized锁。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {

Long userId = UserHolder.getUser().getId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}

但是这样添加锁,锁的粒度太粗了。sychronized锁加在方法上,使用的是当前实例对象作为锁,这就意味着同一时间只能有一个线程进入该方法。即便是不同的用户操作抢购不同的优惠券,也会因为锁住的是同一个实例而串行执行,并发性能差。因此要修改锁的粒度。

我们的业务是一人一单,因此只要将锁的粒度改为用户级别就可以了。将加锁处的代码改为:

1
synchronized(userId.toString().intern())

这个代码会userId对象加锁。其中,intern确保相同userId返回同一个字符串对象。

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()){
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}
}

但是以上代码还存在问题,细心的同学会发现,代码中锁的释放在事务提交之前,这样就会出现问题。例如线程1释放锁之后,还未提交事务,这时线程2进来,读取线程1未提交的数据例如库存,就会造成超卖。

在事务中,所有的数据库操作都会先记录在事务日志中,等事务提交后才真正写入数据库。

因此,我们需要将锁的范围扩大到事务提交之后才释放。在seckillVoucher 方法中,添加以下逻辑,

1
2
3
4
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
return this.createVoucherOrder(voucherId);
}

但是以上做法仍有问题,因为事务想要生效,必须利用代理生效,所以这个地方,我们要先获取原始的事务对象,再操作事务:

1
2
3
4
5
synchronized (userId.toString().intern()) {
// 获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}

至此,我们就解决了一人一单的问题。但是这是在单机环境下,在集群环境下,仍有问题。

集群环境下的并发问题

我们利用idea启动两份服务。然后前端nginx服务器中配置负载均衡。如图:

这个时候,我们利用postman进行测试。创建两个相同抢购订单的接口,执行之后会发现一个用户依旧抢了两单。这是为什么呢?我们不是已经加锁了吗?

有关锁失效原因分析

这是因为我们启动了两个tomcat,每个tomcat都有自己的jvm,每个jvm有自己的锁监视器。例如线程1对第一台tomcat服务器发起请求,此时请求的是jvm1,线程1尝试获取锁,获取锁成功,jvm1的锁监视器就知道此时syn这把锁被线程1持有。后面再有线程进来,都会获取锁失败。但是如果此时线程3进来,它请求的不是第一个tomcat服务器,而是第二个tomcat服务器,那么不同的tomcat有自己的jvm,有自己的锁监视器。此时jvm2的锁监视器中没有人获取锁,线程3就会获取锁成功,造成一人两单的问题。

这就是集群环境下syn锁失效的原因,我们需要使用分布式锁来解决这个问题。

分布式锁

基本原理与实现方式对比

分布式锁概念:满足分布式系统或集群环境下多进程可见并且互斥的锁。

分布式锁的核心思路就是让大家使用同一把锁(只要一个锁监视器),这样就能锁住线程,让程序串行执行。

分布式锁满足的条件:

  • 多进程可见(不同的jvm实例中的多个进程可见,多个进程能感知到变化)
  • 互斥
  • 高可用
  • 高性能
  • 安全性

分布式锁的实现方式:

  • MySQL
  • Redis
  • Zookeeper

Redis分布式锁的核心实现思路

实现分布式锁时需要两个方法:

  • 获取锁
    • 互斥:确保只要一个线程能获取锁
    • 非阻塞:尝试一次,成功返回true,失败返回false
  • 释放锁
    • 手动释放
    • 超时释放:获取锁时添加一个超时时间

核心思路:

我们利用sexnx命令,当有多个线程进入时,调用该方法,只有第一个进来的线程setnx调用成功返回true,然后线程1执行业务逻辑,释放锁。其他线程都返回false,等待后重试,直到锁被释放,后面的线程才去执行任务。

代码:实现分布式锁

定义锁接口

1
2
3
4
5
6
7
8
9
10
11
public interface Ilock{
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功,false代表失败
boolean tryLock(long timeoutSec);

/**
释放锁逻辑*/
void unLock();
}

实现加锁释放锁的逻辑

#SimpleRedisLock.java

加锁逻辑:

1
2
3
4
5
6
7
8
9
10
private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = Thread.currentThread().getId()
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

释放锁逻辑:

1
2
3
4
public void unlock() {
//通过del删除锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}

修改业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象(新增代码)
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//获取锁对象
boolean isLock = lock.tryLock(1200);
//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}

Redis分布式锁的误删

问题:

假设这样一个情况,线程1获取锁,执行业务逻辑,突然出现了阻塞,阻塞过程中,线程1的锁到期释放。此时线程2进来,获取了锁,执行自己的业务逻辑。线程1阻塞完成,准备执行删锁的逻辑,但此时锁是线程2的,就会造成分布式锁误删的问题。

解决方案

在释放锁的代码中,加入判断该锁是不是自己的锁的逻辑。

代码实现

核心逻辑:获取锁时,放入自己的线程标识;释放锁时,判断锁中的线程标识和当前线程的标识是否一致,一致则删除。不一致则不删除。

流程图如下:

#修改加锁的逻辑:

1
2
3
4
5
6
7
8
9
10
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

为什么锁的标识不直接使用线程id,而是加入了UUID?

线程id只能在单个jvm上唯一,不同机器上的jvm可能有相同的线程id,这样在分布式系统下就无法保证唯一性。而使用UUID保证了跨jvm、跨机器场景下id的唯一性。

#修改释放锁的逻辑:

1
2
3
4
5
6
7
8
9
10
11
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

测试

修改完代码之后,重启两个工程,重启两个线程。当线程1持有锁之后,手动释放锁。线程2进到锁内部获取锁。此时放行线程1,线程1进入到释放锁逻辑中,发现该锁的value值并不是自己的锁,因此不能释放锁。

到此,我们通过在释放锁代码中加入判断锁是不是自己的逻辑,初步解决了分布式锁误删的问题。

Redis分布式锁的原子性问题

此时分布式锁仍有问题。试想一个极端的情况,线程1进入释放锁的逻辑,假设是线程1的锁,线程1比锁成功,接着线程1准备删除锁,但此时线程1的锁正好到期释放。此时线程2抢先进来,获取锁成功。这时线程1反应过来,继续自己的流程,但是线程1直接执行删锁的逻辑,把线程2的锁给删除了,相当于条件判断并没有起到作用。这就是删锁的原子性问题。

为什么呢?因为我们的拿锁、比锁、删锁并不符合原子性,中间会被其他线程抢进。我们需要借助lua脚本解决这个问题。

lua脚本解决多条命令原子性问题

Redis提供了lua脚本功能,在一个脚本中编写多条redis命令,它能够确保命令执行的原子性。

Redis提供的调用函数如下:

1
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,lua脚本是这样:

1
2
# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

1
2
3
4
5
6
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

写好脚本之后,就需要通过Redis调用脚本,命令如下:

1
2
3
4
5
127.0.0.1:6379> help @scripting

EVAL script numkeys key [key ...] arg [arg ...]
summary: Execute a Lua script server side
since: 2.6.0

例如,我们要执行redis.call(‘set’,’name’,’jack’)这个脚本,语法如下:

如果不想脚本中的key、value写死,可以作为参数传递。key类型的参数放在KEYS数组,其他参数放入ARGV数组,在脚本中可以从KEYS和ARGV数组中获取这些参数:

然后我们就可以编写释放锁逻辑的lua脚本了。先梳理一下流程:

  1. 获取线程中的锁的标识
  2. 判断是否与当前线程标识一样
  3. 如果一样,释放锁
  4. 不一样,什么都不做

其lua脚本如下:

1
2
3
4
5
6
7
8
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

利用Java代码调用lua脚本改造分布式锁

在RedisTempalte中,可以利用execute方法执行lua脚本,execute方法如下:

1
2
3
4
5
6
7
8
/*
* (non-Javadoc)
* @see org.springframework.data.redis.core.RedisOperations#execute(org.springframework.data.redis.core.script.RedisScript, java.util.List, java.lang.Object[])
*/
@Override
public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
return scriptExecutor.execute(script, keys, args);
}

execute方法和Redis执行lua脚本命令的参数对应如下:

Java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
// 经过以上代码改造后,我们就能够实现 拿锁比锁删锁的原子性动作了~

测试(略)

小总结

基于Redis实现分布式锁思路:

  • 利用set nx ex获取锁,设置过期时间,保存线程标识
  • 释放锁时先判断线程标识是否与自己一致,一致则删除

特性:

  • 利用set nx满足互斥性
  • 利用set ex保证发生故障时依然能够释放锁,避免死锁
  • 利用Redis集群保证高可用和高并发特性(这里貌似并没有用到)

分布式锁-Redisson

分布式锁-Redisson功能介绍

基于setnx实现的分布式锁存在以下问题:

  • 重入问题:可重入锁是指获得锁的线程可以再次进入相同的获取锁代码块中。但是setnx锁不可重入。可重入锁是为了避免死锁的发生。

死锁发生的案例:

有这么一段获取锁的代码:

1
2
3
4
5
6
7
8
// 获取锁,成功返回true,失败返回false
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10L, TimeUnit.SECONDS);
while(!BooleanUtil.isTrue(flag)) {
// 重试获取锁,直到成功
}
return true;
}

有这么一个场景:

1
2
3
4
5
6
7
8
9
10
public void methodA() {
tryLock("order:1");
methodB(); // 调用另一个同步方法
// 执行A的业务
........
}

public void methodB() {
try("order:1");
}

线程进入methodA方法中,尝试获取锁,获取锁成功,然后进入methodB方法中,再次尝试获取同一把锁,因为锁已经被当前线程获取了,会获取锁失败。而methodB方法则会一直等待当前线程释放锁,但这是不可能的,就出现了死锁问题。

  • 不可重试问题:指目前的分布式锁只能重试获取一次。我们认为合理的情况是,当线程获取锁失败后,能不断的重新获取锁。

  • 超时释放问题:设置锁的超时时间,能够防止死锁的发生。但如果我们的业务执行耗时较长,业务还未完成,锁就到期释放了,就会存在一定的安全隐患。

  • 主从一致性问题:如果Redis提供了集群,当我们向集群写入数据的时候,主机需要异步的将数据同步给从机,而如果在主机同步完成之前突然宕机,就会出现死锁问题。(也就是如果主机同步给从机的数据有锁数据,就会造成锁的丢失,从而出现死锁)

而我们的Redisson就可以解决这几个问题。

那什么是Redisson呢?

Redisson是在java基础上实现的驻内存的数据网络(In-Memory Data Grid)。它不仅提供了一系列的分布式java常用对象,还提供了许多分布式服务。其中就包含了各种分布式锁的实现。

官方wiki文档:8. 分布式锁和同步器 · redisson/redisson Wiki

分布式锁-Redisson快速入门

在java中引入redisson中有两种方式,一种是直接引入redisson,一种是通过springboot集成。这里我们采取第一种。

引入依赖:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

配置Redisson客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}

如何使用Redission的分布式锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Resource
private RedissionClient redissonClient;

@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}

在 VoucherOrderServiceImpl,使用redisson替换掉之前的redis自定义锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁对象
boolean isLock = lock.tryLock();

//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}

Redisson分布式锁可重入的原理

原理

Redisson采用hash结构实现锁的可重入,替代了上面我们redis使用的string结构。采用hash结构存储线程id和重入次数。每当线程获取锁的时候,先判断锁释是否存在,如果不存在,则获取锁成功,记录线程标识,重入次数记为1。如果锁存在也不一定说明获取锁失败,如果线程标识和锁中的线程标识一致,说明是同一个线程来获取锁,重入次数+1即可。每当线程释放锁的时候,重入次数-1,直到重入次数为0,才真正的释放锁。

源码解读

通过一步步跟踪以下代码中的tryLock方法来分析可重入锁的原理。

  1. 在tyrLock处,ctrl+alt+B,选择RedissonLock(org.redisson)跟踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
void method1() throws InterruptedException {
boolean isLock = lock.tryLock(1, TimeUnit.SECONDS);
if(!isLock){
log.error("获取锁失败,1");
return;
}
try{
log.info("获取锁成功,1");
method2();
log.info("开始执行业务...1");
}finally {
log.info("释放锁,1");
lock.unlock();
}
}
  1. 进入RedissLock.java的tryLock方法中,
1
2
3
4
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
  1. 继续跟踪tryLock(waitTime, -1, unit)方法,如下代码所示。

由于源码太复杂,借助了大模型来理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/**
* 尝试获取锁,支持等待时间和锁自动释放时间。
*
* @param waitTime 等待获取锁的最大时间(超时时间)
* @param leaseTime 锁的持有时间(TTL,过期时间)。如果为 -1,则使用默认的 internalLockLeaseTime(30秒)
* @param unit 时间单位
* @return true 表示成功获取锁;false 表示在 waitTime 内未能获取锁
* @throws InterruptedException 如果当前线程被中断
*/
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 将等待时间转换为毫秒
long time = unit.toMillis(waitTime);
// 记录当前时间,用于计算耗时
long current = System.currentTimeMillis();
// 获取当前线程的 ID(用于支持可重入锁)
long threadId = Thread.currentThread().getId();

// 尝试立即获取锁
// 返回值 ttl:
// - null:表示成功获取锁
// - 非 null:表示锁已被其他线程持有,返回值是锁的剩余生存时间(TTL)
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);

// 如果 ttl 为 null,说明锁获取成功,直接返回 true
if (ttl == null) {
return true;
}

// 更新剩余等待时间:减去 tryAcquire 所消耗的时间
time -= System.currentTimeMillis() - current;

// 如果剩余等待时间已 <= 0,说明在尝试获取锁的过程中已经超时
if (time <= 0) {
// 触发获取锁失败的回调(可用于监控或日志)
acquireFailed(waitTime, unit, threadId);
return false;
}

// 重置当前时间,用于后续耗时计算
current = System.currentTimeMillis();

// 订阅锁释放的通知(Redis 的 pub/sub 机制)
// 当持有锁的线程释放锁时,会通过 channel 发送消息,唤醒等待的线程
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);

// 等待订阅完成,最多等待剩余的 time 毫秒
// 如果在指定时间内未完成订阅(如网络问题),则返回 false
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
// 如果无法取消 future,添加 onComplete 回调来处理后续清理
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
// 订阅取消后,执行反订阅清理资源
unsubscribe(subscribeFuture, threadId);
}
});
}
// 获取锁失败(订阅失败或超时)
acquireFailed(waitTime, unit, threadId);
return false;
}

// 进入循环尝试获取锁(基于事件通知 + 轮询)
try {
// 再次更新剩余等待时间
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// 循环尝试获取锁,直到成功或超时
while (true) {
long currentTime = System.currentTimeMillis();

// 再次尝试获取锁
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 成功获取锁
if (ttl == null) {
return true;
}

// 更新剩余等待时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// 等待锁释放的通知,或超时唤醒
currentTime = System.currentTimeMillis();

// 获取当前的订阅实体(包含 CountDownLatch 用于阻塞等待)
RedissonLockEntry entry = subscribeFuture.getNow();

// 根据锁的剩余 TTL 和剩余等待时间,决定阻塞多久:
// - 如果锁的 TTL 小于剩余等待时间,则最多等待 TTL 时间(因为锁可能很快释放)
// - 否则等待剩余的 time 时间
if (ttl >= 0 && ttl < time) {
// 阻塞等待 ttl 毫秒,或被 signal 唤醒
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 阻塞等待剩余的 time 时间
entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

// 更新剩余等待时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 继续循环,再次尝试获取锁
}
} finally {
// 无论成功或失败,最终都要取消订阅,释放资源
unsubscribe(subscribeFuture, threadId);
}

// 注:原始代码中被注释掉的这行是异步版本的入口
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
  1. 然后继续跟踪tryAcquire(waitTime, leaseTime, unit, threadId)方法,进入到它的实现方法中,如下所示
1
2
3
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
  1. 紧接着,跟踪tryAcquireAsync(waitTime, leaseTime, unit, threadId)方法,进入到它的实现方法中,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 异步尝试获取锁,并根据情况决定是否启动自动续期(Watch Dog 机制)
*
* @param waitTime 等待获取锁的最大时间(超时时间)
* @param leaseTime 锁的持有时间(TTL)。如果为 -1,表示使用默认的看门狗超时时间(默认 30 秒),并启用自动续期
* @param unit 时间单位
* @param threadId 当前线程 ID(用于支持可重入锁)
* @param <T> 返回值类型(此处为 Long)
* @return RFuture<Long> 异步结果:
* - null:表示成功获取锁
* - 非 null:表示锁已被其他线程持有,返回值是当前锁的剩余生存时间(TTL)
*/
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {

// ================= 第一种情况:用户指定了明确的 leaseTime(不等于 -1)=================
// 说明:用户不希望使用 Redisson 的自动续期机制(Watch Dog),锁会在 leaseTime 时间后自动释放
if (leaseTime != -1) {
// 直接调用底层 Lua 脚本尝试获取锁
// 不会启动自动续期任务
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}

// ================= 第二种情况:leaseTime == -1(默认行为)=================
// 说明:启用 Watch Dog 自动续期机制
// 使用默认的看门狗超时时间(默认 30 秒)作为初始锁过期时间
// 并在获取锁成功后,自动启动定时任务定期续期

// 获取 Redisson 配置中的看门狗超时时间(默认为 30 秒)
long leaseTimeForWatchdog = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();

// 调用底层 Lua 脚本尝试获取锁,使用看门狗超时时间作为锁的初始 TTL
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
waitTime,
leaseTimeForWatchdog, // 使用默认的看门狗时间(如 30s)
TimeUnit.MILLISECONDS,
threadId,
RedisCommands.EVAL_LONG);

// 添加异步回调:当 tryLockInnerAsync 执行完成后触发
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 如果发生异常,直接返回(后续由上层处理)
if (e != null) {
return;
}

// ================= 判断是否成功获取锁 =================
// ttlRemaining 为 null 表示当前线程成功获取了锁
if (ttlRemaining == null) {
// 启动自动续期任务(Watch Dog)
// 该任务会每隔一段时间(如 10s)向 Redis 发送命令,将锁的 TTL 重置为 30s
// 只要客户端还活着,锁就不会因超时而被释放
scheduleExpirationRenewal(threadId);
}
// 如果 ttlRemaining 不为 null,说明未获取到锁(返回的是当前锁的剩余时间),无需做任何操作
});

// 返回异步结果,上层可通过它获取锁的状态
return ttlRemainingFuture;
}
  1. 跟踪tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG)方法进入到最终的实现方法中。在这个方法中,通过lua脚本实现了锁的可重入。

lua脚本解释:

首先调用(redis.call(‘exists’, KEYS[1]) == 0判断锁是否存在,如果不存在则获取锁,并设置重入次数,此时获取锁成功,返回nil。如果锁存在,判断锁是否是当前线程已经获取的锁,如果是,重入次数+1,也返回nil。

如果以上两个都失败,则获取锁失败,调用return redis.call(‘pttl’, KEYS[1]),返回锁的剩余时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

通过以下案例,测试redisson分布式锁的可重入特性:

Redisson分布式锁可重试原理

原理

利用信号量和PubSub功能实现。当线程尝试获取锁失败后,会进行等待,等待锁释放的消息。获取锁的线程释放锁时会发出释放锁的消息,被等待线程捕捉到之后,等待线程重试获取锁。如果获取锁依然失败,则不断等待、唤醒。当然,会有一个最大等待时间,超过了则返回失败。

源码解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
 @Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}

time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

current = System.currentTimeMillis();
// 使用`subscribe(threadId)`订阅锁释放事件
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}

try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 只要还有剩余时间,就会继续尝试
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 每次尝试后都会减少剩余可用时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// waiting for message
currentTime = System.currentTimeMillis();
// 根据锁的剩余存活时间(ttl)和客户端剩余等待时间(time)中的较小值来等待
if (ttl >= 0 && ttl < time) {
// 通过`RedissonLockEntry`的信号量(`getLatch()`)等待锁释放通知
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}

重试流程:

  1. 首先尝试直接获取锁(tryAcquire)
  2. 如果失败,订阅锁释放通道
  3. 在循环中:
    • 再次尝试获取锁
    • 如果失败,等待锁释放信号(不超过剩余时间)
    • 收到信号或超时后继续尝试
  4. 直到成功获取锁或总等待时间耗尽

Redisson分布式锁自动续约原理

原理:

利用watchDog机制,每隔一段时间重置超时时间。当线程获取锁时,如果不指定leaseTime,Redisson会开启一个“开门狗”线程定时(默认每隔10s)检查锁是否存在,存在的话则重置过期时间。

源码解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private void renewExpiration() {
// 1. 从全局映射中获取当前锁的续期条目
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return; // 如果没有注册续期任务,直接返回
}

// 2. 创建一个延时任务:在 internalLockLeaseTime / 3 时间后执行
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 3. 再次检查续期条目是否存在(可能已被取消)
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 4. 获取持有锁的线程 ID
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}

// 5. 异步发送命令:延长 Redis 中该锁的过期时间
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}

// 6. 如果续期成功,递归调用自身,再次安排下一次续期
if (res) {
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

// 7. 将新创建的定时任务保存到续期条目中,便于后续取消
ee.setTimeout(task);
}

核心流程

步骤 说明
1. 续期触发时机 每次加锁成功后,Redisson 会启动这个 renewExpiration() 方法。
2. 续期间隔 使用 internalLockLeaseTime / 3 作为定时器延迟(默认 leaseTime=30s,则每 10s 续一次)。
3. 续期操作 调用 renewExpirationAsync() 向 Redis 发送命令(如 EXPIRE)将锁的 TTL 重置为 leaseTime
4. 循环续期 续期成功后,再次调用 renewExpiration(),形成一个递归循环,持续续期。
5. 安全退出 当锁被释放(unlock())或客户端断开时,会从 EXPIRATION_RENEWAL_MAP 中移除条目,下次 renewExpiration() 调用将直接返回,停止续期。

Redission分布式锁的MutiLock原理

TODO