Redis --- 秒杀优化方案(阻塞队列+基于Stream流的消息队列) ...

打印 上一主题 下一主题

主题 824|帖子 824|积分 2472

下面是我们的秒杀流程:

对于正常的秒杀处置惩罚,我们必要多次查询数据库,会给数据库造成相当大的压力,这个时间我们必要加入缓存,进而缓解数据库压力。
在上面的图示中,我们可以将一条流水线的任务拆成两条流水线来做,假如我们直接将判断秒杀库存与校验一人一单放在流水线A上,剩下的放在另一条流水线B,那么假如流水线A就可以相当于服务员直接判断是否符合资格,假如符合资格那么直接天生信息给另一条流水线B去处置惩罚业务,这里的流水线就是咱们的线程,而流水线A也是基于数据库进行查询,也会压力数据库,那么这种情况我们就可以将待查询信息保存在Redis缓存中。
但是我们不能再流水线A判断完成后去直接调用流水线B,如许的服从是大打扣头的,这种情况我们必要开启独立线程去执行流水线B的操纵,怎样知道给哪个用户创建订单呢?这个时间就要流水线A在判断成功后去天生信息给独立线程
   最后的业务就酿成,用户直接访问流水线A,通过流水线A去判断,假如通过则天生信息给流水线B去创建订单,过程如下图:
  

那么什么样的数据布局满足下面条件:① 一个key能够保存很多值   ②唯一性:一人一单必要保证用户id不能重复。
所以我们必要使用set:

那么怎样判断校验用户的购买资格呢?


 而上述判断必要保证原子性,所以我们必要使用Lua脚本进行编写:

  1. local voucherId = ARGV[1]; -- 优惠劵id
  2. local userId = ARGV[2]; -- 用户id
  3. -- 库存key
  4. local stockKey = 'seckill:stock' .. voucherId; -- 拼接
  5. -- 订单key
  6. local stockKey = 'seckill:stock' .. voucherId; -- 拼接
  7. -- 判断库存是否充足
  8. if(tonumber(redis.call('get',stockKey) <= 0)) then
  9.     -- 库存不足,返回1
  10.     return 1;
  11. end;
  12. -- 判断用户是否下单
  13. if(redis.call('sismember',orderKey,userId)) then
  14.     -- 存在,说明重复下单,返回2
  15.     return 2;
  16. end
  17. -- 扣减库存 incrby stockKey -1
  18. redis.call('incrby',stockKey,-1);
  19. -- 下单(保存用户) sadd orderKey userId
  20. redis.call('sadd',orderKey,userId);
  21. return 0;
复制代码
之后我们按照下面步骤来实现代码:

在方法体内执行Lua脚原来原子性判断,然后判断是否能够处置惩罚并传入阻塞队列:
  1. @Slf4j
  2. @Service
  3. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
  4.     @Autowired
  5.     private ISeckillVoucherService seckillVoucherService;
  6.     @Autowired
  7.     private RedisIdWorker redisIdWorker;
  8.     @Resource
  9.     private StringRedisTemplate stringRedisTemplate;
  10.     @Resource
  11.     private RedissonClient redissonClient;
  12.     private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型内填入返回值类型
  13.     static { // 静态属性要使用静态代码块进行初始化
  14.         SECKILL_SCRIPT = new DefaultRedisScript<>();
  15.         SECKILL_SCRIPT.setResultType(Long.class);
  16.         SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
  17.     }
  18.     public Result seckillVoucherMax(Long voucherId) {
  19.         // 获取用户信息
  20.         Long userId = UserHolder.getUser().getId();
  21.         // 1.执行Lua脚本来判断用户资格
  22.         Long result = stringRedisTemplate.execute(
  23.                             SECKILL_SCRIPT,
  24.                             Collections.emptyList(), // Lua无需接受key
  25.                             voucherId.toString(),
  26.                             userId.toString()
  27.                         );
  28.         // 2.判断结果是否为0
  29.         int r = result.intValue();
  30.         if(r != 0) {
  31.             // 不为0代表无资格购买
  32.             return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
  33.         }
  34.         // 3.有购买资格则将下单信息保存到阻塞队列中
  35.         // ...
  36.         return Result.ok();
  37.     }
  38. }
复制代码
 接下来我们创建阻塞队列,线程池以及线程方法,随后使用Springboot提供的注解在@PostConstruct去给线程池传入线程方法:
  1. @Slf4j
  2. @Service
  3. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
  4.     @Autowired
  5.     private ISeckillVoucherService seckillVoucherService;
  6.     @Autowired
  7.     private RedisIdWorker redisIdWorker;
  8.     @Resource
  9.     private StringRedisTemplate stringRedisTemplate;
  10.     @Resource
  11.     private RedissonClient redissonClient;
  12.     private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型内填入返回值类型
  13.     static { // 静态属性要使用静态代码块进行初始化
  14.         SECKILL_SCRIPT = new DefaultRedisScript<>();
  15.         SECKILL_SCRIPT.setResultType(Long.class);
  16.         SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
  17.     }
  18.     private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 创建阻塞队列
  19.     private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
  20.   // 创建线程池
  21.     // 让大类在开始初始化时就能够执行线程任务
  22.     @PostConstruct
  23.     private void init() {
  24.         SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
  25.     }
  26.     // 创建线程任务
  27.     private class VoucherOrderTask implements Runnable {
  28.         @Override
  29.         public void run() {
  30.             while(true){
  31.                 try {
  32.                     // 获取队列中的订单信息
  33.                     VoucherOrder voucherOrder = orderTasks.take();// 取出头部信息
  34.                     // 创建订单
  35.                     handleVoucherOrder(voucherOrder);
  36.                 } catch (Exception e) {
  37.                     log.error("处理订单异常",e);
  38.                 }
  39.             }
  40.         }
  41.     }
  42.     // 创建订单
  43.     private void handleVoucherOrder(VoucherOrder voucherOrder) {
  44.         RLock lock = redissonClient.getLock("lock:order:" + voucherOrder.getUserId().toString());
  45.         boolean isLock = lock.tryLock();
  46.         // 判断是否获取锁成功
  47.         if (!isLock) {
  48.             // 获取锁失败,返回错误或重试
  49.             log.error("不允许重复下单");
  50.             return ;
  51.         }
  52.         try {
  53.             proxy.createVoucherOrderMax(voucherOrder);
  54.         } finally {
  55.             lock.unlock();
  56.         }
  57.     }
  58.     @Override
  59.     public void createVoucherOrderMax(VoucherOrder voucherOrder) {
  60.         // 一人一单
  61.         Long userId = voucherOrder.getUserId();
  62.         // 查询订单
  63.         int count = query().eq("user_id",userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
  64.         // 判断是否存在
  65.         if(count > 0){
  66.             // 用户已经购买过
  67.             log.error("用户已经购买过");
  68.             return ;
  69.         }
  70.         // CAS改进:将库存判断改成stock > 0以此来提高性能
  71.         boolean success = seckillVoucherService.update()
  72.                 .setSql("stock= stock -1") // set stock = stock - 1
  73.                 .eq("voucher_id", voucherOrder.getVoucherId()).eq("stock",0) // where id = ? and stock > 0
  74.                 .update();
  75.         if (!success) {
  76.             //扣减库存
  77.             log.error("库存不足!");
  78.             return ;
  79.         }
  80.         //6.创建订单
  81.         save(voucherOrder);
  82.     }
  83.     private IVoucherOrderService proxy; // 代理对象
  84.     public Result seckillVoucherMax(Long voucherId) {
  85.         // 获取用户信息
  86.         Long userId = UserHolder.getUser().getId();
  87.         // 1.执行Lua脚本来判断用户资格
  88.         Long result = stringRedisTemplate.execute(
  89.                             SECKILL_SCRIPT,
  90.                             Collections.emptyList(), // Lua无需接受key
  91.                             voucherId.toString(),
  92.                             userId.toString()
  93.                         );
  94.         // 2.判断结果是否为0
  95.         int r = result.intValue();
  96.         if(r != 0) {
  97.             // 不为0代表无资格购买
  98.             return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
  99.         }
  100.         // 3.有购买资格则将下单信息保存到阻塞队列中
  101.         Long orderId = redisIdWorker.nextId("order");
  102.         // 创建订单
  103.         VoucherOrder voucherOrder = new VoucherOrder();
  104.         voucherOrder.setId(orderId);
  105.         voucherOrder.setUserId(userId);
  106.         voucherOrder.setVoucherId(voucherId);
  107.         // 放入阻塞队列
  108.         orderTasks.add(voucherOrder);
  109.         // 4.获取代理对象(线程异步执行,需要手动在方法内获取)
  110.         proxy = (IVoucherOrderService)AopContext.currentProxy(); // 获取当前类的代理对象  (需要引入aspectjweaver依赖,并且在实现类加入@EnableAspectJAutoProxy(exposeProxy = true)以此来暴露代理对象)
  111.         return Result.ok();
  112.     }
  113. }
复制代码
在上面代码中,我们使用下面代码创建了一个单线程的线程池。它保证所有提交的任务都按照提交的顺序执行,每次只有一个线程在工作。
  1. private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
复制代码
下面代码是一个常见的阻塞队列实现,具有固定大小(在这里是 1024 * 1024),它的作用是缓冲和排队任务。ArrayBlockingQueue 是一个线程安全的队列,它会自动处置惩罚线程之间的同步问题。当队列满时,调用 put() 方法的线程会被阻塞,直到队列有空间;当队列为空时,调用 take() 方法的线程会被阻塞,直到队列中有数据。
  1. private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
复制代码
在下面代码中,orderTasks 阻塞队列用于存放必要处置惩罚的订单对象,每个订单的处置惩罚逻辑都由 VoucherOrderTask 线程池中的线程异步执行:
  1. VoucherOrder voucherOrder = orderTasks.take();
  2. handleVoucherOrder(voucherOrder);
复制代码
之后我们必要调用 Runnable 接口去实现VoucherOrderTask类以此来创建线程方法
  1. private class VoucherOrderTask implements Runnable {
  2.     @Override
  3.     public void run() {
  4.         while (true) {
  5.             try {
  6.                 // 获取队列中的订单信息
  7.                 VoucherOrder voucherOrder = orderTasks.take(); // 获取订单
  8.                 // 创建订单
  9.                 handleVoucherOrder(voucherOrder);
  10.             } catch (Exception e) {
  11.                 log.error("处理订单异常", e);
  12.             }
  13.         }
  14.     }
  15. }
复制代码
随后将线程方法通过 submit() 方法将 VoucherOrderTask 提交到线程池中,这个任务是一个无限循环的任务,它会不停从阻塞队列中取出订单并处置惩罚,直到线程池关闭。这种方式使得订单处置惩罚任务可以异步执行,而不阻塞主线程,进步了系统的响应本领:
  1. @PostConstruct
  2. private void init() {
  3.     SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
  4. }
复制代码

但是在高并发的情况下就会产生大量订单,就会超出JVM阻塞队列的上线,并且每当服务重启大概宕机的情况发生,阻塞队列的所有订单任务就都会丢失。
所以为相识决这种情况,我们就要使用消息队列去解决这个问题:
   
  
什么是消息队列?

  
  消息队列(Message Queue, MQ)是一种用于在应用步伐之间传递消息的通讯方式。它允许应用步伐通过发送和吸取消息来解耦,从而进步系统的可扩展性、可靠性和机动性。消息队列通常用于异步通讯、任务队列、事件驱动架构等场景。
  消息队列的焦点概念 :

  • 生产者(Producer):发送消息到消息队列的应用步伐。
  • 消费者(Consumer):从消息队列中吸取并处置惩罚消息的应用步伐。
  • 队列(Queue):消息的存储区域,生产者将消息发送到队列,消费者从队列中获取消息。
  • 消息(Message):在生产者与消费者之间传递的数据单位。
  • Broker:消息队列的服务器,负责吸取、存储和转发消息。

   消息队列是在JVM以外的一个独立的服务,能够不受JVM内存的限制,并且存入MQ的信息都可以做持久化存储。
  具体教学可以查询下面链接:微服务架构 --- 使用RabbitMQ进行异步处置惩罚 

但是如许的方式是必要额外提供服务的,所以我们可以使用Redis提供的三种差别的方式来实现消息队列

  • List 布局实现消息队列
  • Pub/Sub(发布/订阅)模式
  • Stream 布局(Redis 5.0 及以上版本)(保举使用)(具体先容)

使用 List 布局实现消息队列:
Redis 的 List 数据布局是一个双向链表,支持从头部或尾部插入和弹出元素。我们可以利用 LPUSH 和 BRPOP 下令实现一个简单的消息队列。
   实现步骤:
  

  • 生产者:使用 LPUSH 将消息推入队列。
  • 消费者:使用 BRPOP 阻塞地从队列中获取消息。
  生产者代码:
  1. import redis.clients.jedis.Jedis;
  2. public class ListProducer {
  3.     public static void main(String[] args) {
  4.         Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
  5.         String queueName = "myQueue";
  6.         // 发送消息
  7.         for (int i = 1; i <= 5; i++) {
  8.             String message = "Message " + i;
  9.             jedis.lpush(queueName, message); // 将消息推入队列
  10.             System.out.println("Sent: " + message);
  11.         }
  12.         jedis.close(); // 关闭连接
  13.     }
  14. }
复制代码
消费者代码:
  1. import redis.clients.jedis.Jedis;
  2. public class ListConsumer {
  3.     public static void main(String[] args) {
  4.         Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
  5.         String queueName = "myQueue";
  6.         while (true) {
  7.             // 阻塞获取消息,超时时间为 0(无限等待)
  8.             var result = jedis.brpop(0, queueName);
  9.             String message = result.get(1); // 获取消息内容
  10.             System.out.println("Received: " + message);
  11.         }
  12.     }
  13. }
复制代码
  

  • 优点:简单易用,适合轻量级场景。
  • 缺点不支持消息确认机制,消息一旦被消费(从队列内取出)就会从队列中删除。并且只支持单消费者(一个消息只能拿出一次)
  
使用 Pub/Sub 模式实现消息队列: 
Redis 的 Pub/Sub 模式是一种发布-订阅模子,生产者将消息发布到频道,消费者订阅频道以吸取消息。
   实现步骤:
  

  • 生产者:使用 PUBLISH 下令向频道发布消息。
  • 消费者:使用 SUBSCRIBE 下令订阅频道。
  生产者代码:
  1. import redis.clients.jedis.Jedis;
  2. public class PubSubProducer {
  3.     public static void main(String[] args) {
  4.         Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
  5.         String channelName = "myChannel";
  6.         // 发布消息
  7.         for (int i = 1; i <= 5; i++) {
  8.             String message = "Message " + i;
  9.             jedis.publish(channelName, message); // 发布消息到频道
  10.             System.out.println("Published: " + message);
  11.         }
  12.         jedis.close(); // 关闭连接
  13.     }
  14. }
复制代码
 消费者代码:
  1. import redis.clients.jedis.Jedis;
  2. import redis.clients.jedis.JedisPubSub;
  3. public class PubSubConsumer {
  4.     public static void main(String[] args) {
  5.         Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
  6.         String channelName = "myChannel";
  7.         // 创建订阅者
  8.         JedisPubSub subscriber = new JedisPubSub() {
  9.             @Override
  10.             public void onMessage(String channel, String message) {
  11.                 System.out.println("Received: " + message);
  12.             }
  13.         };
  14.         // 订阅频道
  15.         jedis.subscribe(subscriber, channelName);
  16.     }
  17. }
复制代码
  

  • 优点:支持一对多的消息广播。
  • 缺点:消息是即时的,假如消费者不在线,消息会丢失。
  
但是上面两方式都是有缺点的:

  • 不支持消息确认机制,消息一旦被消费(从队列内取出)就会从队列中删除。并且只支持单消费者(一个消息只能拿出一次)
  • 消息是即时的,假如消费者不在线,消息会丢失。
所以根据上面的两种方式,我们推出一款全新的方式 ->
使用 Stream 布局实现消息队列:
   Redis Stream 是一种强大的数据布局,用于管理消息流。它将消息存储在 Redis 中,并允许消费者按顺序获取消息。Stream 具有以下特点:
  

  • 有序消息:消息按插入顺序分列。
  • 消费者组:一个消费者组可以有多个消费者,每个消费者可以独立消费差别的消息。
  • 消息 ID:每条消息都有唯一的 ID(如:1588890470850-0),ID 按时间戳天生。
  • 自动分配消息:多个消费者可以从 Stream 中并行消费消息,保证消息不会重复消费。
  


在 Redis Stream 中,一个队列可以有多个消费者组,每个消费者组可以独立地消费队列中的消息。每个消费者组内有多个消费者,而消费者是基于 消费者名称 进行辨认的。 
   消费者组的工作方式:

  

  • 每个消费者组拥有自己的 消费进度,也就是每个消费者组会从 自己独立的消息 ID 开始消费
  • 多个消费者组之间是相互独立的,即使它们消费的是同一个队列,它们也可以从差别的位置开始消费队列中的消息。
  • 每个消费者组都可以有多个 消费者(在同一个组内,多个消费者可以并行消费同一个队列的消息,但每个消息在消费者组内只能被一个消费者处置惩罚一次)。
  假设有一个队列(Stream)mystream,可以为它创建多个消费者组:
  1. XGROUP CREATE mystream group1 $ MKSTREAM
  2. XGROUP CREATE mystream group2 $ MKSTREAM
复制代码
如许,mystream 队列上就有了两个消费者组:group1 和 group2。每个消费者组可以有自己的消费者并从该队列中读取消息。此时,group1 和 group2 都在消费同一个队列 mystream,但它们的消费进度是独立的,它们各自有自己的消息 ID 记录。
  每个消费者组可以有多个消费者,而每个消费者通过一个 唯一的消费者名称 来标识。
  
  
每个消费者组有独立的消费进度

  
  每个消费者组会记录自己的消费进度,也就是它消费到队列中的 哪个消息 ID。即使多个消费者组在消费同一个消息队列,它们每个组都会从 差别的消费位置(消息 ID)开始读取消息。
  例如,假设有一个队列 mystream,同时有两个消费者组 group1 和 group2,它们都从 mystream 队列中读取消息:
  

  • group1 从 mystream 队列中的消息 id1 开始消费,group1 的进度会记录在 Redis 中。
  • group2 从 mystream 队列中的消息 id2 开始消费,group2 的进度也会记录在 Redis 中。
  消费进度互不干扰,即便 group1 和 group2 都在消费 mystream 队列,它们的消费位置是独立的。
  
  
消费者组内部的消息消费

  
  一个消费者组内的消费者会 共享 组内的消息。即使有多个消费者,每条消息 在消费者组内部只会被 一个消费者 消费。消费者之间会并行处置惩罚消息,但每条消息只会被一个消费者处置惩罚。
  举个例子:假设 group1 中有三个消费者 consumer1、consumer2、consumer3,假如队列 mystream 有 6 条消息,那么它们会如下消费:
  

  • consumer1 处置惩罚消息 1、2
  • consumer2 处置惩罚消息 3、4
  • consumer3 处置惩罚消息 5、6
  但对于消费者组 group2,假如它有自己的消费者,group2 内的消费者也会并行消费 mystream 中的消息,而 group1 和 group2 之间没有直接关系。
  首先初始化一个消息队列:
在项目启动时,确保 Redis 中存在对应的 Stream 和消费者组。可以通过步伐在启动时检查并创建(假如不存在的话)。
  1. @Configuration
  2. public class RedisStreamConfig {
  3.     @Autowired
  4.     private StringRedisTemplate redisTemplate;
  5.     private static final String STREAM_KEY = "mystream";
  6.     private static final String GROUP_NAME = "mygroup";
  7.     @PostConstruct
  8.     public void init() {
  9.         // 检查消费者组是否存在,若不存在则创建
  10.         try {
  11.             // 如果消费者组不存在则会抛出异常,我们捕获异常进行创建
  12.             redisTemplate.opsForStream().groups(STREAM_KEY);
  13.         } catch (Exception e) {
  14.             // 创建消费者组,起始位置为 $ 表示从末尾开始消费新消息
  15.             redisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);
  16.         }
  17.     }
  18. }
