IT评测·应用市场-qidao123.com

标题: 业务幂等性技术架构体系之服务幂等深入分析 [打印本页]

作者: 吴旭华    时间: 2025-1-16 16:47
标题: 业务幂等性技术架构体系之服务幂等深入分析
简单来说,服务幂等是指一个操作(或哀求)无论被执行多少次,其对体系状态的影响都是一样的,就似乎这个操作只执行了一次一样。换句话说,对于同一组输入参数,幂等的服务会给出相同的结果,并且不会改变体系的最终状态。例如:

与接口幂等的区别
特征服务幂等(Service Idempotency)接口幂等(API Idempotency)作用范围整个业务流程,大概跨越多个API接口单个API接口,专注于技术实现依赖因素详细业务逻辑,如订单处置惩罚、库存管理等HTTP方法选择,API筹划原则长期化影响涉及数据库或其他长期存储的状态更新紧张是API相应行为,不肯定涉及长期化状态示例支付服务、注册服务GET/PUT/DELETE哀求 接下来我们将分别介绍几种服务幂等的办理方案
一、防重表

对于防止数据重复提交,还有一种办理方案就是通过防重表实现。防重表的实现思路也非常简单。首先创建一张表 作为防重表,同时在该表中创建一个或多个字段的唯一索引作为防重字段,用于保证并发情况下,数据只有一条。 在向业务表中插入数据之前先向防重表插入,假如插入失败则表示是重复数据。

对于防重表的办理方案,大概有人会说为什么不使用灰心锁。灰心锁在使用的过程中也是会发生死锁的。灰心锁是 通过锁表的方式实现的。 假设如今一个用户A访问表A(锁住了表A),然后试图访问表B; 另一个用户B访问表 B(锁住了表B),然后试图访问表A。 这时对于用户A来说,由于表B已经被用户B锁住了,所以用户A必须比及用 户B释放表B才能访问。 同时对于用户B来说,由于表A已经被用户A锁住了,所以用户B必须比及用户A释放表A才 能访问。此时死锁就已经产生了。
二、select+insert防重提交

说白了,就是在插入之前,先查询一下数据库是否存在,存在了,则不插入,不存在再新增插入
对于一些后台体系,并发量并不高的情况下,对于幂等的实现非常简单,通过select+insert思想即可完成幂等控制。

 详细的,我们的业务流程总结为下图:

代码实现如下:
  1. @Override
  2. @Transactional(rollbackFor = Exception.class)
  3. public String addOrder(Order order) {
  4.     order.setCreateTime(new Date());
  5.     order.setUpdateTime(new Date());
  6.     //查询
  7.     Order orderResult = orderMapper.selectByPrimaryKey(order.getId());
  8.     Optional<Order> orderOptional = Optional.ofNullable(orderResult);
  9.     if (orderOptional.isPresent()){
  10.         return "repeat request";
  11.     }
  12.     int result = orderMapper.insert(order);
  13.     if (result != 1){
  14.         return "fail";
  15.     }
  16.     return "success";
  17. }
复制代码
但是我们颠末Jemeter 压测模仿100个线程,发现并不能保证幂等性

为什么呢?
我们分析,由于代码中,我们是先查询,再插入,这是两步操作(并非原子性操作),那么在这个间隙中,就会产生题目无法保证幂等性
办理方案:可以在方法上加上synchronized锁,但是这就降低体系承载的并发能力了
三、MySQL乐观锁

假设如今订单已经生成成功,那么就会涉及到扣减库存的操作。当高并发下同时扣减库存时,非常容易出现数据错误题目。

我们假如直接使用上面这段代码,会造成大量的数据不一致线程安全题目
但是假如我们加上synchronized呢?
我们用Jemeter模仿10000个线程扣减初始库存量为100000的库存,还是有题目
  1. @Service
  2. public class StockServiceImpl implements StockService {
  3.     @Autowired
  4.     private StockMapper stockMapper;
  5.     @Override
  6.     @Transactional(rollbackFor = Exception.class)
  7.     public synchronized int lessInventory(String goodsId, int num) {
  8.         return stockMapper.lessInventory(goodsId, num);
  9.     }
  10. }
复制代码
当前已经在在方法上添加了synchronized,对当前方法对象举行了锁定。 通过Jemeter,模仿一万并发对其举行 访问。可以发现,仍然出现了脏数据。 

该题目的产生缘故原由,就在于在方法上synchronized搭配使用了@Transactional。首先synchronized锁定的是当 前方法对象,而@Transactional会对当前方法举行AOP增强,动态代理出一个代理对象,在方法执行前开启事 务,执行后提交事件。 所以synchronized和@Transactional其实操作的是两个不同的对象,换句话说就是 @Transactional的事件操作并不在synchronized锁定范围之内。
假设A线程执行完扣减库存方法,会释放锁并提交事件。但A线程释放锁但还没提交事件前,B线程执行扣减库存方 法,B线程执行后,和A线程一起提交事件,就出现了线程安全题目,造成脏数据的出现。
那我们怎样通过MySQL乐观锁保证幂等呢,请看下文分析
MySQL乐观锁保证幂等

MySQL乐观锁是基于数据库完成分布式锁的一种实现,实现的方式有两种:基于版本号、基于条件。但是实现思想都是基于MySQL的行锁思想来实现的。

 基于版本号实现

1)修改数据表,添加version字段,默认值为0
2)修改StockMapper添加基于版本修改数据方法
  1. @Update("update tb_stock set amount=amount‐#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")
  2. int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);
复制代码
 3)测试模仿一万并发举行数据修改,此时可以发现当前版本号从0变为1,且库存量精确。

基于条件实现

通过版本号控制是一种非常常见的方式,适合于大多数场景。但如今库存扣减的场景来说,通过版本号控制就是多人并发访问购买时,查询时表现可以购买,但最终只有一个人能成功,这也是不可以的。其实最终只要商品库存不 发生超卖就可以。那此时就可以通过条件来举行控制。
1)修改StockMapper:
  1. @Update("update tb_stock set amount=amount‐#{num} where goods_id=#{goodsId} and amount‐# {num}>=0")
  2. int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);
复制代码
2)修改StockController:
  1. @PutMapping("/lessInventoryByVersionOut/{goodsId}/{num}")
  2. public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,
  3.                                     @PathVariable("num") Integer num) throws InterruptedException {
  4.         System.out.println("reduce stock");
  5.         int result = stockService.reduceStockNoLock(goodsId, num);
  6.         if (result != 1){
  7.             return "reduce stock fail";
  8.         }
  9.         //延迟
  10.         TimeUnit.SECONDS.sleep(6000);
  11.         return "reduce stock success";
  12. }
复制代码
3)通过jemeter举行测试,可以发现当多人并发扣减库存时,控制住了商品超卖的题目 
四、zookeeper分布式锁

实现原理

对于分布式锁的实现,zookeeper天然携带的一些特性能够很完美的实现分布式锁。其内部紧张是使用znode节点 特性和watch机制完成。
在zookeeper中节点会分为四类,分别是:

watch监听机制紧张用于监听节点状态变更,用于后续事件触发,假设当B节点监听A节点时,一旦A节点发生修 改、删除、子节点列表发生变更等事件,B节点则会收到A节点改变的通知,接着完成其他额外事情。

实现原理如下:
其实现思想是当某个线程要对方法加锁时,首先会在zookeeper中创建一个与当前方法对应的父节点,接着每个要 获取当前方法的锁的线程,都会在父节点下创建一个暂时有序节点,由于节点序号是递增的,所以后续要获取锁的 线程在zookeeper中的序号也是逐次递增的。根据这个特性,当前序号最小的节点肯定是首先要获取锁的线程,因 此可以规定序号最小的节点获得锁。所以,每个线程再要获取锁时,可以判定本身的节点序号是否是最小的,假如 是则获取到锁。当释放锁时,只需将本身的暂时有序节点删除即可。

根据上图,在并发下,每个线程都会在对应方法节点下创建属于本身的暂时节点,且每个节点都是暂时且有序的。 那么zookeeper又是怎样有序的将锁分配给不同线程呢? 这里就应用到了watch监听机制。每当添加一个新的暂时 节点时,其都会基于watcher机制监听着它本身的前一个节点等待前一个节点的通知,当前一个节点删除时,就轮 到它来持有锁了。然后依次类推。

分布式锁的实现

低效锁思想&实现

在通过zookeeper实现分布式锁时,有另外一种实现的写法,这种也黑白常常见的,但是它的效率并不高,此处可以先对这种实现方式举行探究。

