概述
单机架构下,一个进程中的多个线程竞争同一共享资源时,通常使用 JVM 级别的锁即可保证互斥,以对商品下单并扣库存为例:- public String deductStock() {
- synchronized (this){
- // 获取库存值
- int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
- if (stock > 0) {
- int realStock = stock - 1;
- stringRedisTemplate.opsForValue().set("stock", realStock + "")
- System.out.println("扣减成功,剩余库存:" + realStock);
- } else {
- System.out.println("扣减失败,库存不足");
- }
- }
- return "end";
- }
复制代码 然而,当使用分布式架构时,这种方式就不管用了,因为 JVM 锁只能控制自家应用,其他机器的应用时管不了的,这时候分布式锁就派上用场了,它能保证分布式系统下不同进程对共享资源访问的互斥性
案例分析
下面对使用 Redis 实现分布式锁的案例进行分析:
1. Case1
使用 Redis 中的 setnx() 设计一个入门级别的分布式锁- public String deductStock1() {
- String localKey = "lock:product:0001";
- Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");
- if (!aBoolean){
- return "当前系统繁忙";
- }
- try {
- // 获取库存值
- int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
- if (stock > 0) {
- int realStock = stock - 1;
- stringRedisTemplate.opsForValue().set("stock", realStock + "");
- System.out.println("扣减成功,剩余库存:" + realStock);
- } else {
- System.out.println("扣减失败,库存不足");
- }
- } finally {
- // 即使中间的任何一处逻辑抛出异常,也能保证锁释放
- stringRedisTemplate.delete(localKey);
- }
- return "end";
- }
复制代码 存在的问题:锁没有释放,机器却宕机了,这时候其他机器将无法获取锁
2. Case2
设置一个过期时间,解决 Case1 中存在的宕机没有释放锁的问题- public String deductStock2() {
- String localKey = "lock:product:0001";
- Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");
- stringRedisTemplate.expire(localKey,10,TimeUnit.SECONDS);
- if (!aBoolean){
- return "当前系统繁忙";
- }
- try {
- // 获取库存值
- int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
- if (stock > 0) {
- int realStock = stock - 1;
- stringRedisTemplate.opsForValue().set("stock", realStock + "");
- System.out.println("扣减成功,剩余库存:" + realStock);
- } else {
- System.out.println("扣减失败,库存不足");
- }
- } finally {
- stringRedisTemplate.delete(localKey);
- }
- return "end";
- }
复制代码 存在的问题:有可能还没有执行到 expire() 就宕机了,没有保证原子性
3. Case3
在加锁时就设置超时时间,保证加锁和设置超时时间是原子操作- public String deductStock3() {
- String localKey = "lock:product:0001";
- // 这条命令能够保证原子性
- Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true",10,TimeUnit.SECONDS);
- if (!aBoolean){
- return "当前系统繁忙";
- }
- try {
- // 获取库存值
- int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
- if (stock > 0) {
- int realStock = stock - 1;
- stringRedisTemplate.opsForValue().set("stock", realStock + "");
- System.out.println("扣减成功,剩余库存:" + realStock);
- } else {
- System.out.println("扣减失败,库存不足");
- }
- } finally {
- stringRedisTemplate.delete(localKey);
- }
- return "end";
- }
复制代码 存在问题:如果系统并发量不是特别的大,那么问题不大,但如果并发量很大,就会出现严重的并发问题:
- 假设线程 A 的时间超过了超时时间,锁失效了,此时该线程 A 还没有执行 delete 方法
- 线程 B 这时候加锁成功了,与此同时线程 A 执行了 delete 方法,但是这时候线程 A 释放的锁是线程 B 的
- 于是极端情况下就会出现:线程 A 释放线程 B 的锁,B 释放 C 的,C 释放 D 的 ......
4. Case4
Case3 存在的问题的根本原因就是在执行 delete 方法的时候,自己的锁被其他的线程释放了,所以解决办法就是给每个线程生成一个唯一 ID,在最后释放锁的时候判断是否是自己的锁,如果是自己的才释放- public String deductStock4() {
- String localKey = "lock:product:0001";
- String uuid = UUID.randomUUID().toString();
- // 这条命令能够保证原子性
- Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
- if (!aBoolean){
- return "当前系统繁忙";
- }
- try {
- // 获取库存值
- int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
- if (stock > 0) {
- int realStock = stock - 1;
- stringRedisTemplate.opsForValue().set("stock", realStock + "");
- System.out.println("扣减成功,剩余库存:" + realStock);
- } else {
- System.out.println("扣减失败,库存不足");
- }
- } finally {
- if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
- stringRedisTemplate.delete(localKey);
- }
- }
- return "end";
- }
复制代码 存在问题:存在原子性问题,问题代码如下:- if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
- stringRedisTemplate.delete(localKey);
- }
复制代码 有可能出现当前线程执行完 if 判断却还没执行 delete 操作的时候当前锁过期了,于是又会出现当前线程释放了其他线程的锁的情况
5. Case5
对于 Case4 的问题,本质是 「判断是不是当前线程加的锁」和「释放锁」不是一个原子操作,可以用 Lua 脚本代替,Redis 会将整个脚本作为一个整体执行- String redisScript = "
- if redis.call('get',KEYS[1]) == ARGV[1] then
- return redis.call('del',KEYS[1])
- else
- return 0
- end;"
- public String deductStock5() {
- String localKey = "lock:product:0001";
- String uuid = UUID.randomUUID().toString();
- // 这条命令能够保证原子性
- Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
- if (!aBoolean){
- return "当前系统繁忙";
- }
- try {
- // 获取库存值
- int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
- if (stock > 0) {
- int realStock = stock - 1;
- stringRedisTemplate.opsForValue().set("stock", realStock + "");
- System.out.println("扣减成功,剩余库存:" + realStock);
- } else {
- System.out.println("扣减失败,库存不足");
- }
- } finally {
- redisTemplate.execute(redisScript, Arrays.asList(localKey), uuid);
- }
- return "end";
- }
复制代码 也可以使用锁续命的方式解决,即创建一个守护线程,每过一段时间,判断业务的主线程有没有结束(是否还加着锁),如果还加着锁,将锁的超时时间重新设置- public String deductStock5() {
- String localKey = "lock:product:0001";
- String uuid = UUID.randomUUID().toString();
- // 这条命令能够保证原子性
- Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
- if (!aBoolean){
- return "当前系统繁忙";
- } else {
- // 续命
- Thread demo = new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- Boolean expire = redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
- // 有可能已经主动删除key,不需要在续命
- if(!expire){
- return;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
- demo.setDaemon(true);
- demo.start();
- }
- try {
- // 获取库存值
- int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
- if (stock > 0) {
- int realStock = stock - 1;
- stringRedisTemplate.opsForValue().set("stock", realStock + "");
- System.out.println("扣减成功,剩余库存:" + realStock);
- } else {
- System.out.println("扣减失败,库存不足");
- }
- } finally {
- if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
- stringRedisTemplate.delete(localKey);
- }
- }
- return "end";
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |