学习自:【黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目】https://www.bilibili.com/video/BV1cr4y1671t?p=67&vd_source=9fed3cefc266aa5b3895aaab6e6214f5
本文是基于上述视频教程和视频配套文档的文字总结,包含了以下几方面内容:
基于Redis实现验证码登录
Redis缓存、缓存更新策略、缓存穿透、雪崩、击穿问题
典型的秒杀场景、超卖问题及解决方案
Redis分布式锁、误删问题、原子性问题、lua脚本
Redisson分布式锁,包含其可重入、可重试、自动续期以及MutiLock的原理以及源码的分析
基于Redis实现验证码登录 基于session实现登录 过去我们使用基于session的验证码登录。缺点是session在集群环境下不能使用,因为多台tomcat服务器并不能共享session数据。由此产生了基于redis的登录流程。为了读者理解,先简单介绍一下基于session的登录。
登录的流程
用户输入手机号发起请求,服务端校验手机号格式是否正确,然后生成验证码保存到服务端自己 的session中,同时发送给用户。
用户输入手机号和验证码,服务端从session中取出验证码进行比对,通过后从数据库中查找用户信息。如果用户不存在,直接进行注册。然后将用户信息写入自己的session,返回成功信息给用户。
用户每次请求的时候,都会携带cookie,服务端从cookie中取出JsessionID,根据JessionID查找自己服务器 上的session。如果没有session信息,则进行拦截;如果有session信息,将用户信息写入ThreadLocal中,然后放行。
session共享问题 通过以上分析,我们发现,服务器每次在保存session的时候,只会保存到自己服务器的环境中,并不会保存到其他tomcat服务器上。那么,如果在tomcat服务器集群环境下,用户第一次请求1号tomcat服务器,1号服务器将session保存到自己服务器上。当用户第二次请求的时候,他的请求被nginx分配到了2号tomcat服务器上,2号服务器上并没有用户的session信息,则会进行拦截。问题就出现了。
如何解决呢?早期的方案是,一台tomcat服务器在保存session信息时,同时同步给其他所有tomcat服务器。
但是这种方案有两大问题:
每台tocat服务器都有完整session信息,服务器压力过大
session拷贝数据时,可能会出现延时
而又因为redis本身就是共享的,又是基于内存的,同时也是key、value结构,所以可以采用redis替代session的解决方案。
Redis替代Session 我们要考虑redis的key的结构和key具体的设计细节。
key的结构
采用string数据类型。string:key为string,value使用json字符串保存,比较直观。
key
value
heima:user:1
{name:”jack”,age:21}
采用hash数据类型。可以将对象中的每个字段单独存储,可以针对单个字段做CRUD,并且占用内存更少。
key
value
filed
value
heima:user:1
name
jack
age
21
key的具体细节
一般采用随机生成的token作为key就够了。
访问流程
当用户提交手机号和验证码后,后台校验手机号和验证码,通过后,根据手机号查询用户信息,存在的话将用户信息保存到redis中,并生成token作为redis的key,并且将token返回给用户。当用户携带token访问后台的时候,后台取出用户携带的token,根据token访问对应的redis数据,判断用户信息是否存在,如果存在则写入ThreadLocal,然后放行。
拦截器的配置与优化
在配置拦截器的时候,要手动刷新token的过期时间。但是这样还不够,因为如果用户一直访问拦截器拦截之外的路径,例如主页面的店铺信息,就不会进行token的刷新,token到期后用户就会被剔除。
解决办法:在加一层拦截器,拦截所有的路径,在这个拦截器中获取用户信息并刷新token过期时间。在第二个拦截前中,只判断ThreadLocal中是否包含用户信息,没有则拦截,有的话则放行。
温馨小贴士 tomcat的运行原理
tomcat启动之后,就会有一个监听线程时刻监听8080端口。当用户向tomcat运行的端口发起请求,监听线程就会和用户端创建socket连接。socket都是成对出现的,用户通过socket向tomcat发送数据,tomcat也通过socket向用户返回数据。当tomcat端的socket接收到数据后,监听线程会从tomcat的工作线程池中取出一个线程执行用户的请求。该线程基于用户的请求,访问controller、service、dao层,并且访问对应的DB,用户执行完请求之后,再统一返回,再找到tomcat的socket连接,将数据返回给用户端的socket。
关于threadlocal
在ThreadLocal的源码中,无论是get方法,还是set方法,ThreadLocal都会先获取当前用户的线程,然后从当前线程中取出ThreadLocalMap类型的map成员变量。只要线程不一样,取出的map变量就不一样,通过这种方式就实现了线程的隔离。
因此,基于以上我们可知,用户发出的每次请求都在tomcat取出的一个线程中完成,而且用户的每次请求都是独立的,每次请求都会取出一个线程,请求完之后线程进行回收。而ThreadLocal恰好也是每次都从当前线程中取出变量,将数据和线程做了捆绑。因此,当用户访问我们的工程时,就可以通过ThreadLocal来做到线程隔离,通过ThreadLocal做到每个线程操作自己的数据,每个线程都有自己的变量副本。
查询中的缓存 基本概念 缓存(Cache),就是数据交换的缓冲区,一般从数据库中获取,存储于本地代码。
缓存存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘,缓存可以大大降低用户访问并发量带来的服务器读写压力。
但是缓存也会增加代码复杂度和运营的成本。
缓存的好处:降低后端负载、提高读写效率,降低响应时间;
缓存的成本:数据一致性成本;代码维护成本;运维成本。
在springboot框架中,我们一般采用Redis缓存来降低数据库压力。
缓存的作用模型和思路
加入缓存的代码如下:
缓存更新策略 缓存更新是redis为了节省内存而设计出来的。我们知道,内存数据宝贵,当我们向redis插入太多数据的时候,内存占用增加,此时就需要更新redis缓存来降低内存的占用。有三种策略。
内存淘汰 :redis自动进行。当数据占用内存达到redis设置的maxmemory时,自动淘汰一些不太重要的数据。
默认淘汰策略为noeviction ,其他策略有:
allkeys-lru :从所有 key 中使用 LRU 算法进行淘汰。
volatile-lru :从设置了过期时间的 key 中使用 LRU 算法进行淘汰。
allkeys-random :从所有 key 中随机淘汰数据。
volatile-random :从设置了过期时间的 key 中随机淘汰。
volatile-ttl :在设置了过期时间的 key 中,根据 key 的过期时间进行淘汰,越早过期的越优先被淘汰。
超时剔除 :我们给redis数据设置了ttl时间后,redis会自动剔除ttl到期的数据。
主动更新 :手动干预。在编写业务逻辑时,在修改数据库的同时更新缓存。
主动更新策略又包括三种具体的方案:
Cache Aside Pattern:由缓存的调用者,在更新数据库的同时,更新缓存。又称之为双写。
Read/Write Through Pattern:缓存与数据库整合成一个服务,由服务来维持一致性,调用者无需关心缓存一致性问题。
Write Behind Caching Pattern:调用者只操作缓存,其他线程异步的将缓存持久化到数据库。
在业务场景为高一致性需求的情况下,我们一般选用主动更新策略,并以超时作为兜底方案。
采用哪种策略? 如果出现数据库和缓存不一致的问题,选用主动更新的哪种策略呢?
综合考虑我们使用方案一Cache Aside Pattern方案。
但是在操作数据库和缓存的同时,需要考虑几个问题。
删除缓存。如果每次操作数据库后,我们都进行缓存的更新,如果中间没有人查询缓存,那么这个缓存更新实际上只有最后一次的更新有效,这样就会多出无效的更新缓存操作。所有我们可以把缓存删除,等有人再来查询时,加载缓存中的数据。
单体系统利用事务操作。分布式系统利用TCC等分布式事务方案。
先操作数据库,再操作缓存。
原因如下:先操作数据库,再删除缓存造成的数据不一致概率更低。
如果是先删除缓存,再操作数据库的情况。假设缓存和数据库的初始值都为10。此时线程1进来,进行更新数据的操作。线程1先删除缓存10,再更新数据库的值为20。再这个间隙中,线程2恰好进来,进行查询的操作。线程2查询缓存,未命中,然后查询数据库得到10,并且将查询到的数据10写入缓存。在线程2写入缓存完成之后,线程1才完成对数据库的更新,数据库中的值变为20。这就造成了数据库和缓存中的数据的不一致性问题。因为线程2查询缓存、写入缓存的速度比线程1更新数据库的动作要快很多,因此数据不一致问题发生的概率很大。
如果是先操作数据库,再删除缓存的情况。必须要在缓存为空的前提下才会发生数据不一致性问题。假设数据库和缓存的值都为10。线程1进来,进行查询操作,查询缓存未命中,则去查数据库,得到值为10,然后写入缓存。在线程1查询数据库和写入缓存的间隙中,此时线程2进来,进行数据更新的操作。线程2更新数据库的值为20,然后删除缓存,线程2操作完成之后,此时线程1才执行写入缓存10的操作。此时数据库的值为20,缓存的值为10,造成了数据的不一致性问题。而线程1写入缓存10这个操作的速度比线程2更新数据库再删除缓存的速度快很多,因此这种情况下数据不一致问题发生的概率很低。
代码示例 根据以上的缓存更新策略。完成以下的需求:
代码如下:
缓存更新策略的最佳实践方案 我们来小结一下。根据不同的需求,采取不同的缓存更新策略。
低一致性需求:使用Redis自带的内存淘汰机制
高一致性需求:主动更新,并以超时剔除作为兜底方案
读操作:缓存命中则直接返回缓存未命中则查询数据库,并写入缓存,设定超时时间
写操作:先写数据库,然后再删除缓存要确保数据库与缓存操作的原子性
缓存穿透问题 缓存穿透是指用户请求数据时,数据库和缓存中都没有数据的存在,这样缓存永远不会生效,请求直接达到数据库中。
常见的缓存穿透解决方案有两种。
缓存空对象
布隆过滤器
优点:没有额外的内存消耗,没有多余的key
缺点:
缓存空对象分析:
就是当用户请求数据库不存在的数据时,将空值写入redis,并设置过期时间 ,避免对数据库的超负荷访问
布隆过滤分析:
布隆过滤实际上一个庞大的二进制数组,采用哈希思想来判断请求数据是否在redis中存在。如果存在,则放行,访问redis,哪怕redis中的数据过期了,数据库中也存在,则访问数据库,写入redis,然后返回。如果不存在,则直接拒绝访问。(相当于在用户请求redis中加了一层,用户先访问布隆过滤器,再访问redis)
图解如下:
小结:
缓存穿透的原因?(略)
缓存穿透的解决方案?
缓存null值
布隆过滤
增加id的复杂性,避免被猜测出id的规律
做好数据的基础格式校验
加强用户权限校验
做好热点参数的限流
缓存雪崩问题 缓存雪崩是指在同一时间段大量的缓存key同时时效或者redis服务宕机,导致大量的请求打到数据库,带来巨大压力。
解决方案:
给不同的key添加随机的ttl值
利用redis集群提高服务的可用性
添加多级缓存
给缓存业务添加降级或者限流策略
缓存击穿问题 缓存击穿问题也叫热点key问题,就是一个被高并发访问 且缓存重建业务较复杂 的key突然失效了,无数的请求瞬间给数据库带来巨大压力。
常见的解决方案有两种:
互斥锁方案 当线程1未命中缓存之后,获取互斥锁,然后进行查询数据库以及缓存的重建。这段时间中,线程2也来查询,缓存未命中,也尝试获取互斥锁,但是获取互斥锁失败,休眠一会再重试。直到线程1完成缓存的重建之后释放锁,线程2才能来访问。此时,线程2缓存命中,直接返回数据。
逻辑过期方案 方案分析:我们之所以会出现缓存击穿的问题,就是因为我们对key设置了过期时间,假设我们不设置过期时间,其实就不会有缓存击穿问题。但是数据会一直占用内存,我们可以设置逻辑过期时间来解决。
逻辑过期时间方案分析:
我们写入缓存数据的时候,在redis的value中设置expire逻辑过期时间。假设线程1请求过来,查询缓存,判断逻辑过期时间是否过期,如果发现已经过期,则尝试获取互斥锁,开启一个独立线程2,由独立线程去完成缓存的重建。线程1直接返回过期的数据。此时线程3也过来请求,发现缓存中的逻辑过期时间已经过期,则尝试获取互斥锁,获取锁失败(因为此时线程2还正在重建缓存,没有释放锁),直接返回过期数据。直到独立线程2重建缓存完成之后释放锁。此时后面的线程再来访问得到的就是新数据了。
注意:逻辑过期时间并不直接作用于redis,不同于设置ttl
两个方案的对比 互斥锁方案:因为实现了互斥性,所以保证了数据的一致,且实现简单。也没有其他的事情需要做,没有额外的内存消耗。缺点在于可能会发生死锁,且只能串行执行,性能有所影响。
逻辑过期时间方案:线程读取数据的过程不需要等待,性能好。额外维护逻辑过期时间,有额外的内存消耗。有一个独立线程进行缓存重建,但在重构数据完成前,其他线程只能返回旧数据,出现了数据的不一致性。实现起来麻烦。
优缺点如下图所示。
代码实现:利用互斥锁解决缓存击穿问题
实现互斥锁的代码:
核心思想是利用redis的setnx方法来表示获取锁。如果返回true,则代码这个key不存在,表示获取锁成功。如果返回false,说明已经有人对这个key进行了设置,获取锁失败。
1 2 3 4 5 6 7 8 private boolean tryLock (String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1" , 10 , TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock (String key) { stringRedisTemplate.delete(key); }
主要业务流程的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public Shop queryWithMutex (Long id) { String key = CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get("key" ); if (StrUtil.isNotBlank(shopJson)) { return JSONUtil.toBean(shopJson, Shop.class); } if (shopJson != null ) { return null ; } String lockKey = "lock:shop:" + id; Shop shop = null ; try { boolean isLock = tryLock(lockKey); if (!isLock){ Thread.sleep(50 ); return queryWithMutex(id); } shop = getById(id); if (shop == null ){ stringRedisTemplate.opsForValue().set(key,"" ,CACHE_NULL_TTL,TimeUnit.MINUTES); return null ; } stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_NULL_TTL,TimeUnit.MINUTES); }catch (Exception e){ throw new RuntimeException (e); } finally { unlock(lockKey); } return shop; }
代码实现:利用逻辑过期方案解决缓存击穿问题 需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
具体的业务流程图如下:
注意:因为我们没有给缓存设置ttl,理论上所有请求的数据在redis中都存在,如果不存在,则认为是该数据是真的不存在。(例如商家上线了一个活动,用户访问活动之外的商品。或者活动已经过期,缓存数据已经被清理,用户还去访问之前活动中的商品)
步骤一:封装逻辑过期时间的数据
现在要在redis存储的数据中带上逻辑过期时间,那么你可以修改原来的实体类加上过期时间。但是这种做法对原来代码有入侵性,不好。因此,我们新建一个实体类,封装原来数据和过期时间。
1 2 3 4 5 @Data public class RedisData { private LocalDateTime expireTime; private Object data; }
步骤二:业务代码
#ShopServiceImpl.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10 );public Shop queryWithLogicalExpire ( Long id ) { String key = CACHE_SHOP_KEY + id; String json = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isBlank(json)) { return null ; } RedisData redisData = JSONUtil.toBean(json, RedisData.class); Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())) { return shop; } String lockKey = LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); if (isLock){ CACHE_REBUILD_EXECUTOR.submit( ()->{ try { this .saveShop2Redis(id,20L ); }catch (Exception e){ throw new RuntimeException (e); }finally { unlock(lockKey); } }); } return shop; } public void saveShop2Redis (Long id, Long expireSeconds) throws InterruptedException { Shop shop = getById(id); Thread.sleep(2000 ); RedisData redisData = new RedisData (); redisData.setData(shop); redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,JSONUtil.toJsonStr(redisData)); }
步骤三:
因为在该方案中,要保证redis中包含了所有的热点数据。因此,需要先进行缓存的预热。在单元测试中进行缓存预热。
#src/test/java/com/hmdp/HmDianPingApplicationTests.java
1 2 3 4 5 6 7 8 9 10 @Test void testSaveShop2Redis () throws InterruptedException { for (long i = 1 ; i < 15 ; i++) { Shop shop = shopService.getById(i); cacheClient.setWithLogicalExpire(CACHE_SHOP_KEY + i,shop,10L , TimeUnit.SECONDS); } }
然后就可以重启服务,进行测试。
封装redis的工具类 每次解决缓存问题都需要自己一个一个编写,太麻烦了。因此,我们基于StringRedisTemplate封装一个缓存工具类,方便以后我们的调用。缓存工具类满足以下要求:
方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓
存击穿问题
方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
方法5:根据指定的key查询缓存,并反序列化为指定类型,需要利用互斥锁解决缓存击穿问题
封装的工具类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 @Slf4j @Component public class CacheClient { private final StringRedisTemplate stringRedisTemplate; private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10 ); public CacheClient (StringRedisTemplate stringRedisTemplate) { this .stringRedisTemplate = stringRedisTemplate; } public void set (String key, Object value, Long time, TimeUnit unit) { stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit); } public void setWithLogicalExpire (String key, Object value, Long time, TimeUnit unit) { RedisData redisData = new RedisData (); redisData.setData(value); redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time))); stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData)); } public <R,ID> R queryWithPassThrough ( String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) { String key = keyPrefix + id; String json = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(json)) { return JSONUtil.toBean(json, type); } if (json != null ) { return null ; } R r = dbFallback.apply(id); if (r == null ) { stringRedisTemplate.opsForValue().set(key, "" , CACHE_NULL_TTL, TimeUnit.MINUTES); return null ; } this .set(key, r, time, unit); return r; } public <R, ID> R queryWithLogicalExpire ( String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) { String key = keyPrefix + id; String json = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isBlank(json)) { return null ; } RedisData redisData = JSONUtil.toBean(json, RedisData.class); R r = JSONUtil.toBean((JSONObject) redisData.getData(), type); LocalDateTime expireTime = redisData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())) { return r; } String lockKey = LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); if (isLock){ CACHE_REBUILD_EXECUTOR.submit(() -> { try { R newR = dbFallback.apply(id); this .setWithLogicalExpire(key, newR, time, unit); } catch (Exception e) { throw new RuntimeException (e); }finally { unlock(lockKey); } }); } return r; } public <R, ID> R queryWithMutex ( String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) { String key = keyPrefix + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { return JSONUtil.toBean(shopJson, type); } if (shopJson != null ) { return null ; } String lockKey = LOCK_SHOP_KEY + id; R r = null ; try { boolean isLock = tryLock(lockKey); if (!isLock) { Thread.sleep(50 ); return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit); } r = dbFallback.apply(id); if (r == null ) { stringRedisTemplate.opsForValue().set(key, "" , CACHE_NULL_TTL, TimeUnit.MINUTES); return null ; } this .set(key, r, time, unit); } catch (InterruptedException e) { throw new RuntimeException (e); }finally { unlock(lockKey); } return r; } private boolean tryLock (String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1" , 10 , TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock (String key) { stringRedisTemplate.delete(key); } }
在ShopServiceImpl中,利用工具类中的方法解决缓存穿透和缓存击穿问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Resource private CacheClient cacheClient; @Override public Result queryById (Long id) { Shop shop = cacheClient .queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this ::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES); if (shop == null ) { return Result.fail("店铺不存在!" ); } return Result.ok(shop); }
优惠券的秒杀 全局唯一ID 全局唯一ID的必要性?
以店铺发布优惠券,用户抢购优惠券,将信息保存到订单表中这个业务场景为例。
如果订单表还使用数据库自增的ID,就存在一些问题:
通常情况下,msyql单表数据量不能超过500w,超出之后就要分库分表,但是拆分之后,这些表逻辑上来说是一张表,id不能一样,于是我们就需要保证id的唯一性
全局ID生成器,是一种在分布式系统下用来生成全局ID的工具,一般要满足以下特性:
如果直接使用redis自增的数值,id的规律太明显了,安全性低,因此要拼接一些其他信息:
ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,2^31秒≈68年
序列号:32bit,秒内的计数器,支持每秒生成2^32个不同的id
Redis实现全局唯一Id 代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Component public class RedisIdWorker { private static final long BEGIN_TIMESTAMP = 1640995200L ; private static final int COUNT_BITS = 32 ; private StringRedisTemplate stringRedisTemplate; public RedisIdWorker (StringRedisTemplate stringRedisTemplate) { this .stringRedisTemplate = stringRedisTemplate; } public long nextId (String keyPrefix) { LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd" )); String orderKey = "icr:" + keyPrefix + ":" + date; long count = stringRedisTemplate.opsForValue().increment(orderKey); return timestamp << COUNT_BITS | count; } }
测试类代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private ExecutorService es = Executors.newFixedThreadPool(500 );@Test void testIdWorker () throws InterruptedException { CountDownLatch latch = new CountDownLatch (300 ); Runnable task = () -> { for (int i = 0 ; i < 100 ; i++) { long id = redisIdWorker.nextId("order" ); System.out.println("id = " + id); } latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i = 0 ; i < 300 ; i++) { es.submit(task); } latch.await(); long end = System.currentTimeMillis(); System.out.println("time = " + (end - begin)); }
小贴士:关于countDownLatch
countDownLatch名为信号枪:主要的作用是同步协调多线程的等待与唤醒问题(允许一个或多个线程等待直到其他线程执行的一组操作完成的同步辅助)
给定计数初始化CountDownLatch,所调用的await方法阻塞,直到当前计数达到0。每调用一次countDown方法,计数减一。当计数为0的时候,所有等待的线程被释放和任何后续调用的await立刻返回。
秒杀下单和库存超卖问题 秒杀下单是基本的业务逻辑,比较简单。秒杀下单的流程图如下:
代码实现如下:
#VoucherOrderServiceImpl.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } boolean success = seckillVoucherService.update() .setSql("stock= stock -1" ) .eq("voucher_id" , voucherId).update(); if (!success) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
这里我们就要注意了,在判断是否超卖的代码中,我们是这样写的:
1 2 3 4 5 6 7 8 9 10 11 12 if (voucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } boolean success = seckillVoucherService.update() .setSql("stock= stock -1" ) .eq("voucher_id" , voucherId).update(); if (!success) { return Result.fail("库存不足!" ); }
仔细观察,会发现它存在问题。假设一个场景,线程1来查询库存,判断库存充足之后,准备扣减库存,在线程1还没来及扣减库存时,线程2进来了。线程2也查询库存,此时因为线程1还没有来得及扣减库存,那么线程2也发现库存充足,也去扣减库存。两个线程同时扣减库存,就会出现库存超卖的问题。
超卖问题是典型的线程安全问题,针对这一问题的常见解决方案是加锁。而对于加锁,我们有两种方案:
悲观锁
乐观锁
认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。 例如Synchronized、Lock都属于悲观锁
认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其他线程对数据做了修改。 如果没有修改则认为是安全的,自己才更新数据。 如果已经被其他线程修改则说明发生了线程安全问题,则重试或者抛出异常。
使用乐观锁解决超卖问题 方案一
VoucherOrderServiceImpl在扣减库存时改为:
1 2 3 boolean success = seckillVoucherService.update() .setSql("stock= stock -1" ) .eq("voucher_id" , voucherId).eq("stock" ,voucher.getStock()).update();
核心是,在修改库存的时候判断现在的库存和上次查询出的库存是否一致。如果一致,说明没有人在中间修改过库存,是线程安全的。但是这样会有个问题:如果100个线程同时来查询库存并进行扣减,第1个线程首先进行扣减库存的操作,因为它是第一个进来的,没有人修改库存,则线程1成功扣减库存。那么后面99个线程来扣减库存的时候,都会发现现在的库存和它们上次查询出的库存不一致,扣减库存会失败。这样一来,成功扣减库存的概率太低。
方案二
只需要改成stock > 0,即可解决以上的问题。
1 2 3 boolean success = seckillVoucherService.update() .setSql("stock= stock -1" ) .eq("voucher_id" , voucherId).update().gt("stock" ,0 );
优惠券秒杀-一人一单 优惠券的目的是为了引流,应该确保一人只能抢购一单,目前的情况是可以无限制购买,因此要修改业务逻辑,确保一人只能抢购某种优惠券的一个。流程图如下:
#VoucherOrderServiceImpl的初始代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ) { return Result.fail("用户已经购买过一次!" ); } boolean success = seckillVoucherService.update() .setSql("stock= stock -1" ) .eq("voucher_id" , voucherId).update(); if (!success) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
我们注意到,又出现了并发安全问题。当多个线程并发过来,查询数据库,发现订单都不存在,都会去创建订单。所以还是需要加锁,而乐观锁比较适合更新数据,现在是插入数据,我们使用悲观锁。
我们先把从一人一单到创建订单的逻辑抽离出一个方法createVocuherOrder。为了确保线程安全,在方法上加了一把sychronized锁。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Transactional public synchronized Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ) { return Result.fail("用户已经购买过一次!" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).gt("stock" , 0 ) .update(); if (!success) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
但是这样添加锁,锁的粒度太粗了。sychronized锁加在方法上,使用的是当前实例对象作为锁,这就意味着同一时间只能有一个线程进入该方法。即便是不同的用户操作抢购不同的优惠券,也会因为锁住的是同一个实例而串行执行,并发性能差。因此要修改锁的粒度。
我们的业务是一人一单,因此只要将锁的粒度改为用户级别 就可以了。将加锁处的代码改为:
1 synchronized (userId.toString().intern())
这个代码会userId对象加锁。其中,intern确保相同userId返回同一个字符串对象。
完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Transactional public Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()){ int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ) { return Result.fail("用户已经购买过一次!" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).gt("stock" , 0 ) .update(); if (!success) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); } }
但是以上代码还存在问题,细心的同学会发现,代码中锁的释放在事务提交之前,这样就会出现问题。例如线程1释放锁之后,还未提交事务,这时线程2进来,读取线程1未提交的数据例如库存,就会造成超卖。
在事务中,所有的数据库操作都会先记录在事务日志中,等事务提交后才真正写入数据库。
因此,我们需要将锁的范围扩大到事务提交之后才释放。在seckillVoucher 方法中,添加以下逻辑,
1 2 3 4 Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) { return this .createVoucherOrder(voucherId); }
但是以上做法仍有问题,因为事务想要生效,必须利用代理生效,所以这个地方,我们要先获取原始的事务对象,再操作事务:
1 2 3 4 5 synchronized (userId.toString().intern()) { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); }
至此,我们就解决了一人一单的问题。但是这是在单机环境下,在集群环境下,仍有问题。
集群环境下的并发问题 我们利用idea启动两份服务。然后前端nginx服务器中配置负载均衡。如图:
这个时候,我们利用postman进行测试。创建两个相同抢购订单的接口,执行之后会发现一个用户依旧抢了两单。这是为什么呢?我们不是已经加锁了吗?
有关锁失效原因分析
这是因为我们启动了两个tomcat,每个tomcat都有自己的jvm,每个jvm有自己的锁监视器。例如线程1对第一台tomcat服务器发起请求,此时请求的是jvm1,线程1尝试获取锁,获取锁成功,jvm1的锁监视器就知道此时syn这把锁被线程1持有。后面再有线程进来,都会获取锁失败。但是如果此时线程3进来,它请求的不是第一个tomcat服务器,而是第二个tomcat服务器,那么不同的tomcat有自己的jvm,有自己的锁监视器。此时jvm2的锁监视器中没有人获取锁,线程3就会获取锁成功,造成一人两单的问题。
这就是集群环境下syn锁失效的原因,我们需要使用分布式锁来解决这个问题。
分布式锁 基本原理与实现方式对比 分布式锁概念:满足分布式系统或集群环境下多进程可见并且互斥的锁。
分布式锁的核心思路就是让大家使用同一把锁(只要一个锁监视器),这样就能锁住线程,让程序串行执行。
分布式锁满足的条件:
多进程可见(不同的jvm实例中的多个进程可见,多个进程能感知到变化)
互斥
高可用
高性能
安全性
分布式锁的实现方式:
Redis分布式锁的核心实现思路 实现分布式锁时需要两个方法:
获取锁
互斥:确保只要一个线程能获取锁
非阻塞:尝试一次,成功返回true,失败返回false
释放锁
核心思路:
我们利用sexnx命令,当有多个线程进入时,调用该方法,只有第一个进来的线程setnx调用成功返回true,然后线程1执行业务逻辑,释放锁。其他线程都返回false,等待后重试,直到锁被释放,后面的线程才去执行任务。
代码:实现分布式锁 定义锁接口
1 2 3 4 5 6 7 8 9 10 11 public interface Ilock { void unLock () ; }
实现加锁 、释放锁的逻辑
#SimpleRedisLock.java
加锁逻辑:
1 2 3 4 5 6 7 8 9 10 private static final String KEY_PREFIX="lock:" @Override public boolean tryLock (long timeoutSec) { String threadId = Thread.currentThread().getId() Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId + "" , timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); }
释放锁逻辑:
1 2 3 4 public void unlock () { stringRedisTemplate.delete(KEY_PREFIX + name); }
修改业务代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } Long userId = UserHolder.getUser().getId(); SimpleRedisLock lock = new SimpleRedisLock ("order:" + userId, stringRedisTemplate); boolean isLock = lock.tryLock(1200 ); if (!isLock) { return Result.fail("不允许重复下单" ); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); } }
Redis分布式锁的误删 问题:
假设这样一个情况,线程1获取锁,执行业务逻辑,突然出现了阻塞,阻塞过程中,线程1的锁到期释放。此时线程2进来,获取了锁,执行自己的业务逻辑。线程1阻塞完成,准备执行删锁的逻辑,但此时锁是线程2的,就会造成分布式锁误删的问题。
解决方案 :
在释放锁的代码中,加入判断该锁是不是自己的锁的逻辑。
代码实现 :
核心逻辑:获取锁时,放入自己的线程标识;释放锁时,判断锁中的线程标识和当前线程的标识是否一致,一致则删除。不一致则不删除。
流程图如下:
#修改加锁的逻辑:
1 2 3 4 5 6 7 8 9 10 private static final String ID_PREFIX = UUID.randomUUID().toString(true ) + "-" ;@Override public boolean tryLock (long timeoutSec) { String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); }
为什么锁的标识不直接使用线程id,而是加入了UUID?
线程id只能在单个jvm上唯一,不同机器上的jvm可能有相同的线程id,这样在分布式系统下就无法保证唯一性。而使用UUID保证了跨jvm、跨机器场景下id的唯一性。
#修改释放锁的逻辑:
1 2 3 4 5 6 7 8 9 10 11 public void unlock () { String threadId = ID_PREFIX + Thread.currentThread().getId(); String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)) { stringRedisTemplate.delete(KEY_PREFIX + name); } }
测试 :
修改完代码之后,重启两个工程,重启两个线程。当线程1持有锁之后,手动释放锁。线程2进到锁内部获取锁。此时放行线程1,线程1进入到释放锁逻辑中,发现该锁的value值并不是自己的锁,因此不能释放锁。
到此,我们通过在释放锁代码中加入判断锁是不是自己的逻辑,初步解决了分布式锁误删的问题。
Redis分布式锁的原子性问题 此时分布式锁仍有问题。试想一个极端的情况,线程1进入释放锁的逻辑,假设是线程1的锁,线程1比锁成功,接着线程1准备删除锁,但此时线程1的锁正好到期释放。此时线程2抢先进来,获取锁成功。这时线程1反应过来,继续自己的流程,但是线程1直接执行删锁的逻辑,把线程2的锁给删除了,相当于条件判断并没有起到作用。这就是删锁的原子性问题。
为什么呢?因为我们的拿锁、比锁、删锁并不符合原子性,中间会被其他线程抢进。我们需要借助lua脚本解决这个问题。
lua脚本解决多条命令原子性问题 Redis提供了lua脚本功能,在一个脚本中编写多条redis命令,它能够确保命令执行的原子性。
Redis提供的调用函数如下:
1 redis.call('命令名称' , 'key' , '其它参数' , ...)
例如,我们要执行set name jack,lua脚本是这样:
1 2 # 执行 set name jack redis.call('set' , 'name' , 'jack' )
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
1 2 3 4 5 6 # 先执行 set name jack redis.call('set' , 'name' , 'Rose' ) # 再执行 get name local name = redis.call('get' , 'name' )# 返回 return name
写好脚本之后,就需要通过Redis调用脚本,命令如下:
1 2 3 4 5 127.0.0.1:6379> help @scripting EVAL script numkeys key [key ...] arg [arg ...] summary: Execute a Lua script server side since: 2.6.0
例如,我们要执行redis.call(‘set’,’name’,’jack’)这个脚本,语法如下:
如果不想脚本中的key、value写死,可以作为参数传递。key类型的参数放在KEYS数组,其他参数放入ARGV数组,在脚本中可以从KEYS和ARGV数组中获取这些参数:
然后我们就可以编写释放锁逻辑的lua脚本了。先梳理一下流程:
获取线程中的锁的标识
判断是否与当前线程标识一样
如果一样,释放锁
不一样,什么都不做
其lua脚本如下:
1 2 3 4 5 6 7 8 if (redis.call('GET' , KEYS[1 ]) == ARGV[1 ]) then return redis.call('DEL' , KEYS[1 ]) end return 0
利用Java代码调用lua脚本改造分布式锁 在RedisTempalte中,可以利用execute方法执行lua脚本,execute方法如下:
1 2 3 4 5 6 7 8 @Override public <T> T execute (RedisScript<T> script, List<K> keys, Object... args) { return scriptExecutor.execute(script, keys, args); }
execute方法和Redis执行lua脚本命令的参数对应如下:
Java代码 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript <>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource ("unlock.lua" )); UNLOCK_SCRIPT.setResultType(Long.class); } public void unlock () { stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); }
测试 (略)
小总结 :
基于Redis实现分布式锁思路:
利用set nx ex获取锁,设置过期时间,保存线程标识
释放锁时先判断线程标识是否与自己一致,一致则删除
特性:
利用set nx满足互斥性
利用set ex保证发生故障时依然能够释放锁,避免死锁
利用Redis集群保证高可用和高并发特性(这里貌似并没有用到)
分布式锁-Redisson 分布式锁-Redisson功能介绍 基于setnx实现的分布式锁存在以下问题:
重入问题 :可重入锁是指获得锁的线程可以再次进入相同的获取锁代码块中。但是setnx锁不可重入。可重入锁是为了避免死锁的发生。
死锁发生的案例:
有这么一段获取锁的代码:
1 2 3 4 5 6 7 8 private boolean tryLock (String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1" , 10L , TimeUnit.SECONDS); while (!BooleanUtil.isTrue(flag)) { } return true ; }
有这么一个场景:
1 2 3 4 5 6 7 8 9 10 public void methodA () { tryLock("order:1" ); methodB(); ........ } public void methodB () { try ("order:1" ); }
线程进入methodA方法中,尝试获取锁,获取锁成功,然后进入methodB方法中,再次尝试获取同一把锁,因为锁已经被当前线程获取了,会获取锁失败。而methodB方法则会一直等待当前线程释放锁,但这是不可能的,就出现了死锁问题。
不可重试问题 :指目前的分布式锁只能重试获取一次。我们认为合理的情况是,当线程获取锁失败后,能不断的重新获取锁。
超时释放问题 :设置锁的超时时间,能够防止死锁的发生。但如果我们的业务执行耗时较长,业务还未完成,锁就到期释放了,就会存在一定的安全隐患。
主从一致性问题 :如果Redis提供了集群,当我们向集群写入数据的时候,主机需要异步的将数据同步给从机,而如果在主机同步完成之前突然宕机,就会出现死锁问题。(也就是如果主机同步给从机的数据有锁数据,就会造成锁的丢失,从而出现死锁)
而我们的Redisson就可以解决这几个问题。
那什么是Redisson呢?
Redisson是在java基础上实现的驻内存的数据网络(In-Memory Data Grid)。它不仅提供了一系列的分布式java常用对象,还提供了许多分布式服务。其中就包含了各种分布式锁的实现。
官方wiki文档:8. 分布式锁和同步器 · redisson/redisson Wiki
分布式锁-Redisson快速入门 在java中引入redisson中有两种方式,一种是直接引入redisson,一种是通过springboot集成。这里我们采取第一种。
引入依赖:
1 2 3 4 5 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.13.6</version > </dependency >
配置Redisson客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient () { Config config = new Config (); config.useSingleServer().setAddress("redis://192.168.150.101:6379" ) .setPassword("123321" ); return Redisson.create(config); } }
如何使用Redission的分布式锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Resource private RedissionClient redissonClient;@Test void testRedisson () throws Exception{ RLock lock = redissonClient.getLock("anyLock" ); boolean isLock = lock.tryLock(1 ,10 ,TimeUnit.SECONDS); if (isLock){ try { System.out.println("执行业务" ); }finally { lock.unlock(); } } }
在 VoucherOrderServiceImpl,使用redisson替换掉之前的redis自定义锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Resource private RedissonClient redissonClient;@Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } Long userId = UserHolder.getUser().getId(); RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); if (!isLock) { return Result.fail("不允许重复下单" ); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); } }
Redisson分布式锁可重入的原理 原理 :
Redisson采用hash结构实现锁的可重入,替代了上面我们redis使用的string结构。采用hash结构存储线程id和重入次数。每当线程获取锁的时候,先判断锁释是否存在,如果不存在,则获取锁成功,记录线程标识,重入次数记为1。如果锁存在也不一定说明获取锁失败,如果线程标识和锁中的线程标识一致,说明是同一个线程来获取锁,重入次数+1即可。每当线程释放锁的时候,重入次数-1,直到重入次数为0,才真正的释放锁。
源码解读 :
通过一步步跟踪以下代码中的tryLock 方法来分析可重入锁的原理。
在tyrLock处,ctrl+alt+B,选择RedissonLock(org.redisson)跟踪
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Test void method1 () throws InterruptedException { boolean isLock = lock.tryLock(1 , TimeUnit.SECONDS); if (!isLock){ log.error("获取锁失败,1" ); return ; } try { log.info("获取锁成功,1" ); method2(); log.info("开始执行业务...1" ); }finally { log.info("释放锁,1" ); lock.unlock(); } }
进入RedissLock.java的tryLock方法中,
1 2 3 4 @Override public boolean tryLock (long waitTime, TimeUnit unit) throws InterruptedException { return tryLock(waitTime, -1 , unit); }
继续跟踪tryLock(waitTime, -1, unit)方法,如下代码所示。
由于源码太复杂,借助了大模型来理解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 @Override public boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false )) { subscribeFuture.onComplete((res, e) -> { if (e == null ) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false ; } try { time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } while (true ) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } currentTime = System.currentTimeMillis(); RedissonLockEntry entry = subscribeFuture.getNow(); if (ttl >= 0 && ttl < time) { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } } } finally { unsubscribe(subscribeFuture, threadId); } }
然后继续跟踪tryAcquire(waitTime, leaseTime, unit, threadId)方法,进入到它的实现方法中,如下所示
1 2 3 private Long tryAcquire (long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); }
紧接着,跟踪tryAcquireAsync(waitTime, leaseTime, unit, threadId)方法,进入到它的实现方法中,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 private <T> RFuture<Long> tryAcquireAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1 ) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } long leaseTimeForWatchdog = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); RFuture<Long> ttlRemainingFuture = tryLockInnerAsync( waitTime, leaseTimeForWatchdog, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null ) { return ; } if (ttlRemaining == null ) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
跟踪tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG)方法进入到最终的实现方法中。在这个方法中,通过lua脚本实现了锁的可重入。
lua脚本解释:
首先调用(redis.call(‘exists’, KEYS[1]) == 0判断锁是否存在,如果不存在则获取锁,并设置重入次数,此时获取锁成功,返回nil。如果锁存在,判断锁是否是当前线程已经获取的锁,如果是,重入次数+1,也返回nil。
如果以上两个都失败,则获取锁失败,调用return redis.call(‘pttl’, KEYS[1]),返回锁的剩余时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <T> RFuture<T> tryLockInnerAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
通过以下案例,测试redisson分布式锁的可重入特性:
Redisson分布式锁可重试原理 原理 :
利用信号量和PubSub功能实现。当线程尝试获取锁失败后,会进行等待,等待锁释放的消息。获取锁的线程释放锁时会发出释放锁的消息,被等待线程捕捉到之后,等待线程重试获取锁。如果获取锁依然失败,则不断等待、唤醒。当然,会有一个最大等待时间,超过了则返回失败。
源码解读 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 @Override public boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false )) { subscribeFuture.onComplete((res, e) -> { if (e == null ) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false ; } try { time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } while (true ) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } } } finally { unsubscribe(subscribeFuture, threadId); } }
重试流程 :
首先尝试直接获取锁(tryAcquire
)
如果失败,订阅锁释放通道
在循环中:
再次尝试获取锁
如果失败,等待锁释放信号(不超过剩余时间)
收到信号或超时后继续尝试
直到成功获取锁或总等待时间耗尽
Redisson分布式锁自动续约原理 原理:
利用watchDog机制,每隔一段时间重置超时时间。当线程获取锁时,如果不指定leaseTime,Redisson会开启一个“开门狗”线程定时(默认每隔10s)检查锁是否存在,存在的话则重置过期时间。
源码解读 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private void renewExpiration () { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null ) { return ; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask () { @Override public void run (Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null ) { return ; } Long threadId = ent.getFirstThreadId(); if (threadId == null ) { return ; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null ) { log.error("Can't update lock " + getName() + " expiration" , e); return ; } if (res) { renewExpiration(); } }); } }, internalLockLeaseTime / 3 , TimeUnit.MILLISECONDS); ee.setTimeout(task); }
核心流程 :
步骤
说明
✅ 1. 续期触发时机
每次加锁成功后,Redisson 会启动这个 renewExpiration()
方法。
✅ 2. 续期间隔
使用 internalLockLeaseTime / 3
作为定时器延迟(默认 leaseTime=30s
,则每 10s 续一次)。
✅ 3. 续期操作
调用 renewExpirationAsync()
向 Redis 发送命令(如 EXPIRE
)将锁的 TTL 重置为 leaseTime
。
✅ 4. 循环续期
续期成功后,再次调用 renewExpiration()
,形成一个递归循环 ,持续续期。
✅ 5. 安全退出
当锁被释放(unlock()
)或客户端断开时,会从 EXPIRATION_RENEWAL_MAP
中移除条目,下次 renewExpiration()
调用将直接返回,停止续期。
Redission分布式锁的MutiLock原理 TODO