此种实现方式,只会存在一个锁节点。当创建锁节点时,假如锁节点不存在,则创建成功,代表当火线程获取到 锁,假如创建锁节点失败,代表已经有其他线程获取到锁,则该线程会监听锁节点的释放。当锁节点释放后,则继 续尝试创建锁节点加锁。
1)在zookeeper_common中创建抽象类AbstractLock
  1. public abstract class AbstractLock {
  2.     //zookeeper服务器地址
  3.     public static final String ZK_SERVER_ADDR="192.168.200.131:2181";
  4.     //zookeeper超时时间
  5.     public static final int CONNECTION_TIME_OUT=30000;
  6.     public static final int SESSION_TIME_OUT=30000;
  7.     //创建zk客户端
  8.     protected ZkClient zkClient = new ZkClient(ZK_SERVER_ADDR,SESSION_TIME_OUT,CONNECTION_TIME_OUT);
  9.     /**
  10.      * 获取锁
  11.      * @return
  12.      */
  13.     public abstract boolean tryLock();
  14.     /**
  15.      * 等待加锁
  16.      */
  17.     public abstract void waitLock();
  18.     /**
  19.      * 释放锁
  20.      */
  21.     public abstract void releaseLock();
  22.     public void getLock() {
  23.         String threadName = Thread.currentThread().getName();
  24.         if (tryLock()) {
  25.             System.out.println(threadName+":   获取锁成功");
  26.         }else {
  27.             System.out.println(threadName+":   获取锁失败,等待中");
  28.             //等待锁
  29.             waitLock();
  30.             getLock();
  31.         }
  32.     }
  33. }
复制代码
2)创建LowLock
  1. public class LowLock extends AbstractLock{
  2.     private static final String LOCK_NODE_NAME = "/lock_node";
  3.     private CountDownLatch countDownLatch;
  4.     @Override
  5.     public boolean tryLock() {
  6.         if (zkClient == null){
  7.             return false;
  8.         }
  9.         try {
  10.             zkClient.createEphemeral(LOCK_NODE_NAME);
  11.             return true;
  12.         } catch (Exception e) {
  13.             return false;
  14.         }
  15.     }
  16.     @Override
  17.     public void waitLock() {
  18.         IZkDataListener zkDataListener = new IZkDataListener() {
  19.             //节点被改变时触发
  20.             @Override
  21.             public void handleDataChange(String dataPath, Object data) throws Exception {
  22.             }
  23.             //节点被删除时触发
  24.             @Override
  25.             public void handleDataDeleted(String dataPath) throws Exception {
  26.                 if (countDownLatch != null){
  27.                     countDownLatch.countDown();
  28.                 }
  29.             }
  30.         };
  31.         //注册监听器
  32.         zkClient.subscribeDataChanges(LOCK_NODE_NAME,zkDataListener);
  33.         //如果锁节点存在,阻塞当前线程
  34.         if (zkClient.exists(LOCK_NODE_NAME)){
  35.             countDownLatch = new CountDownLatch(1);
  36.             try {
  37.                 countDownLatch.await();
  38.                 System.out.println(Thread.currentThread().getName()+":   等待获取锁");
  39.             } catch (InterruptedException e) {
  40.             }
  41.         }
  42.         //删除监听
  43.         zkClient.unsubscribeDataChanges(LOCK_NODE_NAME,zkDataListener);
  44.     }
  45.     @Override
  46.     public void releaseLock() {
  47.         zkClient.delete(LOCK_NODE_NAME);
  48.         zkClient.close();
  49.         System.out.println(Thread.currentThread().getName()+":   释放锁");
  50.     }
  51. }
复制代码
 3)创建测试类
  1. public class LockTest {
  2.     public static void main(String[] args) {
  3.         //模拟多个10个客户端
  4.         for (int i=0;i<10;i++) {
  5.             Thread thread = new Thread(new LockRunnable());
  6.             thread.start();
  7.         }
  8.     }
  9.     private static class LockRunnable implements Runnable {
  10.         @Override
  11.         public void run() {
  12.             AbstractLock abstractLock = new LowLock();
  13.             abstractLock.getLock();
  14.             try {
  15.                 TimeUnit.SECONDS.sleep(5);
  16.             } catch (InterruptedException e) {
  17.                 e.printStackTrace();
  18.             }
  19.             abstractLock.releaseLock();
  20.         }
  21.     }
  22. }
