redis实现消息队列的几种方式

打印 上一主题 下一主题

主题 1000|帖子 1000|积分 3000

一、了解

众所周知,redis是我们日常开辟过程中使用最多的非关系型数据库,也是消息中间件。现实上除了常用的rabbitmq、rocketmq、kafka消息队列(大家自己下去研究吧~模式都是通用的),我们也能使用redis实现消息队列。由于其他中间件大概更适用于大型/企业级项目,在咱们项现在期不需要这么多的数据,redis跟我们也是高度集成的。这里就简化了技能栈。
二、常用的几种使用redis实现的消息队列方式

1、List数据结构

Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)大概尾部(右边)一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表凌驾40亿个元素)。
这里的列表大家可以想想为一个横着的通道,假设我现在往右边插入第一条数据,这个元素就会被放在最左边,接着再放入第二条数据,它就会在左边第二条,以此类推…插入了100条数据。 假设这个时候我要取出第一条,我就从最左边取就好。
这就变相实现了有序消息队列。具体实现大家自己研究
优点:操作方便,可以有序的取出自己插入的数据
缺点:不能进行及时消费,没有消费者
2、pub/sub 订阅消费模式

这就是传统的生产者->队列->消费者的模式。生产者的消息所有订阅者都能收到。

优点:实现了发布订阅模式,可以及时进行消费
缺点:没有消息持久化,在体系瓦解、宕机的时候;消息会丢失
3、sorted set有序集合Redis

有序集合和集合一样也是 string 类型元素的集合,且不答应重复的成员。不同的是每一个元素都会关联一个double分数,redis就是通过分数为集合中的成员进行从大到小的分列。
有序集合的成员是唯一的,但是score是可以重复的。
天生消息直接往s-set中插入数据,将score设置为接收到数据的13位时间戳;需要使用的时候再根据score大小有序取出来就行了。
看到这里是不是大家能想到,既然每条消息都带有时间,那我是不是可以顺手实现延迟队列。
这里只需要将score设置为 担当消息的时间戳+延迟时间 。我在使用的时候获取当天时间戳的数据,如许就实现了延迟消息队列。
优点:操作方便,可以实现延迟队列
缺点:不能及时进行消费
4、stream流 (redis5.0版本以上才有 重点讲)

Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时候的数据,并且能记着每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

每个stream流都有自己的名称,它是redis的key,也可以理解为队列名称。
Consumer Group :消费组,使用 XGROUP CREATE 下令创建,一个消费组有多个消费者(Consumer)。
last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,恣意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。
stream常用下令



  • XADD 界说stream流,写入消息体
  1. XADD mystream * field1 A field2 B field3 C field4 D
  2. mystream:自定义流名称
  3. *:由redis生成流的id(也可以自定义,但是得保证自增唯一)
  4. field1-A \field2-B\field3-C :保存的消息体,key-value形式
  5. -- 举例
  6. redis> XADD mystream * name Sara surname OConnor
  7. "1601372323627-0"
复制代码


  • XDEL 删除消息
  1. > XADD mystream * a 1
  2. 1538561698944-0
  3. > XADD mystream * b 2
  4. 1538561700640-0
  5. > XADD mystream * c 3
  6. 1538561701744-0
  7. > XDEL mystream 1538561700640-0
  8. (integer) 1
  9. 127.0.0.1:6379> XRANGE mystream - +
  10. 1) 1) 1538561698944-0
  11.    2) 1) "a"
  12.       2) "1"
  13. 2) 1) 1538561701744-0
  14.    2) 1) "c"
  15.       2) "3"
复制代码


  • XRANGE 获取消息队列数据
  1. XRANGE key start end [COUNT count]
  2. key:strem流名称
  3. start:开始值,- 表示最小值
  4. end:结束值,+ 表示最大值
  5. -- 举例:
  6. redis> XRANGE mystream - + 2
  7. 从mystrem全部数据中取出两条数据
  8. redis> XRANGE mystream + - 1
  9. 从mystream倒叙取一条数据
复制代码


  • XREVRANGE 主动过滤已删除的消息
  1. redis> XADD writers * name Virginia surname Woolf
  2. "1601372731458-0"
  3. redis> XADD writers * name Jane surname Austen
  4. "1601372731459-0"
  5. redis> XADD writers * name Toni surname Morrison
  6. "1601372731459-1"
  7. redis> XADD writers * name Agatha surname Christie
  8. "1601372731459-2"
  9. redis> XADD writers * name Ngozi surname Adichie
  10. "1601372731459-3"
  11. redis> XLEN writers
  12. (integer) 5
  13. redis> XREVRANGE writers + - COUNT 1
  14. 1) 1) "1601372731459-3"
  15.    2) 1) "name"
  16.       2) "Ngozi"
  17.       3) "surname"
  18.       4) "Adichie"
  19. redis>
复制代码


  • XREAD 阻塞大概非阻塞获取消息
  1. # 从 Stream 头部读取两条消息
  2. > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
  3. 1) 1) "mystream"
  4.    2) 1) 1) 1526984818136-0
  5.          2) 1) "duration"
  6.             2) "1532"
  7.             3) "event-id"
  8.             4) "5"
  9.             5) "user-id"
  10.             6) "7782813"
  11.       2) 1) 1526999352406-0
  12.          2) 1) "duration"
  13.             2) "812"
  14.             3) "event-id"
  15.             4) "9"
  16.             5) "user-id"
  17.             6) "388234"
  18. 2) 1) "writers"
  19.    2) 1) 1) 1526985676425-0
  20.          2) 1) "name"
  21.             2) "Virginia"
  22.             3) "surname"
  23.             4) "Woolf"
  24.       2) 1) 1526985685298-0
  25.          2) 1) "name"
  26.             2) "Jane"
  27.             3) "surname"
  28.             4) "Austen"
  29. count :数量
  30. milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
  31. key :队列名
  32. id :消息 ID
复制代码


  • XGROUP CREATE 创建消费者组
  1. XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  2. key :队列名称,如果不存在就创建
  3. groupname :组名。
  4. $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
  5. 从头开始消费:
  6. XGROUP CREATE mystream consumer-group-name 0-0  
  7. 从尾部开始消费:
  8. XGROUP CREATE mystream consumer-group-name $
复制代码
以上就是常用的steam流的下令,大家下来自己测试,练习。
三、springboot整合redis stream流

java中提供了连接redis的客户端,jedis和lettuce、redistemplate;RedisTemplate 是 Spring Data Redis 提供的一个高级抽象层,封装了 Jedis 或 Lettuce 等底层客户端。
它提供了丰富的功能,如序列化、事务支持、键过期等。这里紧张讲主流的redistemplate整合,大家以后能直接使用。
及时消费

及时消费顾名思义,生产者发送消息,消费者立马进行消费逻辑处置处罚。


  • RedisStreamUtils工具类,方便后续进行stream操作没根据自己项目需求来界说
  1. @Configuration
  2. @SuppressWarnings("all")
  3. public class RedisStreamUtils {
  4.     @Resource
  5.     private RedisTemplate<String, Object> redisTemplate;
  6.     /**
  7.      * 创建消费组
  8.      *
  9.      * @param streamKey   键名称
  10.      * @param group 组名称
  11.      * @return {@link String}
  12.      */
  13.     public String createGroup(String streamKey, String group) {
  14.         return redisTemplate.opsForStream().createGroup(streamKey, group);
  15.     }
  16.     /**
  17.      * 获取消费者信息
  18.      *
  19.      * @param streamKey   键名称
  20.      * @param group 组名称
  21.      * @return {@link StreamInfo.XInfoConsumers}
  22.      */
  23.     public StreamInfo.XInfoConsumers queryConsumers(String streamKey, String group) {
  24.         return redisTemplate.opsForStream().consumers(streamKey, group);
  25.     }
  26.     /**
  27.      * 查询组信息
  28.      *
  29.      * @param streamKey 键名称
  30.      * @return
  31.      */
  32.     public StreamInfo.XInfoGroups queryGroups(String streamKey) {
  33.         return redisTemplate.opsForStream().groups(streamKey);
  34.     }
  35.     // 添加Map消息
  36.     public String addMap(String streamKey, Map<String, Object> value) {
  37.         return Objects.requireNonNull(redisTemplate.opsForStream().add(streamKey, value)).getValue();
  38.     }
  39.     // 读取消息
  40.     public List<MapRecord<String, Object, Object>> read(String streamKey) {
  41.         return redisTemplate.opsForStream().read(StreamOffset.fromStart(streamKey));
  42.     }
  43.     // 确认消费
  44.     public Long ack(String streamKey, String group, String... recordIds) {
  45.         return redisTemplate.opsForStream().acknowledge(streamKey, group, recordIds);
  46.     }
  47.     // 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
  48.     public Long del(String key, String... recordIds) {
  49.         return redisTemplate.opsForStream().delete(key, recordIds);
  50.     }
  51.     // 判断是否存在key
  52.     public boolean hasKey(String key) {
  53.         Boolean aBoolean = redisTemplate.hasKey(key);
  54.         return aBoolean != null && aBoolean;
  55.     }
  56. }
复制代码


  • RedisConfig配置文件
  1. @Configuration
  2. @Slf4j
  3. @RequiredArgsConstructor
  4. public class RedisConfig {
  5.     private final RedisStreamUtils redisStreamUtil;
  6.     private final Environment environment;
  7.         //消费者处理消息配置
  8.     @Bean
  9.     public Subscription subscription(RedisConnectionFactory factory) {
  10.         AtomicInteger index = new AtomicInteger(1);
  11.         //获取系统处理器数量 创建线程池,开启守护线程
  12.         int processors = Runtime.getRuntime().availableProcessors();
  13.         ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
  14.                 new LinkedBlockingDeque<>(), r -> {
  15.             Thread thread = new Thread(r);
  16.             thread.setName("async-stream-consumer-" + index.getAndIncrement());
  17.             thread.setDaemon(true);
  18.             return thread;
  19.         });
  20.         //流消息监听容器参数设置
  21.         StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
  22.                 StreamMessageListenerContainer
  23.                         .StreamMessageListenerContainerOptions
  24.                         .builder()
  25.                         // 一次最多获取多少条消息
  26.                         .batchSize(5)
  27.                         //执行线程池
  28.                         .executor(executor)
  29.                         //阻塞消息读取(延迟消息)
  30.                         .pollTimeout(Duration.ofSeconds(1))
  31.                         //异常处理
  32.                         .errorHandler(throwable -> {
  33.                             log.error("[MQ handler exception]", throwable);
  34.                             throwable.printStackTrace();
  35.                         })
  36.                         .build();
  37.                 //通过redis连接工厂,创建流消息监听容器
  38.                 var listenerContainer = StreamMessageListenerContainer.create(factory, options);
  39.                 //初始化流和消费者处理配置
  40.                 //初始化流和消费者处理配置
  41.         Subscription subscription = initStreamAndConsumer(listenerContainer);
  42.                 //开启监听容器
  43.         listenerContainer.start();
  44.         return subscription;
  45.     }
  46.         private Subscription initStreamAndConsumer(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer){
  47.                 //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
  48.                 //这一部分可以不用配置,可以根据自己的实际情况配置
  49.         //该key和group可根据需求自定义配置
  50.         String streamName = "mystream";
  51.         String groupname = "mygroup";
  52.         initStream(streamName, groupname);
  53.         // 手动ask消息
  54.         //消费者处理完消息之后,会进行确认;这里有一个pending状态会变成已处理
  55.         Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),
  56.                 StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumer(redisStreamUtil));
  57.         // 自动ask消息
  58.            /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
  59.                     StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
  60.         //这一部分可以不用配置,可以根据自己的实际情况配置
  61.                 //↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
  62.                 return subscription;
  63.         }
  64.     private void initStream(String key, String group) {
  65.         boolean hasKey = redisStreamUtil.hasKey(key);
  66.         if (!hasKey) {
  67.             Map<String, Object> map = new HashMap<>(1);
  68.             map.put("field", "value");
  69.             //创建主题
  70.             String result = redisStreamUtil.addMap(key, map);
  71.             //创建消费组
  72.             redisStreamUtil.createGroup(key, group);
  73.             //将初始化的值删除掉
  74.             redisStreamUtil.del(key, result);
  75.             log.info("stream:{}-group:{} initialize success", key, group);
  76.         }
  77.     }
  78. }
复制代码
大家这里可以想一想,这种写法是不是符合生产过程中的创建队列/消费者的逻辑,是不是不方便。能不能在我需要的时候直接调用方法去创建???假设现在我新增了一个业务需求,需要用不同的业务逻辑去处置处罚,而且我希望定制不同的消费者应答模式,这个时候就需要一个通用方法去实现,这里我是如许做的。照旧在工具类中
创建redis流消息监听容器
紧张参数 :界说线程池、一次最大获取消息数、超时重新获取、非常处置处罚
  1.   @Bean
  2.     public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(RedisConnectionFactory factory) {
  3.         log.info("redis ip:{},port:{}",environment.getProperty("spring.data.redis.host"),environment.getProperty("spring.data.redis.port"));
  4.         AtomicInteger index = new AtomicInteger(1);
  5.         int processors = Runtime.getRuntime().availableProcessors();
  6.         ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
  7.                 new LinkedBlockingDeque<>(), r -> {
  8.             Thread thread = new Thread(r);
  9.             thread.setName("async-stream-consumer-" + index.getAndIncrement());
  10.             thread.setDaemon(true);
  11.             return thread;
  12.         });
  13.         StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
  14.                 StreamMessageListenerContainer
  15.                         .StreamMessageListenerContainerOptions
  16.                         .builder()
  17.                         // 一次最多获取多少条消息
  18.                         .batchSize(5)
  19.                         .executor(executor)
  20.                         .pollTimeout(Duration.ofSeconds(3))
  21.                         .errorHandler(throwable -> {
  22.                             log.error("[MQ handler exception]", throwable);
  23.                             throwable.printStackTrace();
  24.                         })
  25.                         .build();
  26.         return StreamMessageListenerContainer.create(factory, options);
  27.     }
  28. //业务需求调用此方法即可
  29. public void addNewStreamAndSubscribe(String streamName, String groupName, String consumerId, StreamListener listener) {
  30.         initStream(streamName, groupName);
  31.         subscribeToStream(streamName, groupName, Consumer.from(groupName, consumerId), listener);
  32.     }
  33.     public void addNewStreamAndSubscribe(String streamName, String groupName, String consumerId, RedisConsumer listener,Map<String,Object> recodMap) {
  34.         initStream(streamName, groupName);
  35.         subscribeToStream(streamName, groupName, Consumer.from(groupName, consumerId), listener);
  36.         addMap(streamName, recodMap);
  37.     }
  38.     private void subscribeToStream(String streamName, String groupName, Consumer consumer, StreamListener listener) {
  39.         StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = streamMessageListenerContainer(redisConnectionFactory);
  40.         Subscription subscription = container.receive(Consumer.from(groupName, consumer.getName()),
  41.                 StreamOffset.create(streamName, ReadOffset.lastConsumed()), listener);
  42.         //开始消息容器监听
  43.         container.start();
  44.         log.info("Subscribed to stream: {} with group: {} and consumer: {}", streamName, groupName, consumer);
  45.     }
复制代码
streamMessageListenerContainer中的 .batchSize(1) 设置需要着重说一下。意思是在消费者在监听到数据的时候,一次从redis中取出的多少条数据,假设我设置1,就意味着我的监听器会redis中取出1条未消费的数据,随后进入消费者逻辑,处置处罚完毕之后返回;继承由监听器读取1条数据,在进入消费者逻辑;这个值设置得越小消息处置处罚数据越快,但是也会增加redis链接的资源。
较大的 batchSize 可以减少与 Redis 服务器的交互次数,降低网络通讯开销,进步处置处罚效率。
较小的 batchSize 适用于需要低延迟处置处罚的场景,但会增加网络通讯开销和 CPU 使用率。



  • RedisConsumer消费者
  1. @Component("RedisConsumer")
  2. @RequiredArgsConstructor
  3. @Slf4j
  4. public class RedisConsumer implements StreamListener<String, MapRecord<String,String,String>> {
  5.     private final RedisStreamUtils redisStreamUtils;
  6.     @Override
  7.     public void onMessage(MapRecord<String, String, String> message) {
  8.         try {
  9.             log.info("RedisConsumer1获取到了消息:{}",message);
  10.             String streamKey = message.getStream();
  11.             RecordId recordId = message.getId();
  12.             Map<String, String> value = message.getValue();
  13.             //获取这个流下 所有的消费者组
  14.             StreamInfo.XInfoGroups xInfoGroups = redisStreamUtils.queryGroups(streamKey);
  15.             //处理逻辑
  16.             //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
  17.             log.info("【streamKey】= {},【recordId】= {},【msg】= {}",streamKey,recordId, value);
  18.              //手动确认ack消息,并删除已处理的消息
  19.              //我这里使用手动
  20.             xInfoGroups.forEach(xInfoGroup -> redisStreamUtils.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
  21.             //自动确认消息 ---------自己下来研究
  22.             //↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
  23.             //根据业务场景来看是否需要删除消息
  24. //        redisStreamUtils.del(streamKey, recordId.getValue());
  25.         } catch (Exception e) {
  26.             throw new ServiceException("消费异常");
  27.         }
  28.     }
  29. }
复制代码


  • RedisConsumer2消费者
  1. @Component("RedisConsumer2")
  2. @RequiredArgsConstructor
  3. @Slf4j
  4. public class RedisConsumer2 implements StreamListener<String, MapRecord<String,String,String>> {
  5.     private final RedisStreamUtils redisStreamUtils;
  6.     @Override
  7.     public void onMessage(MapRecord<String, String, String> message) {
  8.         try {
  9.             log.info("RedisConsumer2获取到了消息:{}",message);
  10.             String streamKey = message.getStream();
  11.             RecordId recordId = message.getId();
  12.             Map<String, String> value = message.getValue();
  13.             //获取这个流下 所有的消费者组
  14.             StreamInfo.XInfoGroups xInfoGroups = redisStreamUtils.queryGroups(streamKey);
  15.             //处理逻辑
  16.             //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
  17.             log.info("【streamKey】= {},【recordId】= {},【msg】= {}",streamKey,recordId, value);
  18.             //↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
  19.             //手动确认ack消息,并删除已处理的消息
  20.             xInfoGroups.forEach(xInfoGroup -> redisStreamUtils.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
  21. //        redisStreamUtils.del(streamKey, recordId.getValue());
  22.         } catch (Exception e) {
  23.             throw new ServiceException("消费异常");
  24.         }
  25.     }
  26. }
复制代码


  • RedisStreamcontroller模拟测试
  1. @RequestMapping(value = "/redisStream")
  2. @RestController
  3. @RequiredArgsConstructor
  4. @Slf4j
  5. @SuppressWarnings("all")
  6. public class RedisStreamController {
  7.     private final RedisStreamUtils redisStreamUtils;
  8.     private final RedisConsumer redisConsumer;
  9.     private final RedisTemplate redisTemplate;
  10.     private final ApplicationContext applicationContext;
  11.     @GetMapping(value = "/addNewStreamAndSubscribe")
  12.     public ResultVO addNewStreamAndSubscribe(@RequestParam("streamKey") String streamKey,
  13.                                               @RequestParam("groupName") String groupName,
  14.                                               @RequestParam("consumer")String consumer,
  15.                                              @RequestParam("consumerClass") String consumerClass){
  16.         try {
  17.             // 获取实现类的实例
  18.             StreamListener consumerInstance = (StreamListener) applicationContext.getBean(consumerClass);
  19.             redisStreamUtils.addNewStreamAndSubscribe(streamKey, groupName, consumer,consumerInstance );
  20.         } catch (Exception e) {
  21.             throw new RuntimeException(e);
  22.         }
  23.         return ResultVO.success();
  24.     }
  25.     @GetMapping(value = "/addMap")
  26.     public ResultVO addMap(@RequestParam("streamKey") String streamKey,
  27.                            @RequestParam("key")String key,
  28.                            @RequestParam("value")String value) {
  29.         HashMap<String, Object> objectObjectHashMap = new HashMap<>();
  30.         objectObjectHashMap.put(key,value);
  31.         redisStreamUtils.addMap(streamKey,objectObjectHashMap);
  32.         return ResultVO.success();
  33.     }
  34.     @GetMapping(value = "/getGroup")
  35.     public ResultVO getGroup(@RequestParam("streamKey") String streamKey,
  36.                              @RequestParam("groupName") String groupName) {
  37.         boolean b = redisStreamUtils.hasKey(streamKey);
  38.         if(b){
  39.             StreamInfo.XInfoGroups xInfoGroups = redisStreamUtils.queryGroups(streamKey);
  40.             List<Object> list = new ArrayList<>();
  41.             for (StreamInfo.XInfoGroup xInfoGroup : xInfoGroups) {
  42.                 StreamInfo.XInfoConsumers xInfoConsumers = null;
  43.                 if(StrUtil.isNotEmpty(groupName)){
  44.                     xInfoConsumers = redisStreamUtils.queryConsumers(streamKey, groupName);
  45.                     for (StreamInfo.XInfoConsumer xInfoConsumer : xInfoConsumers) {
  46.                         log.info("group:{},pending:{},consumerCount:{},consumerName:{},lastDeliveryId:{}"
  47.                                 ,xInfoGroup.groupName(),xInfoGroup.pendingCount(),xInfoGroup.consumerCount(),xInfoConsumer.consumerName(),xInfoGroup.lastDeliveredId());
  48.                     }
  49.                 }
  50.             }
  51.         }else{
  52.             log.info("streamKey不存在:{}",streamKey);
  53.             return ResultVO.error("streamKey不存在");
  54.         }
  55.         return ResultVO.success();
  56.     }
  57.     @GetMapping(value = "/delStream")
  58.     public ResultVO delStream(@RequestParam("streamKey") String streamKey){
  59.         redisTemplate.delete(streamKey);
  60.         return ResultVO.success();
  61.     }
  62.     @GetMapping(value = "/readMsg")
  63.     public ResultVO readMsg(@RequestParam("streamKey") String streamKey,
  64.                             @RequestParam("groupName") String groupName,
  65.                             @RequestParam("consumer") String consumer){
  66.         // 读取消息,每次读取最多 5 条
  67.         List read = redisTemplate.opsForStream().read(
  68.                 Consumer.from(groupName, consumer),
  69.                 StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),
  70.                 StreamOffset.create(streamKey, ReadOffset.lastConsumed())
  71.         );
  72.         return ResultVO.success(JSON.toJSONString(read));
  73.     }
复制代码
项目启动
调用/addNewStreamAndSubscribe接口



  • 创建流、监听容器
  • 消费者绑定流+消费者逻辑处置处罚类
  • 接收生产者消息方式(最新、偏移量)
  • 开启消息容器监听
调用/addMap接口,发送消息

如果只有一个消费者,那么当消费者出现非常的时候,直到服务恢复,会从上一次消费的数据开始进行消费。
假设现在消费者组有两个消费者,都绑定了同一个消息流,这个时候发送消息就是轮询访问。
RedisConsumer1获取到了消息
RedisConsumer2获取到了消息
RedisConsumer1获取到了消息
RedisConsumer2获取到了消息

如果consumer1出现了非常,这个时候consumer2会正常消费所有的数据。
stream本身就支持持久化数据,也是dbs和aof两种。不用担心数据丢失。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立山

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表