一、分布式锁介绍
1.1 为什么必要分布式锁
在单机部署的系统中,使用线程锁来办理高并发的问题,多线程访问共享变量的问题到达数据一致性,如使用synchornized、ReentrantLock等;
但是在后端集群部署的系统中,步调在不同的JVM虚拟机中运行,且由于synchronized或ReentrantLock都只能保证同一个JVM进程中保证有用,所以这时就必要使用分布式锁了。
1.2 什么是分布式锁?
分布式锁实在就是,控制分布式系统不同进程共同访问共享资源的 一种锁的实现。假如不同的系统或同一个系统的不同主机之间共享 了某个临界资源,往往必要互斥来防止彼此干扰,以保证一致性。
1.3 分布式锁特点
二. 传统锁回顾
2.1商品超卖演示
- create table mall_stock (
- id int primary key auto_increment COMMENT '库存ID',
- product_id varchar(20) not null COMMENT '商品编号',
- sock_id int not null default 1 COMMENT '仓库ID',
- count int not null default 0 COMMENT '数量'
- )
复制代码 搭建环境
- 压测
下载地址:https://jmeter.apache.org/download_jmeter.cgi
双击jemter.bat运行软件
2.2 JVM锁演示
- 在方法上添加同步关键字
- ReentrantLock对象使用
- ReentrantLock lock=new ReentrantLock();
复制代码
2.3 JVM锁失效的三种环境
2.3.1 多例模式
业务对象或锁对象是多例的环境下
原因:业务中一般使用的lock对象锁,lock锁的范围是针对同一个对象内里不同的线程,也就是说,jvm锁是对象锁,对象之间锁不共用
- @Scope("singleton") // prototype 原型模式(多例)singleton 单例模式(单例)
- public class LockController
复制代码 2.3.2 事务
在使用了spring事物注解的环境下(不单是jvm锁,大部分锁实现都会出现这个问题)
原因:spring事务是基于aop的方式实现的,是包裹着整个方法的(包括锁),事务不在锁的范围内,很轻易出现并发执行的时候,a方法的事务还没提交上去,b事务就读了数据库的旧值。
2.3.3 分布式集群
原因:服务都不一样了,锁和对象自然也不一样(就和第一个环境下的环境一样)
办理方法:利用mysql的排他锁机制,将所有业务sql会合成一条sql(以上三种问题都能办理,但是不灵活,只能在业务答应的环境下使用)
总结
综上所述,我们可以发现jvm锁只适合在单体项目中并且业务需求简朴的环境下使用,所以有条件照旧使用分布式锁吧。
三. 基于mysql实现分布式锁
3.1 一条SQL
update ,insert,delete 写操纵本身带排他锁
长处:一个sgl语句:更新数量时判断办理:办理了上面三个锁失效的问题
缺点:
1 锁范围问题:是表级锁照旧行级锁
一个sgl语句:更新数量时判断办理:办理了上面三个锁失效的问题,但是它是表级锁,这种是不能接受的,我要买多种商品结果你把表锁了,整张表都不能并发了、性能肯定就是不行的,最好使用行级锁。
mysql悲观锁中使用行级锁
1,锁的查询或者跟新条件必须是索引字段
2,查询或者更新条件必须是具体值
2.同一个商品有多条库存记载:堆栈有多个、商品ID是一个,可以根据算法减库存、一个sql语句做不到
3.无法记载库存变化前后的状态
Mysql锁的区分
参考:https://blog.csdn.net/name_sakura/article/details/129286136
3.2 悲观锁
悲观锁认为被它保护的数据是极其不安全的,每时每刻都有可能被改动,一个事务拿到悲观锁后,其他任何事务都不能对该数据举行修改,只能等候锁被释放才可以执行。
数据库中的行锁,表锁,读锁,写锁均为悲观锁。
- select .... for update
- service
复制代码- @Transactional
- public Boolean reduceStock(String productId, int count) {
- // 查询商品库存
- Stock stock = stockMapper.getStockByProductId(productId);
- if (stock!= null && stock.getCount() >= count) {
- // 减少库存
- stock.setCount(stock.getCount() - count);
- this.updateById(stock); // 更新库存
- return true; // 减库存成功
- }
- return false;
- }
复制代码 mapper.xml
- <select id="getStockByProductId" resultType="com.syh.model.entity.Stock">
- select <include refid="Base_Column_List"/>
- from mall_stock
- where product_id = #{productId} for update
- </select>
复制代码 问题:
1,性能问题
2,死锁问题
3,库存操纵要统一
3.3 乐观锁
乐观锁认为数据的变动不会太频繁。
乐观锁通常是通过在表中增加一个版本(version)或时间戳(timestamp)来实现,此中,版本最为常用。
事务在从数据库中取数据时,会将该数据的版本也取出来(v1),当事务对数据变动完毕想要将其更新到表中时,会将之前取出的版本v1与数据中最新的版本v2相对比,假如v1=v2,那么阐明在数据变动期间,没有其他事务对数据举行修改,此时,就答应事务对表中的数据举行修改,并且修改时version会加1,以此来表明数据已被变动。
假如,v1不即是v2,那么阐明数据变动期间,数据被其他事务改动了,此时不答应数据更新到表中,一般的处理办法是关照用户让其重新操纵。不同于悲观锁,乐观锁通常是由开发者实现的。(CAS机制:Compare And Swap 比力并交换)
给表添加version字段
service方法
- public Boolean reduceStock(String productId, int count) {
- // 查询商品库存
- Stock stock = stockMapper.selectOne(new QueryWrapper<Stock>().eq("product_id", productId));
- if (stock != null && count >= 0) {
- stock.setCount(stock.getCount() - count);//改数量
- Integer version = stock.getVersion(); // 原版本号
- stock.setVersion(version + 1);//改版本号
- if(!this.update(stock,new QueryWrapper<Stock>().eq("product_id", productId)
- .eq("version", version))){
- // 更新库存 更新失败重试
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- reduceStock(productId, count);
- }
- }
- return false;
- }
复制代码 问题:
1,高并发环境下,性能极低
2,读写分离环境下导致乐观锁不可靠
3.4 总结
性能:一个sql>悲观锁>jvm锁>乐观锁
假如寻求极致性能、业务场景简朴并且不必要记载数据前后变化的环境下。
优先选择:一个sql
假如写并发量较低(多读),争抢不是很激烈的环境下优先选择:乐观锁
假如写并发量较高,一般会经常辩论,此时选择乐观锁的话,会导致业务代码不间断的重试。
优先选择:mysql悲观锁
不推荐jvm当地锁。
基于mysql实现分布式锁
不管是jvm锁照旧mysql锁,为了保证线程的并发安全,都提供了悲观独占排他锁。所以独占排他也是 分布式锁的基本要求。 可以利用唯一键索引不能重复插入的特点实现。计划表如下:
- CREATE TABLE `db_lock` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT,
- `lock_name` varchar(50) NOT NULL COMMENT '锁名',
- `class_name` varchar(100) DEFAULT NULL COMMENT '类名',
- `method_name` varchar(50) DEFAULT NULL COMMENT '方法名',
- `server_name` varchar(50) DEFAULT NULL COMMENT '服务器ip',
- `thread_name` varchar(50) DEFAULT NULL COMMENT '线程名',
- `create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP
- COMMENT '获取锁时间',
- `desc` varchar(100) DEFAULT NULL COMMENT '描述',
- PRIMARY KEY (`id`),
- UNIQUE KEY `idx_unique` (`lock_name`)
- ) ENGINE=InnoDB AUTO_INCREMENT=1332899824461455363 DEFAULT CHARSET=utf8;
复制代码 Lock实体类:
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- @TableName("db_lock")
- public class Lock {
- private Long id;
- private String lockName;
- private String className;
- private String methodName;
- private String serverName;
- private String threadName;
- private Date createTime;
- private String desc;
- }
复制代码 LockMapper接口:
- public interface LockMapper extends BaseMapper<Lock> {
- }
复制代码 2.1. 基本思路
synchronized关键字和ReetrantLock锁都是独占排他锁,即多个线程争抢一个资源时,同一时刻只有 一个线程可以抢占该资源,其他线程只能阻塞等候,直到占有资源的线程释放该资源。
- 线程同时获取锁(insert)
- 获取成功,执行业务逻辑,执行完成释放锁(delete)
- 其他线程等候重试
2.2. 代码实现
改造StockService:
- @Service
-
- public class StockService {
- @Autowired
- private StockMapper stockMapper;
- @Autowired
- private LockMapper lockMapper;
- /**
- * 数据库分布式锁
- */
- public void checkAndLock() {
- // 加锁
- Lock lock = new Lock(null, "lock", this.getClass().getName(), new
- Date(), null);
- try {
- this.lockMapper.insert(lock);
- } catch (Exception ex) {
- // 获取锁失败,则重试
- try {
- Thread.sleep(50);
- this.checkAndLock();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- // 先查询库存是否充足
- Stock stock = this.stockMapper.selectById(1L);
- // 再减库存
- if (stock != null && stock.getCount() > 0){
- stock.setCount(stock.getCount() - 1);
- this.stockMapper.updateById(stock);
- }
- // 释放锁
- this.lockMapper.deleteById(lock.getId());
- }
- }
复制代码 加锁:
- // 加锁
- Lock lock = new Lock(null, "lock", this.getClass().getName(), new Date(), null);
- try {
- this.lockMapper.insert(lock);
- } catch (Exception ex) {
- // 获取锁失败,则重试
- try {
- Thread.sleep(50);
- this.checkAndLock();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
复制代码 解锁:
- // 释放锁
- this.lockMapper.deleteById(lock.getId());
复制代码 2.3. 缺陷及办理方案
- 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
办理方案:给锁数据库 搭建主备
- 这把锁没有失效时间,一旦解锁操纵失败,就会导致锁记载一直在数据库中,其他线程无法再得到到锁。
办理方案:只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
- 这把锁是非重入的,同一个线程在没有释放锁之前无法再次得到该锁。由于数据中数据已经存在了。
办理方案:记载获取锁的主机信息和线程信息,假如雷同线程要获取锁,直接重入。
- 受制于数据库性能,并发能力有限。
办理方案:无法办理。
四. 基于Redis实现分布式锁
4.1 基本实现
借助于redis中的下令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发 送setnx下令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。
● 1. 多个客户端同时获取锁(setnx)
● 2. 获取成功,执行业务逻辑,执行完成释放锁(del)
● 3. 其他客户端等候重试
- @Service
- public class StockService {
- @Autowired
- private StockMapper stockMapper;
- @Autowired
- private LockMapper lockMapper;
- @Autowired
- private StringRedisTemplate redisTemplate;
- public void checkAndLock() {
- // 加锁,获取锁失败重试
- while (!this.redisTemplate.opsForValue().setIfAbsent("lock","xxx")){
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- // 先查询库存是否充足
- Stock stock = this.stockMapper.selectById(1L);
- // 再减库存
- if (stock != null && stock.getCount() > 0){
- stock.setCount(stock.getCount() - 1);
- this.stockMapper.updateById(stock);
- }
- // 释放锁
- this.redisTemplate.delete("lock");
- }
- }
复制代码 4.2 防死锁
办理:给锁设置逾期时间,自动释放锁。 设置逾期时间两种方式:
- 通过expire设置逾期时间(缺乏原子性:假如在setnx和expire之间出现异常,锁也无法释放)
- 使用set指令设置逾期时间:set key value ex 3 nx(既到达setnx的结果,又设置了逾期时间)
4.3 防误删
问题:可能会释放其他服务器的锁。 场景:假如业务逻辑的执行时间是7s。执行流程如下
- index1业务逻辑没执行完,3秒后锁被自动释放。
- index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
- index3获取到锁,执行业务逻辑
- index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只 执行1s就被别人释放。 终极即是没锁的环境。
办理:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否本身的 锁
问题:删除操纵缺乏原子性。 场景:
- index1执行删除时,查询到的lock值确实和uuid相等
- index1执行删除前,lock刚好逾期时间已到,被redis自动释放
- index2获取了lock 4. index1执行删除,此时会把index2的lock删除
办理方案:没有一个下令可以同时做到判断 + 删除,所有只能通过其他方式实现(LUA脚本)
4.4 使用lua保证删除原子性
lua脚本入门
Lua 是一种轻量小巧的脚本语言,用尺度C语言编写并以源代码形式开放, 其计划目标是为了嵌入应用步调中,从而为应用步调提供灵活的扩展和定制功能。
Lua 是巴西里约热内卢天主教大学(Pontifical Catholic University of Rio de Janeiro)里的一个研究小组于 1993 年开发的,该小组成员有:Roberto Ierusalimschy、Waldemar Celes 和 Luiz Henrique de Figueiredo。
Redis 操纵lua脚本
下令:EVAL下令
下令格式:EVAL script numkeys key [key …] arg [arg …]
script参数是一段 Lua5.1 脚本步调。脚本不必(也不应该[^1])界说为一个 Lua 函数
numkeys指定后续参数有几个key,即:key [key …]中key的个数。如没有key,则为0
key [key …] 从 EVAL 的第三个参数开始算起,表现在脚本中所用到的那些 Redis 键(key)。在Lua脚本中通过KEYS[1], KEYS[2]获取。
arg [arg …] 附加参数。在Lua脚本中通过ARGV[1],ARGV[2]获取。
删除LUA脚本:
- if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',
- KEYS[1]) else return 0 end
复制代码 代码实现:
- public void checkAndLock() { // 加锁,获取锁失败重试 String uuid = UUID.randomUUID().toString(); while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS)){ try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } // 先查询库存是否充足 Stock stock = this.stockMapper.selectById(1L); // 再减库存 if (stock != null && stock.getCount() > 0){ stock.setCount(stock.getCount() - 1); this.stockMapper.updateById(stock); } // 释放锁 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',
- KEYS[1]) else return 0 end
- "; this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList("lock"), uuid);}
复制代码 五. 使用Redisson实现分布式锁
导包:
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson-spring-boot-starter</artifactId>
- <version>3.24.3</version>
- </dependency>
复制代码 设置:
- package com.syh.config;
- import org.redisson.Redisson;
- import org.redisson.api.RedissonClient;
- import org.redisson.codec.JsonJacksonCodec;
- import org.redisson.config.Config;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RedissionConfig {
- @Value("${spring.data.redis.host}")
- private String redisHost;
- @Value("${spring.data.redis.password}")
- private String password;
- @Value("${spring.data.redis.port}")
- private int port;
- @Bean
- public RedissonClient getRedisson() {
- System.out.println(redisHost+":"+port);
- Config config = new Config();
- config.useSingleServer().
- setAddress("redis://" + redisHost + ":" + port).
- setPassword(password);
- config.setCodec(new JsonJacksonCodec());
- return Redisson.create(config);
- }
- }
复制代码 使用:
- public Boolean reduceStock(String productId, int count) {
- RLock rLock = redissonClient.getLock("lock");
- try {
- boolean isLocked = rLock.tryLock(3, TimeUnit.SECONDS);
- if (isLocked) {
- // TODO
- // Reids 查询库存
- String stock = redisTemplate.opsForValue().get("count");
- if (stock != null && Integer.parseInt(stock) >= count) {
- redisTemplate.opsForValue().set("count", String.valueOf(Integer.parseInt(stock) - count));
- }
- }
- } catch (Exception e) {
- }finally {
- if (rLock.isHeldByCurrentThread()) {
- rLock.unlock();
- }
- }
- return false;
- }
复制代码 六. 基于zookeeper实现分布式锁
方法概述
ZooKeeper 提供了一种可靠的机制来实现分布式锁,这有助于办理分布式系统中的并发控制问题。为了确保多个节点之间的操纵一致性,可以通过创建暂时顺序节点并利用其唯一性和有序性特点来构建锁定逻辑。
当应用步调哀求获取锁时,会在指定路径下创建一个带有特定前缀的暂时顺序节点;随后通过检查当前所创建节点是否是最小编号的那个节点(即第一个),假如是,则认为成功得到了锁;假如不是,则监听比本身序号小一位的节点变化事件,在前任节点消失之后再次尝试获取锁直到成为最小编号为止。
对于异常状况如网络分区或者服务器崩溃等环境下的处理也非常紧张。例如连接断开、会话逾期等问题都必要被妥善思量以保障锁的安全性与可靠性。别的,设置合理的超时时间可以帮助预防潜在的死锁征象发生。
示例代码
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.ZooDefs.Ids;
- import org.apache.zookeeper.ZooKeeper;
- public class SimpleDistributedLock {
- private static final String LOCK_ROOT_PATH = "/locks";
-
- public void acquireLock(ZooKeeper zk, String lockName) throws KeeperException, InterruptedException {
- String lockPathPrefix = LOCK_ROOT_PATH + "/" + lockName + "-";
-
- // Create ephemeral sequential node.
- String ourLockNode = zk.create(lockPathPrefix, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
- while (true){
- List<String> childrenNodes = zk.getChildren(LOCK_ROOT_PATH, false);
- Collections.sort(childrenNodes);
- int index = childrenNodes.indexOf(zk.getState().toString());
-
- if(index == 0){
- System.out.println(Thread.currentThread().getName() +" acquired the lock.");
- break;
- }else{
- Stat predecessorStat = null;
-
- try {
- String watchOnPredecessor = LOCK_ROOT_PATH +"/"+childrenNodes.get(index-1);
-
- zk.exists(watchOnPredecessor,true);
-
- synchronized(this){
- wait();
- }
- } catch(Exception e){}
- }
- }
- }
- public void releaseLock(){
- // Release logic here...
- System.out.println(Thread.currentThread().getName()+" released the lock.");
- }
- }
复制代码 此段步调展示了如安在一个给定名称空间内竞争一把互斥锁的过程。请注意实际应用中还必要加入更多坚固性的计划以及错误恢复机制等细节。
推荐实践
思量到复杂度和维护本钱,在真实环境中发起采用成熟的第三方库比如 Curator 来简化开发工作量。Curator 对于上述提到的各种范例的锁都提供了精良的支持,并且颠末了广泛的测试验证可以或许很好地适应生产环境的需求。
思维导图
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |