这篇文章通过使用 redis+消息队列+数据库 实现一套高性能的计费服务,接口请求扣费分为两步: 首先 使用redis扣减用户余额,扣费乐成后将数据发送到消息队列,整个扣费的第一步就完成了,这时就可以返回乐成给调用端; 第二步 数据持久化,由别的一个服务订阅消息队列,批量消费队列中的消息实现数据库持久化。
实现上面的处置惩罚流程计划使用一个demo项目来演示。项目使用 SpringBoot+redis+kafka+MySQL 的架构。
- -- 用户信息
- CREATE TABLE `t_user` (
- `user_name` varchar(128) DEFAULT NULL,
- `create_time` datetime NULL DEFAULT NULL,
- `modify_time` datetime NULL DEFAULT NULL,
- `api_open` tinyint NULL DEFAULT 0,
- `deduct_type` varchar(16) DEFAULT NULL,
- UNIQUE INDEX `user_name`(`user_name` ASC) USING BTREE
- ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
- -- 账户信息
- CREATE TABLE `t_account` (
- `user_id` int NOT NULL,
- `balance` decimal(20, 2) NULL DEFAULT NULL,
- `modify_time` datetime NULL DEFAULT NULL,
- `create_time` datetime NULL DEFAULT NULL,
- `version` bigint NULL DEFAULT 1,
- `threshold` decimal(20, 2) NULL DEFAULT NULL,
- ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;
- -- 订单信息
- CREATE TABLE `t_order` (
- `id` bigint NOT NULL,
- `user_id` int NULL DEFAULT NULL,
- `amount` decimal(20, 2) NULL DEFAULT NULL,
- `balance` decimal(20, 2) NULL DEFAULT NULL,
- `modify_time` datetime NULL DEFAULT NULL,
- `create_time` datetime NULL DEFAULT NULL,
- ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;
- -- 支付信息
- CREATE TABLE `t_pay` (
- `id` bigint NOT NULL,
- `user_id` int NULL DEFAULT NULL,
- `pay` decimal(20, 2) NULL DEFAULT NULL,
- `create_time` datetime NULL DEFAULT NULL,
- ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
复制代码 用户信息表 t_user 有两个字段必要阐明:
api_open 表示用户接口是否开启,如果用户账号余额不足可以把该字段设置为false拦截扣费请求;
deduct_type 这个字段表示扣费方式,ASYNC表示异步扣费,先通过redis扣减余额再异步扣减数据库,SYNC表示同步扣费,直接扣减数据库。
账户余额表 t_account 中也要阐明两个字段:
version 表示余额扣减的版本号,这个版本号只能递增,每更新一次数据库版本号就增长1,在异步扣费时,可以比力redis中数据和db中数据的差异,正常来说redis中的版本号要大于等于db中的版本号;
threshold 是余额的一个阈值,由于redis中数据和db数据的差异,当异步扣费时redis中的余额小于该值时,避免有大概存在的超扣情况,要对用户的请求限流。
- local rs
- if redis.call('exists', KEYS[1]) > 0 then
- local vals = redis.call('hmget', KEYS[1], 'balance', 'threshold', 'v')
- local balance = tonumber(vals[1])
- local threshold = tonumber(vals[2])
- local v0 = tonumber(vals[3])
- if balance >= tonumber(ARGV[1]) and (balance - ARGV[1]) >= threshold then
- local b = redis.call('hincrby', KEYS[1], 'balance', 0 - ARGV[1])
- redis.call('hset', KEYS[1], 'ts', ARGV[2])
- redis.call('set', KEYS[2], ARGV[1] .. ';' .. ARGV[2] .. ';' .. b, 'EX', 604800)
- redis.call('zadd', KEYS[3], ARGV[2], ARGV[3])
- local v = redis.call('hincrby', KEYS[1], 'v', 1)
- rs = 'ok;' .. b .. ';' .. v
- else
- rs = 'fail;' .. balance .. ';' .. v0
- end
- else
- rs = 'null'
- end
- return rs
复制代码 最初redis中是不存在数据的,必要将db数据加载到redis中:
- if redis.call('exists', KEYS[1]) > 0 then
- redis.call('hget', KEYS[1], 'balance')
- else
- redis.call('hmset', KEYS[1], 'balance', ARGV[1], 'v', ARGV[2], 'threshold', ARGV[3])
- end
- return 'ok'
复制代码 当redis扣费乐成而后面操纵出现非常时必要回滚redis的扣费:
- local rs
- if redis.call('exists', KEYS[1]) > 0 then
- local v = tonumber(redis.call('hget', KEYS[1], 'v'))
- if v >= tonumber(ARGV[2]) then
- rs = redis.call('hincrby', KEYS[1], 'balance', ARGV[1])
- redis.call('hincrby', KEYS[1], 'v', 1)
- redis.call('del', KEYS[2])
- redis.call('zrem', KEYS[3], ARGV[3])
- else
- rs = 'fail'
- end
- else
- rs = 'null'
- end
- return rs
复制代码 上面的lua脚本涉及到多个键操纵,要包管在集群模式下命令精确执行,要让全部的键落在一个hash槽,所以在键的计划时要包管计算hash的部分相同,使用用户ID包裹在{}内就能到达目的,全部涉及到的键封装成一个工具类:
- /**
- * @Author xingo
- * @Date 2024/9/10
- */
- public class RedisKeyUtils {
- /**
- * 用户余额键
- * @param userId 用户ID
- * @return
- */
- public static String userBalanceKey(int userId) {
- return ("user:balance:{" + userId + "}").intern();
- }
- /**
- * 用户订单键
- * @param userId 用户ID
- * @param orderId 订单ID
- * @return
- */
- public static String userOrderKey(int userId, long orderId) {
- return ("user:order:{" + userId + "}:" + orderId).intern();
- }
- /**
- * 保存用户订单一致性的订单集合
- * 保存可能已经在redis中但不在数据库中的订单ID集合
- * @param userId 用户ID
- * @return
- */
- public static String userOrderZsetKey(int userId) {
- return ("user:order:consistency:{" + userId + "}").intern();
- }
- /**
- * 用户分布式锁键
- * @param userId
- * @return
- */
- public static String userLockKey(int userId) {
- return ("user:lock:" + userId).intern();
- }
- }
复制代码 使用springboot开发还必要将lua脚本封装成工具类:
- import org.springframework.data.redis.core.script.DefaultRedisScript;
- /**
- * lua脚本工具类
- *
- * @Author xingo
- * @Date 2024/9/10
- */
- public class LuaScriptUtils {
- /**
- * redis数据分隔符
- */
- public static final String SEPARATOR = ";";
- /**
- * 异步扣费lua脚本
- * 键1:用户余额键,hash结构
- * 键2:用户订单键,string结构
- * 参数1:扣减的金额,单位是分;
- * 参数2:扣减余额的时间戳
- */
- public static final String ASYNC_DEDUCT_BALANCE_STR =
- "local rs " +
- "if redis.call('exists', KEYS[1]) > 0 then " +
- " local vals = redis.call('hmget', KEYS[1], 'balance', 'threshold', 'v') " +
- " local balance = tonumber(vals[1]) " +
- " local threshold = tonumber(vals[2]) " +
- " local v0 = tonumber(vals[3]) " +
- " if balance >= tonumber(ARGV[1]) and (balance - ARGV[1]) >= threshold then " +
- " local b = redis.call('hincrby', KEYS[1], 'balance', 0 - ARGV[1]) " +
- " redis.call('hset', KEYS[1], 'ts', ARGV[2]) " +
- " redis.call('set', KEYS[2], ARGV[1] .. '" + SEPARATOR + "' .. ARGV[2] .. '" + SEPARATOR + "' .. b, 'EX', 604800) " +
- " redis.call('zadd', KEYS[3], ARGV[2], ARGV[3]) " +
- " local v = redis.call('hincrby', KEYS[1], 'v', 1) " +
- " rs = 'ok" + SEPARATOR + "' .. b .. '" + SEPARATOR + "' .. v " +
- " else " +
- " rs = 'fail" + SEPARATOR + "' .. balance .. '" + SEPARATOR + "' .. v0 " +
- " end " +
- "else " +
- " rs = 'null' " +
- "end " +
- "return rs";
- /**
- * 同步余额数据lua脚本
- * 键:用户余额键,hash结构
- * 参数1:扣减的金额,单位是分
- * 参数2:扣减余额的时间戳
- * 参数3:余额的阈值,单位是分
- */
- public static final String SYNC_BALANCE_STR =
- "if redis.call('exists', KEYS[1]) > 0 then " +
- " redis.call('hget', KEYS[1], 'balance') " +
- "else " +
- " redis.call('hmset', KEYS[1], 'balance', ARGV[1], 'v', ARGV[2], 'threshold', ARGV[3]) " +
- "end " +
- "return 'ok'";
- /**
- * 回滚扣款lua脚本
- * 键1:用户余额键,hash结构
- * 键2:用户订单键,string结构
- * 参数1:回滚的金额,单位是分
- * 参数2:扣款时对应的版本号
- */
- public static final String ROLLBACK_DEDUCT_STR =
- "local rs " +
- "if redis.call('exists', KEYS[1]) > 0 then " +
- " local v = tonumber(redis.call('hget', KEYS[1], 'v')) " +
- " if v >= tonumber(ARGV[2]) then " +
- " rs = redis.call('hincrby', KEYS[1], 'balance', ARGV[1]) " +
- " redis.call('hincrby', KEYS[1], 'v', 1) " +
- " redis.call('del', KEYS[2]) " +
- " redis.call('zrem', KEYS[3], ARGV[3]) " +
- " else " +
- " rs = 'fail' " +
- " end " +
- "else " +
- " rs = 'null' " +
- "end " +
- "return rs";
- public static final DefaultRedisScript<String> ASYNC_DEDUCT_SCRIPT = new DefaultRedisScript<>(LuaScriptUtils.ASYNC_DEDUCT_BALANCE_STR, String.class);
- public static final DefaultRedisScript<String> SYNC_BALANCE_SCRIPT = new DefaultRedisScript<>(LuaScriptUtils.SYNC_BALANCE_STR, String.class);
- public static final DefaultRedisScript<String> ROLLBACK_DEDUCT_SCRIPT = new DefaultRedisScript<>(LuaScriptUtils.ROLLBACK_DEDUCT_STR, String.class);
- }
复制代码 全部底子组件都已经准备好了,接下来就是编写任务代码:
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
- import org.xingo.domain.ApiResult;
- import org.xingo.front.service.DeductService;
- import java.math.BigDecimal;
- /**
- * @Author xingo
- * @Date 2024/9/9
- */
- @Slf4j
- @RestController
- public class DeductController {
- @Autowired
- private DeductService deductService;
- /**
- * 异步扣减余额
- * @param userId 用户ID
- * @param amount 扣减金额
- * @return
- */
- @GetMapping("/async/dudect")
- public ApiResult asyncDeduct(int userId, BigDecimal amount) {
- return deductService.asyncDeduct(userId, amount);
- }
- }
复制代码 扣费处置惩罚服务:
- import org.xingo.domain.ApiResult;
- import java.math.BigDecimal;
- /**
- * @Author xingo
- * @Date 2024/9/13
- */
- public interface DeductService {
- /**
- * 异步扣减余额
- * @param userId
- * @param amount
- * @return
- */
- ApiResult asyncDeduct(int userId, BigDecimal amount);
- }
复制代码- import com.baomidou.mybatisplus.core.toolkit.StringUtils;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.xingo.domain.ApiResult;
- import org.xingo.domain.DeductResult;
- import org.xingo.entity.UserData;
- import org.xingo.enums.DeductType;
- import org.xingo.front.config.CacheUtils;
- import org.xingo.front.service.DeductService;
- import org.xingo.front.service.OrderService;
- import java.math.BigDecimal;
- /**
- * @Author xingo
- * @Date 2024/9/13
- */
- @Slf4j
- @Service
- public class DeductServiceImpl implements DeductService {
- @Autowired
- private OrderService orderService;
- @Autowired
- private CacheUtils cacheUtils;
- @Override
- public ApiResult asyncDeduct(int userId, BigDecimal amount) {
- ApiResult result = null;
- UserData user = cacheUtils.getUserData(userId);
- if(user != null && user.isApiOpen()) {
- amount = amount.abs();
- if(user.getDeductType() == DeductType.ASYNC) {
- DeductResult rs = orderService.asyncDeductOrder(userId, amount);
- if(StringUtils.isNotBlank(rs.getMessage()) && "null".equals(rs.getMessage())) {
- BigDecimal rsBalance = orderService.syncBalanceToRedis(userId);
- if(rsBalance != null) {
- return this.asyncDeduct(userId, amount);
- } else {
- result = ApiResult.fail("同步余额失败");
- }
- } else {
- result = ApiResult.success(rs);
- }
- } else {
- DeductResult rs = orderService.syncDeductOrder(userId, amount);
- return StringUtils.isBlank(rs.getMessage()) ? ApiResult.success(rs) : ApiResult.fail(rs.getMessage());
- }
- } else {
- result = ApiResult.fail("用户接口未开启");
- }
- return result;
- }
- }
复制代码 订单扣减服务
- import org.xingo.domain.DeductResult;
- import java.math.BigDecimal;
- /**
- * @Author xingo
- * @Date 2024/9/9
- */
- public interface OrderService {
- /**
- * 同步扣减订单
- * @param userId
- * @param amount
- * @return
- */
- DeductResult syncDeductOrder(Integer userId, BigDecimal amount);
- /**
- * 同步余额数据到redis
- * @param userId
- * @return
- */
- BigDecimal syncBalanceToRedis(Integer userId);
- /**
- * 同步扣减订单
- * @param userId
- * @param amount
- * @return
- */
- DeductResult asyncDeductOrder(Integer userId, BigDecimal amount);
- }
复制代码- import cn.hutool.core.util.IdUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.redisson.api.RLock;
- import org.redisson.api.RedissonClient;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import org.xingo.common.ConstantVal;
- import org.xingo.common.JacksonUtils;
- import org.xingo.common.RedisKeyUtils;
- import org.xingo.domain.DeductResult;
- import org.xingo.entity.AccountData;
- import org.xingo.entity.OrderData;
- import org.xingo.front.mapper.AccountMapper;
- import org.xingo.front.mapper.OrderMapper;
- import org.xingo.front.service.OrderService;
- import org.xingo.front.service.UserService;
- import java.math.BigDecimal;
- import java.math.RoundingMode;
- import java.time.LocalDateTime;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
- /**
- * @Author xingo
- * @Date 2024/9/9
- */
- @Slf4j
- @Service
- public class OrderServiceImpl implements OrderService {
- /**
- * 100倍
- */
- BigDecimal ONE_HUNDRED = BigDecimal.valueOf(100);
- Logger deductLogger = LoggerFactory.getLogger("deduct");
- @Autowired
- private AccountMapper accountMapper;
- @Autowired
- private OrderMapper orderMapper;
- @Autowired
- private StringRedisTemplate redisTemplate;
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- @Autowired
- private RedissonClient redissonClient;
- @Autowired
- private UserService userService;
- @Override
- @Transactional(rollbackFor = Exception.class)
- public DeductResult syncDeductOrder(Integer userId, BigDecimal amount) {
- String lockKey = RedisKeyUtils.userLockKey(userId);
- RLock rlock = redissonClient.getLock(lockKey);
- try {
- boolean tryLock = rlock.tryLock(5, 5, TimeUnit.SECONDS);
- if(tryLock) {
- // 扣减账号余额
- boolean rs = accountMapper.deductBalanceByUserId(userId, amount);
- if(rs) { // 扣减余额成功
- // 查找余额
- AccountData account = accountMapper.selectById(userId);
- if(account.getBalance().compareTo(BigDecimal.ZERO) <= 0) {
- userService.closeUserApi(userId);
- }
- // 增加订单
- long id = IdUtil.getSnowflake(1, 1).nextId();
- OrderData orderData = OrderData.builder()
- .id(id)
- .userId(userId)
- .amount(amount)
- .balance(account.getBalance())
- .modifyTime(LocalDateTime.now())
- .createTime(LocalDateTime.now())
- .build();
- orderMapper.insert(orderData);
- log.info("同步扣减账号余额|{}|{}|{}|{}", userId, amount, account.getBalance(), id);
- // 实时同步余额到redis
- String key = RedisKeyUtils.userBalanceKey(userId);
- String key1 = RedisKeyUtils.userOrderKey(userId, id);
- List<String> keys = Arrays.asList(key, key1);
- String execute = redisTemplate.execute(LuaScriptUtils.DB_DEDUCT_SCRIPT, keys,
- amount.multiply(ONE_HUNDRED).intValue() + "", System.currentTimeMillis() + "");
- if(execute.startsWith("ok")) {
- String[] arr = execute.split(";");
- int redisVersion = Integer.parseInt(arr[2]);
- if(redisVersion < account.getVersion()) {
- redisTemplate.delete(key);
- log.info("缓存数据版本低于数据库|{}|{}|{}|{}|{}", userId, redisVersion, account.getVersion(), arr[1], account.getBalance());
- }
- }
- log.info("同步扣减数据到缓存|{}|{}|{}|{}", userId, amount, account.getBalance(), execute);
- return DeductResult.builder()
- .id(id)
- .userId(userId)
- .amount(amount)
- .balance(account.getBalance())
- .build();
- }
- }
- } catch (InterruptedException e) {
- log.error("获取redisson锁异常", e);
- throw new RuntimeException(e);
- } finally {
- if(rlock.isHeldByCurrentThread()) {
- rlock.unlock();
- }
- }
- return DeductResult.builder().message("扣费失败").build();
- }
- @Override
- public BigDecimal syncBalanceToRedis(Integer userId) {
- String lockKey = RedisKeyUtils.userLockKey(userId);
- RLock rlock = redissonClient.getLock(lockKey);
- try {
- boolean tryLock = rlock.tryLock(5, 5, TimeUnit.SECONDS);
- if(tryLock) {
- AccountData balance = accountMapper.selectById(userId);
- if(balance != null && balance.getBalance().compareTo(BigDecimal.ZERO) > 0) {
- String key = RedisKeyUtils.userBalanceKey(userId);
- List<String> keys = Collections.singletonList(key);
- try {
- String execute = redisTemplate.execute(LuaScriptUtils.SYNC_BALANCE_SCRIPT, keys,
- balance.getBalance().multiply(ONE_HUNDRED).intValue() + "",
- balance.getVersion().toString(),
- balance.getThreshold().multiply(ONE_HUNDRED).intValue() + "");
- log.info("同步账号余额到缓存|{}|{}|{}|{}|{}", userId, balance.getBalance(), balance.getVersion(), balance.getThreshold(), execute);
- return balance.getBalance();
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }
- }
- } catch (InterruptedException e) {
- log.error("获取redisson锁异常", e);
- throw new RuntimeException(e);
- } finally {
- if(rlock.isHeldByCurrentThread()) {
- rlock.unlock();
- }
- }
- return null;
- }
- @Override
- public DeductResult asyncDeductOrder(Integer userId, BigDecimal amount) {
- long id = IdUtil.getSnowflake(1, 1).nextId();
- String key = RedisKeyUtils.userBalanceKey(userId);
- String key1 = RedisKeyUtils.userOrderKey(userId, id);
- String key2 = RedisKeyUtils.userOrderZsetKey(userId);
- List<String> keys = Arrays.asList(key, key1, key2);
- try {
- long ts = System.currentTimeMillis();
- int fee = amount.multiply(ONE_HUNDRED).intValue();
- String execute = redisTemplate.execute(LuaScriptUtils.ASYNC_DEDUCT_SCRIPT, keys,
- fee + "", ts + "", id + "");
- log.info("异步扣减缓存余额|{}|{}|{}", userId, amount, execute);
- if(execute.startsWith("ok")) { // 扣费成功
- return this.deductSuccess(keys, id, userId, amount, ts, execute);
- } else { // 扣费失败
- return DeductResult.builder().message(execute).build();
- }
- } catch (Exception e) {
- log.error("扣费异常", e);
- // 扣费失败
- return DeductResult.builder().message("fail").build();
- }
- }
- /**
- * 扣费成功处理逻辑
- * @param keys
- * @param userId
- * @param amount
- * @param execute
- * @return
- * @throws Exception
- */
- private DeductResult deductSuccess(List<String> keys, long id, Integer userId, BigDecimal amount, long ts, String execute) throws Exception {
- String[] arr = execute.split(";");
- BigDecimal balance = new BigDecimal(arr[1]).divide(ONE_HUNDRED, 2, RoundingMode.HALF_UP);
- if(balance.compareTo(BigDecimal.ZERO) <= 0) {
- userService.closeUserApi(userId);
- }
- String version = arr[2];
- // 扣费成功发送kafka消息
- DeductResult deductResult = DeductResult.builder()
- .id(id)
- .userId(userId)
- .balance(balance)
- .amount(amount)
- .build();
- // 发送消息队列采用同步方式,判断发送消息队列成功后才返回接口成功
- kafkaTemplate.send(ConstantVal.KAFKA_CONSUMER_TOPIC, userId.toString(), JacksonUtils.toJSONString(deductResult)).handle((rs, throwable) -> {
- if (throwable == null) {
- return rs;
- }
- String topic = rs.getProducerRecord().topic();
- log.info("异步扣减余额后发送消息队列失败|{}|{}|{}|{}", topic, userId, amount, execute);
- // kafka消息发送失败回滚扣费
- String execute1 = redisTemplate.execute(LuaScriptUtils.ROLLBACK_DEDUCT_SCRIPT, keys,
- amount.multiply(ONE_HUNDRED).intValue() + "", version, id + "");
- log.info("异步扣减余额后发送消息队列失败回滚扣费|{}|{}|{}|{}|{}", topic, userId, amount, execute, execute1);
- // 提示失败,调用同步扣费
- deductResult.setMessage("fail");
- return null;
- })
- .thenAccept(rs -> {
- if (rs != null) {
- deductLogger.info("{}|{}|{}|{}", userId, id, amount, ts);
- }
- }).get();
- return deductResult;
- }
- }
复制代码 上面的代码已经完成了扣费的第一步全部逻辑,只通过redis的高性能扣费逻辑就完成了。使用lua脚本能够包管事务,同时redis是单线程执行命令的不用考虑锁的标题。如果使用数据库完成扣费逻辑,就要考虑使用分布式锁包管服务的安全。
- 当redis集群发生了主从切换,由于redis的异步复制就有大概存在丢失数据的风险,所以我们要在业务体系中包管数据不丢失;
- redis与db库会存在数据差异,这是性能导致的,在某些场景中要考虑这种差异有大概引起的标题。
-> hgetall user:balance:{9}
- 1) "balance"
- 2) "9970000"
- 3) "v"
- 4) "4"
- 5) "threshold"
- 6) "500000"
- 7) "ts"
- 8) "1728436908699"
复制代码 订单信息是一个键值对,值的结构是多个;分隔的值,分别为本次扣减金额、时间戳、扣减后余额:
-> get user:order:{9}:1843824075538042880
- "10000;1728436890860;9990000"
