ToB企服应用市场:ToB评测及商务社交产业平台

标题: 通过redis实现高性能计费处置惩罚逻辑 [打印本页]

作者: 郭卫东    时间: 2024-10-15 03:50
标题: 通过redis实现高性能计费处置惩罚逻辑
计费服务一般都是跟资金相关,所以它在体系中是非常核心的模块,要包管服务的高可用、事务一致性、高性能。服务高可用必要集群摆设,要包管事务一致性可以通过数据库来实现,但是只通过数据库却很难实现高性能的体系。
这篇文章通过使用 redis+消息队列+数据库 实现一套高性能的计费服务,接口请求扣费分为两步: 首先 使用redis扣减用户余额,扣费乐成后将数据发送到消息队列,整个扣费的第一步就完成了,这时就可以返回乐成给调用端; 第二步 数据持久化,由别的一个服务订阅消息队列,批量消费队列中的消息实现数据库持久化。
上面两个步调完成后,整个扣费过程就结束了,这里最重要的就是扣费的第一步,业务流程图如下:

通过上面的流程图可以知道,整个扣费的第一步重要分为两部分:redis扣费+kafka消费;单独使用redis扣费可以通过lua脚本实现事务,kafka消息队列也可以实现事务,但是怎样将这两步操纵实现事务就要通过编程来保障。
实现上面的处置惩罚流程计划使用一个demo项目来演示。项目使用 SpringBoot+redis+kafka+MySQL 的架构。
首先在MySQL数据库创建用到的几个表:
  1. -- 用户信息
  2. CREATE TABLE `t_user`  (
  3.   `id` int NOT NULL AUTO_INCREMENT,
  4.   `user_name` varchar(128) DEFAULT NULL,
  5.   `create_time` datetime NULL DEFAULT NULL,
  6.   `modify_time` datetime NULL DEFAULT NULL,
  7.   `api_open` tinyint NULL DEFAULT 0,
  8.   `deduct_type` varchar(16) DEFAULT NULL,
  9.   PRIMARY KEY (`id`) USING BTREE,
  10.   UNIQUE INDEX `user_name`(`user_name` ASC) USING BTREE
  11. ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
  12. -- 账户信息
  13. CREATE TABLE `t_account`  (
  14.   `user_id` int NOT NULL,
  15.   `balance` decimal(20, 2) NULL DEFAULT NULL,
  16.   `modify_time` datetime NULL DEFAULT NULL,
  17.   `create_time` datetime NULL DEFAULT NULL,
  18.   `version` bigint NULL DEFAULT 1,
  19.   `threshold` decimal(20, 2) NULL DEFAULT NULL,
  20.   PRIMARY KEY (`user_id`) USING BTREE
  21. ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;
  22. -- 订单信息
  23. CREATE TABLE `t_order`  (
  24.   `id` bigint NOT NULL,
  25.   `user_id` int NULL DEFAULT NULL,
  26.   `amount` decimal(20, 2) NULL DEFAULT NULL,
  27.   `balance` decimal(20, 2) NULL DEFAULT NULL,
  28.   `modify_time` datetime NULL DEFAULT NULL,
  29.   `create_time` datetime NULL DEFAULT NULL,
  30.   PRIMARY KEY (`id`) USING BTREE
  31. ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;
  32. -- 支付信息
  33. CREATE TABLE `t_pay`  (
  34.   `id` bigint NOT NULL,
  35.   `user_id` int NULL DEFAULT NULL,
  36.   `pay` decimal(20, 2) NULL DEFAULT NULL,
  37.   `create_time` datetime NULL DEFAULT NULL,
  38.   PRIMARY KEY (`id`) USING BTREE
  39. ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
复制代码
用户信息表 t_user 有两个字段必要阐明:
api_open 表示用户接口是否开启,如果用户账号余额不足可以把该字段设置为false拦截扣费请求;
deduct_type 这个字段表示扣费方式,ASYNC表示异步扣费,先通过redis扣减余额再异步扣减数据库,SYNC表示同步扣费,直接扣减数据库。
账户余额表 t_account 中也要阐明两个字段:
version 表示余额扣减的版本号,这个版本号只能递增,每更新一次数据库版本号就增长1,在异步扣费时,可以比力redis中数据和db中数据的差异,正常来说redis中的版本号要大于等于db中的版本号;
threshold 是余额的一个阈值,由于redis中数据和db数据的差异,当异步扣费时redis中的余额小于该值时,避免有大概存在的超扣情况,要对用户的请求限流。
第一步在redis中扣减客户的余额时,必要处置惩罚三个数据:
(1)判断客户余额信息是否存在,如果存在并且余额充足,则扣减客户余额;
(2)生成订单信息缓存,由于数据库的订单和redis扣费订单存在时间差,这里的订单信息可以用于查询订单数据;redis如果发生主从切换,订单信息也可以用于判断该订单是否要重新加载历史数据;
(3)添加订单一致性集合数据,当kafka消息被消费后会把订单ID在这个集合中删除,它重要有两个用途:订单已经在redis中扣费但是由于某些缘故原由没能在kafka中消费到,可以通过补偿逻辑将该订单入库;也可以通过这个集合中的订单条数判断redis处置惩罚数据与db处置惩罚数据的差异;
正常流程这三步的lua脚本:
  1. local rs
  2. if redis.call('exists', KEYS[1]) > 0 then
  3.   local vals = redis.call('hmget', KEYS[1], 'balance', 'threshold', 'v')
  4.   local balance = tonumber(vals[1])
  5.   local threshold = tonumber(vals[2])
  6.   local v0 = tonumber(vals[3])
  7.   if balance >= tonumber(ARGV[1]) and (balance - ARGV[1]) >= threshold then
  8.     local b = redis.call('hincrby', KEYS[1], 'balance', 0 - ARGV[1])
  9.     redis.call('hset', KEYS[1], 'ts', ARGV[2])
  10.     redis.call('set', KEYS[2], ARGV[1] .. ';' .. ARGV[2] .. ';' .. b, 'EX', 604800)
  11.     redis.call('zadd', KEYS[3], ARGV[2], ARGV[3])
  12.     local v = redis.call('hincrby', KEYS[1], 'v', 1)
  13.     rs = 'ok;' .. b .. ';' .. v
  14.   else
  15.     rs = 'fail;' .. balance .. ';' .. v0
  16.   end
  17. else
  18.   rs = 'null'
  19. end
  20. return rs
复制代码
最初redis中是不存在数据的,必要将db数据加载到redis中:
  1. if redis.call('exists', KEYS[1]) > 0 then
  2.   redis.call('hget', KEYS[1], 'balance')
  3. else
  4.   redis.call('hmset', KEYS[1], 'balance', ARGV[1], 'v', ARGV[2], 'threshold', ARGV[3])
  5. end
  6. return 'ok'
复制代码
当redis扣费乐成而后面操纵出现非常时必要回滚redis的扣费:
  1. local rs
  2. if redis.call('exists', KEYS[1]) > 0 then
  3.   local v = tonumber(redis.call('hget', KEYS[1], 'v'))
  4.   if v >= tonumber(ARGV[2]) then
  5.     rs = redis.call('hincrby', KEYS[1], 'balance', ARGV[1])
  6.     redis.call('hincrby', KEYS[1], 'v', 1)
  7.     redis.call('del', KEYS[2])
  8.     redis.call('zrem', KEYS[3], ARGV[3])
  9.   else
  10.     rs = 'fail'
  11.   end
  12. else
  13.   rs = 'null'
  14. end
  15. return rs
复制代码
上面的lua脚本涉及到多个键操纵,要包管在集群模式下命令精确执行,要让全部的键落在一个hash槽,所以在键的计划时要包管计算hash的部分相同,使用用户ID包裹在{}内就能到达目的,全部涉及到的键封装成一个工具类:
  1. /**
  2. * @Author xingo
  3. * @Date 2024/9/10
  4. */
  5. public class RedisKeyUtils {
  6.     /**
  7.      * 用户余额键
  8.      * @param userId    用户ID
  9.      * @return
  10.      */
  11.     public static String userBalanceKey(int userId) {
  12.         return ("user:balance:{" + userId + "}").intern();
  13.     }
  14.     /**
  15.      * 用户订单键
  16.      * @param userId    用户ID
  17.      * @param orderId   订单ID
  18.      * @return
  19.      */
  20.     public static String userOrderKey(int userId, long orderId) {
  21.         return ("user:order:{" + userId + "}:" + orderId).intern();
  22.     }
  23.     /**
  24.      * 保存用户订单一致性的订单集合
  25.      * 保存可能已经在redis中但不在数据库中的订单ID集合
  26.      * @param userId    用户ID
  27.      * @return
  28.      */
  29.     public static String userOrderZsetKey(int userId) {
  30.         return ("user:order:consistency:{" + userId + "}").intern();
  31.     }
  32.     /**
  33.      * 用户分布式锁键
  34.      * @param userId
  35.      * @return
  36.      */
  37.     public static String userLockKey(int userId) {
  38.         return ("user:lock:" + userId).intern();
  39.     }
  40. }
复制代码
使用springboot开发还必要将lua脚本封装成工具类:
  1. import org.springframework.data.redis.core.script.DefaultRedisScript;
  2. /**
  3. * lua脚本工具类
  4. *
  5. * @Author xingo
  6. * @Date 2024/9/10
  7. */
  8. public class LuaScriptUtils {
  9.     /**
  10.      * redis数据分隔符
  11.      */
  12.     public static final String SEPARATOR = ";";
  13.     /**
  14.      * 异步扣费lua脚本
  15.      * 键1:用户余额键,hash结构
  16.      * 键2:用户订单键,string结构
  17.      * 参数1:扣减的金额,单位是分;
  18.      * 参数2:扣减余额的时间戳
  19.      */
  20.     public static final String ASYNC_DEDUCT_BALANCE_STR =
  21.             "local rs " +
  22.             "if redis.call('exists', KEYS[1]) > 0 then " +
  23.             "  local vals = redis.call('hmget', KEYS[1], 'balance', 'threshold', 'v') " +
  24.             "  local balance = tonumber(vals[1]) " +
  25.             "  local threshold = tonumber(vals[2]) " +
  26.             "  local v0 = tonumber(vals[3]) " +
  27.             "  if balance >= tonumber(ARGV[1]) and (balance - ARGV[1]) >= threshold then " +
  28.             "    local b = redis.call('hincrby', KEYS[1], 'balance', 0 - ARGV[1]) " +
  29.             "    redis.call('hset', KEYS[1], 'ts', ARGV[2]) " +
  30.             "    redis.call('set', KEYS[2], ARGV[1] .. '" + SEPARATOR + "' .. ARGV[2] .. '" + SEPARATOR + "' .. b, 'EX', 604800) " +
  31.             "    redis.call('zadd', KEYS[3], ARGV[2], ARGV[3]) " +
  32.             "    local v = redis.call('hincrby', KEYS[1], 'v', 1) " +
  33.             "    rs = 'ok" + SEPARATOR + "' .. b .. '" + SEPARATOR + "' .. v " +
  34.             "  else " +
  35.             "    rs = 'fail" + SEPARATOR + "' .. balance .. '" + SEPARATOR + "' .. v0 " +
  36.             "  end " +
  37.             "else " +
  38.             "  rs = 'null' " +
  39.             "end " +
  40.             "return rs";
  41.     /**
  42.      * 同步余额数据lua脚本
  43.      * 键:用户余额键,hash结构
  44.      * 参数1:扣减的金额,单位是分
  45.      * 参数2:扣减余额的时间戳
  46.      * 参数3:余额的阈值,单位是分
  47.      */
  48.     public static final String SYNC_BALANCE_STR =
  49.             "if redis.call('exists', KEYS[1]) > 0 then " +
  50.             "  redis.call('hget', KEYS[1], 'balance') " +
  51.             "else " +
  52.             "  redis.call('hmset', KEYS[1], 'balance', ARGV[1], 'v', ARGV[2], 'threshold', ARGV[3]) " +
  53.             "end " +
  54.             "return 'ok'";
  55.     /**
  56.      * 回滚扣款lua脚本
  57.      * 键1:用户余额键,hash结构
  58.      * 键2:用户订单键,string结构
  59.      * 参数1:回滚的金额,单位是分
  60.      * 参数2:扣款时对应的版本号
  61.      */
  62.     public static final String ROLLBACK_DEDUCT_STR =
  63.             "local rs " +
  64.             "if redis.call('exists', KEYS[1]) > 0 then " +
  65.             "  local v = tonumber(redis.call('hget', KEYS[1], 'v')) " +
  66.             "  if v >= tonumber(ARGV[2]) then " +
  67.             "    rs = redis.call('hincrby', KEYS[1], 'balance', ARGV[1]) " +
  68.             "    redis.call('hincrby', KEYS[1], 'v', 1) " +
  69.             "    redis.call('del', KEYS[2]) " +
  70.             "    redis.call('zrem', KEYS[3], ARGV[3]) " +
  71.             "  else " +
  72.             "    rs = 'fail' " +
  73.             "  end " +
  74.             "else " +
  75.             "  rs = 'null' " +
  76.             "end " +
  77.             "return rs";
  78.     public static final DefaultRedisScript<String> ASYNC_DEDUCT_SCRIPT = new DefaultRedisScript<>(LuaScriptUtils.ASYNC_DEDUCT_BALANCE_STR, String.class);
  79.     public static final DefaultRedisScript<String> SYNC_BALANCE_SCRIPT = new DefaultRedisScript<>(LuaScriptUtils.SYNC_BALANCE_STR, String.class);
  80.     public static final DefaultRedisScript<String> ROLLBACK_DEDUCT_SCRIPT = new DefaultRedisScript<>(LuaScriptUtils.ROLLBACK_DEDUCT_STR, String.class);
  81. }
复制代码
全部底子组件都已经准备好了,接下来就是编写任务代码:
处置惩罚请求接口:
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. import org.xingo.domain.ApiResult;
  6. import org.xingo.front.service.DeductService;
  7. import java.math.BigDecimal;
  8. /**
  9. * @Author xingo
  10. * @Date 2024/9/9
  11. */
  12. @Slf4j
  13. @RestController
  14. public class DeductController {
  15.     @Autowired
  16.     private DeductService deductService;
  17.     /**
  18.      * 异步扣减余额
  19.      * @param userId    用户ID
  20.      * @param amount    扣减金额
  21.      * @return
  22.      */
  23.     @GetMapping("/async/dudect")
  24.     public ApiResult asyncDeduct(int userId, BigDecimal amount) {
  25.         return deductService.asyncDeduct(userId, amount);
  26.     }
  27. }
复制代码
扣费处置惩罚服务:
  1. import org.xingo.domain.ApiResult;
  2. import java.math.BigDecimal;
  3. /**
  4. * @Author xingo
  5. * @Date 2024/9/13
  6. */
  7. public interface DeductService {
  8.     /**
  9.      * 异步扣减余额
  10.      * @param userId
  11.      * @param amount
  12.      * @return
  13.      */
  14.     ApiResult asyncDeduct(int userId, BigDecimal amount);
  15. }
复制代码
  1. import com.baomidou.mybatisplus.core.toolkit.StringUtils;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Service;
  5. import org.xingo.domain.ApiResult;
  6. import org.xingo.domain.DeductResult;
  7. import org.xingo.entity.UserData;
  8. import org.xingo.enums.DeductType;
  9. import org.xingo.front.config.CacheUtils;
  10. import org.xingo.front.service.DeductService;
  11. import org.xingo.front.service.OrderService;
  12. import java.math.BigDecimal;
  13. /**
  14. * @Author xingo
  15. * @Date 2024/9/13
  16. */
  17. @Slf4j
  18. @Service
  19. public class DeductServiceImpl implements DeductService {
  20.     @Autowired
  21.     private OrderService orderService;
  22.     @Autowired
  23.     private CacheUtils cacheUtils;
  24.     @Override
  25.     public ApiResult asyncDeduct(int userId, BigDecimal amount) {
  26.         ApiResult result = null;
  27.         UserData user = cacheUtils.getUserData(userId);
  28.         if(user != null && user.isApiOpen()) {
  29.             amount = amount.abs();
  30.             if(user.getDeductType() == DeductType.ASYNC) {
  31.                 DeductResult rs = orderService.asyncDeductOrder(userId, amount);
  32.                 if(StringUtils.isNotBlank(rs.getMessage()) && "null".equals(rs.getMessage())) {
  33.                     BigDecimal rsBalance = orderService.syncBalanceToRedis(userId);
  34.                     if(rsBalance != null) {
  35.                         return this.asyncDeduct(userId, amount);
  36.                     } else {
  37.                         result = ApiResult.fail("同步余额失败");
  38.                     }
  39.                 } else {
  40.                     result = ApiResult.success(rs);
  41.                 }
  42.             } else {
  43.                 DeductResult rs = orderService.syncDeductOrder(userId, amount);
  44.                 return StringUtils.isBlank(rs.getMessage()) ? ApiResult.success(rs) : ApiResult.fail(rs.getMessage());
  45.             }
  46.         } else {
  47.             result = ApiResult.fail("用户接口未开启");
  48.         }
  49.         return result;
  50.     }
  51. }
复制代码
订单扣减服务
  1. import org.xingo.domain.DeductResult;
  2. import java.math.BigDecimal;
  3. /**
  4. * @Author xingo
  5. * @Date 2024/9/9
  6. */
  7. public interface OrderService {
  8.     /**
  9.      * 同步扣减订单
  10.      * @param userId
  11.      * @param amount
  12.      * @return
  13.      */
  14.     DeductResult syncDeductOrder(Integer userId, BigDecimal amount);
  15.     /**
  16.      * 同步余额数据到redis
  17.      * @param userId
  18.      * @return
  19.      */
  20.     BigDecimal syncBalanceToRedis(Integer userId);
  21.     /**
  22.      * 同步扣减订单
  23.      * @param userId
  24.      * @param amount
  25.      * @return
  26.      */
  27.     DeductResult asyncDeductOrder(Integer userId, BigDecimal amount);
  28. }
复制代码
  1. import cn.hutool.core.util.IdUtil;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.redisson.api.RLock;
  4. import org.redisson.api.RedissonClient;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.data.redis.core.StringRedisTemplate;
  9. import org.springframework.kafka.core.KafkaTemplate;
  10. import org.springframework.stereotype.Service;
  11. import org.springframework.transaction.annotation.Transactional;
  12. import org.xingo.common.ConstantVal;
  13. import org.xingo.common.JacksonUtils;
  14. import org.xingo.common.RedisKeyUtils;
  15. import org.xingo.domain.DeductResult;
  16. import org.xingo.entity.AccountData;
  17. import org.xingo.entity.OrderData;
  18. import org.xingo.front.mapper.AccountMapper;
  19. import org.xingo.front.mapper.OrderMapper;
  20. import org.xingo.front.service.OrderService;
  21. import org.xingo.front.service.UserService;
  22. import java.math.BigDecimal;
  23. import java.math.RoundingMode;
  24. import java.time.LocalDateTime;
  25. import java.util.Arrays;
  26. import java.util.Collections;
  27. import java.util.List;
  28. import java.util.concurrent.TimeUnit;
  29. /**
  30. * @Author xingo
  31. * @Date 2024/9/9
  32. */
  33. @Slf4j
  34. @Service
  35. public class OrderServiceImpl implements OrderService {
  36.     /**
  37.      * 100倍
  38.      */
  39.     BigDecimal ONE_HUNDRED = BigDecimal.valueOf(100);
  40.     Logger deductLogger = LoggerFactory.getLogger("deduct");
  41.     @Autowired
  42.     private AccountMapper accountMapper;
  43.     @Autowired
  44.     private OrderMapper orderMapper;
  45.     @Autowired
  46.     private StringRedisTemplate redisTemplate;
  47.     @Autowired
  48.     private KafkaTemplate<String, String> kafkaTemplate;
  49.     @Autowired
  50.     private RedissonClient redissonClient;
  51.     @Autowired
  52.     private UserService userService;
  53.     @Override
  54.     @Transactional(rollbackFor = Exception.class)
  55.     public DeductResult syncDeductOrder(Integer userId, BigDecimal amount) {
  56.         String lockKey = RedisKeyUtils.userLockKey(userId);
  57.         RLock rlock = redissonClient.getLock(lockKey);
  58.         try {
  59.             boolean tryLock = rlock.tryLock(5, 5, TimeUnit.SECONDS);
  60.             if(tryLock) {
  61.                 // 扣减账号余额
  62.                 boolean rs = accountMapper.deductBalanceByUserId(userId, amount);
  63.                 if(rs) {    // 扣减余额成功
  64.                     // 查找余额
  65.                     AccountData account = accountMapper.selectById(userId);
  66.                     if(account.getBalance().compareTo(BigDecimal.ZERO) <= 0) {
  67.                         userService.closeUserApi(userId);
  68.                     }
  69.                     // 增加订单
  70.                     long id = IdUtil.getSnowflake(1, 1).nextId();
  71.                     OrderData orderData = OrderData.builder()
  72.                             .id(id)
  73.                             .userId(userId)
  74.                             .amount(amount)
  75.                             .balance(account.getBalance())
  76.                             .modifyTime(LocalDateTime.now())
  77.                             .createTime(LocalDateTime.now())
  78.                             .build();
  79.                     orderMapper.insert(orderData);
  80.                     log.info("同步扣减账号余额|{}|{}|{}|{}", userId, amount, account.getBalance(), id);
  81.                     // 实时同步余额到redis
  82.                     String key = RedisKeyUtils.userBalanceKey(userId);
  83.                     String key1 = RedisKeyUtils.userOrderKey(userId, id);
  84.                     List<String> keys = Arrays.asList(key, key1);
  85.                     String execute = redisTemplate.execute(LuaScriptUtils.DB_DEDUCT_SCRIPT, keys,
  86.                             amount.multiply(ONE_HUNDRED).intValue() + "", System.currentTimeMillis() + "");
  87.                     if(execute.startsWith("ok")) {
  88.                         String[] arr = execute.split(";");
  89.                         int redisVersion = Integer.parseInt(arr[2]);
  90.                         if(redisVersion < account.getVersion()) {
  91.                             redisTemplate.delete(key);
  92.                             log.info("缓存数据版本低于数据库|{}|{}|{}|{}|{}", userId, redisVersion, account.getVersion(), arr[1], account.getBalance());
  93.                         }
  94.                     }
  95.                     log.info("同步扣减数据到缓存|{}|{}|{}|{}", userId, amount, account.getBalance(), execute);
  96.                     return DeductResult.builder()
  97.                             .id(id)
  98.                             .userId(userId)
  99.                             .amount(amount)
  100.                             .balance(account.getBalance())
  101.                             .build();
  102.                 }
  103.             }
  104.         } catch (InterruptedException e) {
  105.             log.error("获取redisson锁异常", e);
  106.             throw new RuntimeException(e);
  107.         } finally {
  108.             if(rlock.isHeldByCurrentThread()) {
  109.                 rlock.unlock();
  110.             }
  111.         }
  112.         return DeductResult.builder().message("扣费失败").build();
  113.     }
  114.     @Override
  115.     public BigDecimal syncBalanceToRedis(Integer userId) {
  116.         String lockKey = RedisKeyUtils.userLockKey(userId);
  117.         RLock rlock = redissonClient.getLock(lockKey);
  118.         try {
  119.             boolean tryLock = rlock.tryLock(5, 5, TimeUnit.SECONDS);
  120.             if(tryLock) {
  121.                 AccountData balance = accountMapper.selectById(userId);
  122.                 if(balance != null && balance.getBalance().compareTo(BigDecimal.ZERO) > 0) {
  123.                     String key = RedisKeyUtils.userBalanceKey(userId);
  124.                     List<String> keys = Collections.singletonList(key);
  125.                     try {
  126.                         String execute = redisTemplate.execute(LuaScriptUtils.SYNC_BALANCE_SCRIPT, keys,
  127.                                 balance.getBalance().multiply(ONE_HUNDRED).intValue() + "",
  128.                                 balance.getVersion().toString(),
  129.                                 balance.getThreshold().multiply(ONE_HUNDRED).intValue() + "");
  130.                         log.info("同步账号余额到缓存|{}|{}|{}|{}|{}", userId, balance.getBalance(), balance.getVersion(), balance.getThreshold(), execute);
  131.                         return balance.getBalance();
  132.                     } catch (Exception e) {
  133.                         e.printStackTrace();
  134.                         return null;
  135.                     }
  136.                 }
  137.             }
  138.         } catch (InterruptedException e) {
  139.             log.error("获取redisson锁异常", e);
  140.             throw new RuntimeException(e);
  141.         } finally {
  142.             if(rlock.isHeldByCurrentThread()) {
  143.                 rlock.unlock();
  144.             }
  145.         }
  146.         return null;
  147.     }
  148.     @Override
  149.     public DeductResult asyncDeductOrder(Integer userId, BigDecimal amount) {
  150.         long id = IdUtil.getSnowflake(1, 1).nextId();
  151.         String key = RedisKeyUtils.userBalanceKey(userId);
  152.         String key1 = RedisKeyUtils.userOrderKey(userId, id);
  153.         String key2 = RedisKeyUtils.userOrderZsetKey(userId);
  154.         List<String> keys = Arrays.asList(key, key1, key2);
  155.         try {
  156.             long ts = System.currentTimeMillis();
  157.             int fee = amount.multiply(ONE_HUNDRED).intValue();
  158.             String execute = redisTemplate.execute(LuaScriptUtils.ASYNC_DEDUCT_SCRIPT, keys,
  159.                     fee + "", ts + "", id + "");
  160.             log.info("异步扣减缓存余额|{}|{}|{}", userId, amount, execute);
  161.             if(execute.startsWith("ok")) {      // 扣费成功
  162.                 return this.deductSuccess(keys, id, userId, amount, ts, execute);
  163.             } else {    // 扣费失败
  164.                 return DeductResult.builder().message(execute).build();
  165.             }
  166.         } catch (Exception e) {
  167.             log.error("扣费异常", e);
  168.             // 扣费失败
  169.             return DeductResult.builder().message("fail").build();
  170.         }
  171.     }
  172.     /**
  173.      * 扣费成功处理逻辑
  174.      * @param keys
  175.      * @param userId
  176.      * @param amount
  177.      * @param execute
  178.      * @return
  179.      * @throws Exception
  180.      */
  181.     private DeductResult deductSuccess(List<String> keys, long id, Integer userId, BigDecimal amount, long ts, String execute) throws Exception {
  182.         String[] arr = execute.split(";");
  183.         BigDecimal balance = new BigDecimal(arr[1]).divide(ONE_HUNDRED, 2, RoundingMode.HALF_UP);
  184.         if(balance.compareTo(BigDecimal.ZERO) <= 0) {
  185.             userService.closeUserApi(userId);
  186.         }
  187.         String version = arr[2];
  188.         // 扣费成功发送kafka消息
  189.         DeductResult deductResult = DeductResult.builder()
  190.                 .id(id)
  191.                 .userId(userId)
  192.                 .balance(balance)
  193.                 .amount(amount)
  194.                 .build();
  195.         // 发送消息队列采用同步方式,判断发送消息队列成功后才返回接口成功
  196.         kafkaTemplate.send(ConstantVal.KAFKA_CONSUMER_TOPIC, userId.toString(), JacksonUtils.toJSONString(deductResult)).handle((rs, throwable) -> {
  197.                 if (throwable == null) {
  198.                     return rs;
  199.                 }
  200.                 String topic = rs.getProducerRecord().topic();
  201.                 log.info("异步扣减余额后发送消息队列失败|{}|{}|{}|{}", topic, userId, amount, execute);
  202.                 // kafka消息发送失败回滚扣费
  203.                 String execute1 = redisTemplate.execute(LuaScriptUtils.ROLLBACK_DEDUCT_SCRIPT, keys,
  204.                         amount.multiply(ONE_HUNDRED).intValue() + "", version, id + "");
  205.                 log.info("异步扣减余额后发送消息队列失败回滚扣费|{}|{}|{}|{}|{}", topic, userId, amount, execute, execute1);
  206.                 // 提示失败,调用同步扣费
  207.                 deductResult.setMessage("fail");
  208.                 return null;
  209.             })
  210.             .thenAccept(rs -> {
  211.                 if (rs != null) {
  212.                     deductLogger.info("{}|{}|{}|{}", userId, id, amount, ts);
  213.                 }
  214.             }).get();
  215.         return deductResult;
  216.     }
  217. }
复制代码
上面的代码已经完成了扣费的第一步全部逻辑,只通过redis的高性能扣费逻辑就完成了。使用lua脚本能够包管事务,同时redis是单线程执行命令的不用考虑锁的标题。如果使用数据库完成扣费逻辑,就要考虑使用分布式锁包管服务的安全。
但是上面的代码还有一些点要优化的:
余额数据在redis中的结构是一个hash,金额的单位为分:
  1. 127.0.0.1:7001> hgetall user:balance:{9}
  2. 1) "balance"
  3. 2) "9970000"
  4. 3) "v"
  5. 4) "4"
  6. 5) "threshold"
  7. 6) "500000"
  8. 7) "ts"
  9. 8) "1728436908699"
复制代码
订单信息是一个键值对,值的结构是多个;分隔的值,分别为本次扣减金额、时间戳、扣减后余额:
  1. 127.0.0.1:7001> get user:order:{9}:1843824075538042880
  2. "10000;1728436890860;9990000"
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4