复制代码
  注意:
  

  • opsForStream().groups(STREAM_KEY):查询消费者组是否已存在。
  • opsForStream().createGroup(STREAM_KEY, GROUP_NAME):假如没有消费者组,则创建一个新的组。
  随后我们生产者发送消息示例:
  1. @Service  
  2. public class RedisStreamProducerService {  // 定义生产者服务类 RedisStreamProducerService
  3.     private static final String STREAM_KEY = "mystream";  // 定义 Redis Stream 的名称,这里指定队列名为 "mystream"
  4.     @Autowired  
  5.     private StringRedisTemplate redisTemplate;
  6.     public void sendMessage(String content) {  // 定义一个方法,发送消息到 Redis Stream,参数 content 是消息的内容
  7.         Map<String, String> map = new HashMap<>();  // 创建一个 Map 用来存储消息内容
  8.         map.put("content", content);  // 将消息内容添加到 Map 中,键是 "content",值是传入的内容
  9.         // 在消息队列中添加消息,调用 StringRedisTemplate 的 opsForStream 方法
  10.         RecordId recordId = redisTemplate.opsForStream()  // 获取操作 Redis Stream 的操作对象
  11.                 .add(StreamRecords.objectBacked(map)  // 创建一个 Stream 记录,将 Map 转化为对象记录
  12.                 .withStreamKey(STREAM_KEY));  // 设置该记录属于的 Stream(消息队列)的名称
  13.         // 输出记录的 ID,表示消息已经成功发送
  14.         System.out.println("消息发送成功,id: " + recordId.getValue());  // 打印消息的 ID,表明该消息已经被成功加入到 Stream 中
  15.     }
  16. }
复制代码
  RecordId 是 Spring Data Redis 中的一个类,用来表示 消息的唯一标识符。它对应 Redis Stream 中的 消息 ID,该 ID 是 Redis Stream 中每条消息的唯一标识。Redis 中的消息 ID 通常是由时间戳和序号组成的(如 1588890470850-0)。
  主要功能:

  

  • 表示消息 ID:RecordId 是一个封装类,表示 Redis Stream 中消息的 ID。
  • 用于辨认和操纵消息:在消费和确认消息时,RecordId 用来标识每条消息的唯一性,并资助 Redis 确定消息是否已经被消费
  使用场景:

  RecordId 用来标识从 Stream 中读取到的消息,我们可以通过 RecordId 来进行消息的确认、删除或其他操纵。
  1. RecordId recordId = redisTemplate.opsForStream().add(StreamRecords.objectBacked(map).withStreamKey("mystream"));
复制代码
通过 StreamRecords.objectBacked(map) 将 map 对象作为消息内容,并用 add 方法将其写入 Stream。
  在然后编写消费者服务:
   使用 RedisTemplate 的 read 方法(底层执行的是 XREADGROUP 下令)从消费者组中拉取消息,并进行处置惩罚。消费者可以采用定时任务或背景线程不停轮询
  1. @Slf4j  
  2. @Service  
  3. public class RedisStreamConsumerService {
  4.     private static final String STREAM_KEY = "mystream";  // Redis Stream 的名称,这里指定队列名为 "mystream"
  5.     private static final String GROUP_NAME = "mygroup";  // 消费者组的名称,多个消费者可以通过组名共享消费队列
  6.     private static final String CONSUMER_NAME = "consumer-1";  // 消费者的名称,消费者名称在同一消费者组内必须唯一
  7.     @Autowired  
  8.     private StringRedisTemplate redisTemplate;
  9.     @PostConstruct  // 使用该注解能让方法在 Spring 完成依赖注入后自动调用,用于初始化任务
  10.     @Async  // 将该方法标记为异步执行,允许它在单独的线程中运行,不会阻塞主线程,@EnableAsync 需要在配置类中启用
  11.     public void start() {  // 启动方法,在应用启动时执行
  12.         // 无限循环,不断从 Redis Stream 中读取消息(可以改为定时任务等方式)
  13.         while (true) {
  14.             try {
  15.                 // 设置 Stream 读取的阻塞超时,设置最多等待 2 秒
  16.                 StreamReadOptions options = StreamReadOptions.empty().block(Duration.ofSeconds(2));
  17.                 // 从指定的消费者组中读取消息,">" 表示只消费未被消费过的消息
  18.                 List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
  19.                         Consumer.from(GROUP_NAME, CONSUMER_NAME),  // 指定消费者组和消费者名称
  20.                         options,  // 设置读取选项,包含阻塞时间
  21.                         StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())  // 从最后消费的消息开始读取
  22.                 );
  23.                 // 如果没有消息,继续循环读取
  24.                 if (messages == null || messages.isEmpty()) {
  25.                     continue;  
  26.                 }
  27.                 // 处理每一条读取到的消息
  28.                 for (MapRecord<String, Object, Object> message : messages) {
  29.                     String messageId = message.getId();  // 获取消息的唯一标识符(ID)
  30.                     Map<Object, Object> value = message.getValue();  // 获取消息内容(以 Map 形式存储)
  31.                     log.info("接收到消息,id={},内容={}", messageId, value);  // 打印日志,记录消息 ID 和内容
  32.                     // 在这里加入业务逻辑处理
  33.                     // 例如处理消息并执行相应的操作
  34.                     // ...
  35.                     // 消息处理成功后,需要确认消息已经被消费(通过 XACK 命令)
  36.                     redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, messageId);  // 确认消费的消息
  37.                 }
  38.             } catch (Exception e) {
  39.                 log.error("读取 Redis Stream 消息异常", e);  // 异常捕获,记录错误日志
  40.             }
  41.         }
  42.     }
  43. }
复制代码
  MapRecord<String, Object, Object> 是 Spring Data Redis 用来表示 Redis Stream 中的 消息记录 的类。它不仅包含了消息的 ID,还包含了消息的内容(即消息数据)。在 Redis 中,每条消息都存储为一个 key-value 对。
  主要功能:

  

  • 封装消息 ID 和消息内容:MapRecord 用来封装消息的 ID 和消息的内容。
  • 消息的内容:消息的内容通常是一个 键值对(Map<String, Object>),可以是恣意对象的数据布局(例如,JSON、Map 或其他序列化对象)。
  字段:

  

  • getId():返回消息的 ID(RecordId 类型)。
  • getValue():返回消息的内容,以 Map<Object, Object> 的情势。
  使用场景:

  MapRecord 是用来表示从 Stream 中读取到的消息,它将消息的 ID 和内容(键值对)封装在一起。你可以使用 MapRecord 来获取消息的 ID 和内容并处置惩罚。
  1. MapRecord<String, Object, Object> message = redisTemplate.opsForStream().read(Consumer.from("mygroup", "consumer1"), options, StreamOffset.create("mystream", ReadOffset.lastConsumed()));
复制代码
在这个例子中,message 是一个 MapRecord 实例,它封装了从 mystream 队列中读取到的消息。我们可以通过 message.getId() 获取消息 ID,通过 message.getValue() 获取消息内容。
    在消费者中,我们使用 MapRecord<String, Object, Object> 来封装消息,获取 message.getId() 来获取消息的 ID(RecordId),以及通过 message.getValue() 获取消息的内容。 随后在处置惩罚完消息后,调用 acknowledge() 来确认消息已经被消费。
  最后启动异步支持:
  1. @SpringBootApplication
  2. @EnableAsync // 启动异步支持
  3. public class MyApplication {
  4.     public static void main(String[] args) {
  5.         SpringApplication.run(MyApplication.class, args);
  6.     }
  7. }
复制代码
通过这种方式,Spring Data Redis 提供了高效且类型安全的接口来操纵 Redis Stream,资助我们在分布式系统中实现高效的消息队列。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

魏晓东

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表