马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
Redis 消息队列详解
Redis 作为一个高性能的内存数据库,支持多种实现消息队列的方式,主要包罗:
- Redis List(基于列表的队列)
- Redis Pub/Sub(发布/订阅)
- Redis Stream(消息流)
- Redis Sorted Set(延迟队列)
差别的方式适用于差别的场景,下面具体解说各自的实现原理、适用场景以及示例代码。
1. 基于 Redis List 的消息队列
Redis 的 List 结构(链表)可以用来实现 使命队列,支持持久化存储,并允许消费者从队列中取出使命进行处理。
1.1. 根本原理
- LPUSH queue_name message:将新使命参加队列 头部。
- RPUSH queue_name message:将新使命参加队列 尾部。
- LPOP queue_name / RPOP queue_name:从队列头部/尾部取出使命(非壅闭)。
- BRPOP queue_name timeout:壅闭方式获取使命,如果队列为空,会等待新的使命到来。
1.2. 生产者代码
- import redis
- r = redis.Redis(host='localhost', port=6379, decode_responses=True)
- # 添加任务到队列
- r.lpush('task_queue', 'task1')
- r.lpush('task_queue', 'task2')
- print("Tasks added!")
复制代码 1.3. 消费者代码
- import redis
- r = redis.Redis(host='localhost', port=6379, decode_responses=True)
- while True:
- task = r.brpop('task_queue', timeout=0) # 阻塞模式
- if task:
- print(f"Processing task: {task[1]}")
复制代码 1.4. 适用场景
- 使命队列(Task Queue):适用于 使命分发 和 异步处理。
- 多消费者竞争模式:多个消费者会竞争消费队列中的使命,每个使命只能被一个消费者取走。
1.5. 问题与优化
- 消息丢失:如果消费者取出使命后瓦解,使命就会丢失。可以利用 RPOPLPUSH 实现使命持久化:
- task = r.rpoplpush('task_queue', 'processing_queue')
- if task:
- process(task)
- r.lrem('processing_queue', 1, task) # 任务完成后删除
复制代码 - 限流:可以利用 Redis 令牌桶 或 滑动窗口算法 控制使命消费速率。
2. 基于 Redis Pub/Sub 的实时消息队列
Redis Pub/Sub 适用于实时广播通知,但不支持消息存储。
2.1. 根本原理
- PUBLISH channel message:发布消息到频道。
- SUBSCRIBE channel:订阅者监听频道消息。
2.2. 生产者代码
- import redis
- r = redis.Redis(host='localhost', port=6379, decode_responses=True)
- r.publish('news_channel', 'Breaking News: Redis is awesome!')
复制代码 2.3. 消费者代码
- import redis
- r = redis.Redis(host='localhost', port=6379, decode_responses=True)
- pubsub = r.pubsub()
- pubsub.subscribe('news_channel')
- for message in pubsub.listen():
- if message['type'] == 'message':
- print(f"Received message: {message['data']}")
复制代码 2.4. 适用场景
- 实时消息推送(如聊天系统、WebSocket)。
- 事件驱动架构(如监控报警、状态变更通知)。
2.5. 问题与优化
- 消息不会持久化,订阅者掉线后无法收到历史消息。
- 不能回放消息,消息是实时的,一旦发送就无法重新获取。
- 可以利用 Redis Stream 代替 来办理持久化问题。
3. 基于 Redis Stream 的持久化消息队列
Redis Stream 是一种高效、持久化的消息队列,支持消息存储、消费分组等特性,类似 Kafka。
3.1. 根本原理
- XADD stream_name * field1 value1 field2 value2:添加消息。
- XREAD COUNT 1 STREAMS stream_name last_id:读取消息。
- XGROUP CREATE stream_name group_name $ MKSTREAM:创建消费组。
3.2. 生产者代码
- import redis
- r = redis.Redis(host='localhost', port=6379, decode_responses=True)
- r.xadd('event_stream', {'event': 'user_signup', 'user_id': '12345'})
- print("Event published!")
复制代码 3.3. 消费者代码
- import redis
- r = redis.Redis(host='localhost', port=6379, decode_responses=True)
- while True:
- events = r.xread({'event_stream': '$'}, count=1, block=5000)
- for stream, messages in events:
- for message_id, message in messages:
- print(f"Received: {message}")
复制代码 3.4. 适用场景
- 大规模消息存储(Kafka 替换)。
- 日志收集与分析。
- 事件溯源(能回放历史消息)。
4. 基于 Redis Sorted Set 的延迟队列
Sorted Set(有序聚集)可以用于定时使命调度。
4.1. 生产者代码
- import time
- import redis
- r = redis.Redis(host='localhost', port=6379, decode_responses=True)
- r.zadd('delay_queue', {'task1': time.time() + 10}) # 10秒后执行
复制代码 4.2. 消费者代码
- import time
- import redis
- r = redis.Redis(host='localhost', port=6379, decode_responses=True)
- while True:
- now = time.time()
- tasks = r.zrangebyscore('delay_queue', 0, now)
- for task in tasks:
- print(f"Processing {task}")
- r.zrem('delay_queue', task)
- time.sleep(1)
复制代码 4.3. 适用场景
- 定时使命(如定时短信、延迟实行)。
- 超时检测(如订单逾期、缓存清理)。
5. 选择合适的 Redis 消息队列
方案持久化支持多消费者适用场景List✅竞争消费使命队列、异步使命Pub/Sub❌广播消费实时推送、通知Stream✅组消费事件流、日志收集Sorted Set✅竞争消费延迟使命 总结
- 如果需要使命队列,利用 List(支持竞争消费,得当使命调度)。
- 如果需要实时推送,利用 Pub/Sub(广播消息,不存储)。
- 如果需要持久化消息流,利用 Stream(类似 Kafka,支持回放)。
- 如果需要定时使命,利用 Sorted Set(得当延迟队列)。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |