Redis 消息队列详解

宁睿  论坛元老 | 2025-1-25 14:08:24 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1082|帖子 1082|积分 3256

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Redis 消息队列详解

Redis 作为一个高性能的内存数据库,支持多种实现消息队列的方式,主要包罗:

  • Redis List(基于列表的队列)
  • Redis Pub/Sub(发布/订阅)
  • Redis Stream(消息流)
  • Redis Sorted Set(延迟队列)
差别的方式适用于差别的场景,下面具体解说各自的实现原理、适用场景以及示例代码。

1. 基于 Redis List 的消息队列

Redis 的 List 结构(链表)可以用来实现 使命队列,支持持久化存储,并允许消费者从队列中取出使命进行处理。
1.1. 根本原理



  • LPUSH queue_name message:将新使命参加队列 头部
  • RPUSH queue_name message:将新使命参加队列 尾部
  • LPOP queue_name / RPOP queue_name:从队列头部/尾部取出使命(非壅闭)。
  • BRPOP queue_name timeout:壅闭方式获取使命,如果队列为空,会等待新的使命到来。
1.2. 生产者代码

  1. import redis
  2. r = redis.Redis(host='localhost', port=6379, decode_responses=True)
  3. # 添加任务到队列
  4. r.lpush('task_queue', 'task1')
  5. r.lpush('task_queue', 'task2')
  6. print("Tasks added!")
复制代码
1.3. 消费者代码

  1. import redis
  2. r = redis.Redis(host='localhost', port=6379, decode_responses=True)
  3. while True:
  4.     task = r.brpop('task_queue', timeout=0)  # 阻塞模式
  5.     if task:
  6.         print(f"Processing task: {task[1]}")
复制代码
1.4. 适用场景



  • 使命队列(Task Queue):适用于 使命分发异步处理
  • 多消费者竞争模式:多个消费者会竞争消费队列中的使命,每个使命只能被一个消费者取走。
1.5. 问题与优化



  • 消息丢失:如果消费者取出使命后瓦解,使命就会丢失。可以利用 RPOPLPUSH 实现使命持久化
    1. task = r.rpoplpush('task_queue', 'processing_queue')
    2. if task:
    3.     process(task)
    4.     r.lrem('processing_queue', 1, task)  # 任务完成后删除
    复制代码
  • 限流:可以利用 Redis 令牌桶滑动窗口算法 控制使命消费速率。

2. 基于 Redis Pub/Sub 的实时消息队列

Redis Pub/Sub 适用于实时广播通知,但不支持消息存储。
2.1. 根本原理



  • PUBLISH channel message:发布消息到频道。
  • SUBSCRIBE channel:订阅者监听频道消息。
2.2. 生产者代码

  1. import redis
  2. r = redis.Redis(host='localhost', port=6379, decode_responses=True)
  3. r.publish('news_channel', 'Breaking News: Redis is awesome!')
复制代码
2.3. 消费者代码

  1. import redis
  2. r = redis.Redis(host='localhost', port=6379, decode_responses=True)
  3. pubsub = r.pubsub()
  4. pubsub.subscribe('news_channel')
  5. for message in pubsub.listen():
  6.     if message['type'] == 'message':
  7.         print(f"Received message: {message['data']}")
复制代码
2.4. 适用场景



  • 实时消息推送(如聊天系统、WebSocket)。
  • 事件驱动架构(如监控报警、状态变更通知)。
2.5. 问题与优化



  • 消息不会持久化,订阅者掉线后无法收到历史消息。
  • 不能回放消息,消息是实时的,一旦发送就无法重新获取。
  • 可以利用 Redis Stream 代替 来办理持久化问题。

3. 基于 Redis Stream 的持久化消息队列

Redis Stream 是一种高效、持久化的消息队列,支持消息存储、消费分组等特性,类似 Kafka。
3.1. 根本原理



  • XADD stream_name * field1 value1 field2 value2:添加消息。
  • XREAD COUNT 1 STREAMS stream_name last_id:读取消息。
  • XGROUP CREATE stream_name group_name $ MKSTREAM:创建消费组。
3.2. 生产者代码

  1. import redis
  2. r = redis.Redis(host='localhost', port=6379, decode_responses=True)
  3. r.xadd('event_stream', {'event': 'user_signup', 'user_id': '12345'})
  4. print("Event published!")
复制代码
3.3. 消费者代码

  1. import redis
  2. r = redis.Redis(host='localhost', port=6379, decode_responses=True)
  3. while True:
  4.     events = r.xread({'event_stream': '$'}, count=1, block=5000)
  5.     for stream, messages in events:
  6.         for message_id, message in messages:
  7.             print(f"Received: {message}")
复制代码
3.4. 适用场景



  • 大规模消息存储(Kafka 替换)。
  • 日志收集与分析
  • 事件溯源(能回放历史消息)。

4. 基于 Redis Sorted Set 的延迟队列

Sorted Set(有序聚集)可以用于定时使命调度
4.1. 生产者代码

  1. import time
  2. import redis
  3. r = redis.Redis(host='localhost', port=6379, decode_responses=True)
  4. r.zadd('delay_queue', {'task1': time.time() + 10})  # 10秒后执行
复制代码
4.2. 消费者代码

  1. import time
  2. import redis
  3. r = redis.Redis(host='localhost', port=6379, decode_responses=True)
  4. while True:
  5.     now = time.time()
  6.     tasks = r.zrangebyscore('delay_queue', 0, now)
  7.     for task in tasks:
  8.         print(f"Processing {task}")
  9.         r.zrem('delay_queue', task)
  10.     time.sleep(1)
复制代码
4.3. 适用场景



  • 定时使命(如定时短信、延迟实行)。
  • 超时检测(如订单逾期、缓存清理)。

5. 选择合适的 Redis 消息队列

方案持久化支持多消费者适用场景List✅竞争消费使命队列、异步使命Pub/Sub❌广播消费实时推送、通知Stream✅组消费事件流、日志收集Sorted Set✅竞争消费延迟使命
总结



  • 如果需要使命队列,利用 List(支持竞争消费,得当使命调度)。
  • 如果需要实时推送,利用 Pub/Sub(广播消息,不存储)。
  • 如果需要持久化消息流,利用 Stream(类似 Kafka,支持回放)。
  • 如果需要定时使命,利用 Sorted Set(得当延迟队列)。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宁睿

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表