复制代码
4)颠末测试可以发现,当一个线程获取到锁之后,其他线程都会监听这把锁进入到等待状态,一旦持有锁的线程释放锁后,其他线程则都会监听到,并竞争这把锁。
这种方案的低效点就在于,只有一个锁节点,其他线程都会监听同一个锁节点,一旦锁节点释放后,其他线程都会 收到通知,然后竞争获取锁节点。这种大量的通知操作会严峻降低zookeeper性能,对于这种由于一个被watch的 znode节点的变化,而造成大量的通知操作,叫做羊群效应。
高效锁思想&实现

为了避免羊群效应的出现,业界内广泛的办理方案就是,让获取锁的线程产生排队,后一个监听前一个,依次排 序。保举使用这种方式实现分布式锁

按照上述流程会在根节点下为每一个等待获取锁的线程创建一个对应的暂时有序节点,序号最小的节点会持有锁, 并且后一个节点只监听其前面的一个节点,从而可以让获取锁的过程有序且高效
1)定义HighLock类
  1. public class HighLock extends AbstractLock{
  2.     private static final String PARENT_NODE_PATH="/high_lock";
  3.     //当前节点路径
  4.     private String currentNodePath;
  5.     //前一个节点的路径
  6.     private String preNodePath;
  7.     private CountDownLatch countDownLatch;
  8.     @Override
  9.     public boolean tryLock() {
  10.         //判断父节点是否存在
  11.         if (!zkClient.exists(PARENT_NODE_PATH)){
  12.             //不存在
  13.             zkClient.createPersistent(PARENT_NODE_PATH);
  14.         }
  15.         //创建第一个临时有序子节点
  16.         if (currentNodePath == null || "".equals(currentNodePath)){
  17.             //根节点下没有节点信息,将当前节点作为第一个子节点,类型:临时有序
  18.             currentNodePath = zkClient.createEphemeralSequential(PARENT_NODE_PATH+"/","lock");
  19.         }
  20.         //不是第一个子节点,获取父节点下所有子节点
  21.         List<String> childrenNodeList = zkClient.getChildren(PARENT_NODE_PATH);
  22.         //子节点升序排序
  23.         Collections.sort(childrenNodeList);
  24.         //判断是否加锁成功
  25.         if (currentNodePath.equals(PARENT_NODE_PATH+"/"+childrenNodeList.get(0))){
  26.             //当前节点是序号最小的节点
  27.             return true;
  28.         }else {
  29.             //当前节点不是序号最小的节点,获取其前面的节点名称,并赋值
  30.             int length = PARENT_NODE_PATH.length();
  31.             int currentNodeNumber = Collections.binarySearch(childrenNodeList, currentNodePath.substring(length + 1));
  32.             preNodePath = PARENT_NODE_PATH+"/"+childrenNodeList.get(currentNodeNumber-1);
  33.         }
  34.         return false;
  35.     }
  36.     @Override
  37.     public void waitLock() {
  38.         IZkDataListener zkDataListener = new IZkDataListener() {
  39.             @Override
  40.             public void handleDataChange(String dataPath, Object data) throws Exception {
  41.             }
  42.             @Override
  43.             public void handleDataDeleted(String dataPath) throws Exception {
  44.                 if (countDownLatch != null){
  45.                     countDownLatch.countDown();
  46.                 }
  47.             }
  48.         };
  49.         //监听前一个节点的改变
  50.         zkClient.subscribeDataChanges(preNodePath,zkDataListener);
  51.         if (zkClient.exists(preNodePath)){
  52.             countDownLatch = new CountDownLatch(1);
  53.             try {
  54.                 countDownLatch.await();
  55.             } catch (InterruptedException e) {
  56.             }
  57.         }
  58.         zkClient.unsubscribeDataChanges(preNodePath,zkDataListener);
  59.     }
  60.     @Override
  61.     public void releaseLock() {
  62.         zkClient.delete(currentNodePath);
  63.         zkClient.close();
  64.     }
  65. }
复制代码
 2)根据结果可以看到,每一个线程都会有本身的节点信息,并且都会有对应的序号。序号最小的节点首先获取到锁,然后依次类推
五、redis分布式锁

详细内容请参考这篇博客
高性能分布式缓存Redis-分布式锁与布隆过滤器_布隆过滤器 分布式锁-CSDN博客

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4