MonkeyCode写消息队列:从零实现高可用异步架构

[复制链接]
发表于 昨天 20:26 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
同步调用是微服务耦合的根源。消息队列解耦了服务,也救了你的相应时间。用MonkeyCode,从选型到实现一把梭。
为什么必要消息队列?

一个典范的电商下单流程,同步调用长如许:
  1. # 同步调用:创建订单要等所有下游返回
  2. @app.post("/orders")
  3. async def create_order(req: CreateOrderRequest):
  4.     user = await get_user(req.user_id)          # 50ms
  5.     payment = await process_payment(req)          # 200ms
  6.     inventory = await deduct_inventory(req)      # 100ms
  7.     notify = await send_notification(req)         # 80ms
  8.     log = await write_order_log(req)             # 30ms
  9.     return {"status": "ok"}  # 总共 460ms
复制代码
用户点击"下单"后,要等460ms才气看到结果。而且任何一个服务挂了,下单就失败了。
引入消息队列后:
  1. # 异步处理:下单只管写队列,20ms内返回
  2. @app.post("/orders")
  3. async def create_order(req: CreateOrderRequest):
  4.     order_id = await save_order(req)  # 写数据库 20ms
  5.     await publish_event("order.created", {   # 发消息 5ms
  6.         "order_id": order_id,
  7.         "user_id": req.user_id,
  8.         "amount": req.amount
  9.     })
  10.     return {"order_id": order_id, "status": "processing"}  # 25ms
复制代码
鄙俚的付出、扣库存、发关照,全部异步处置惩罚。用户体验快了18倍。
主流消息队列选型

MonkeyCode帮你选:
方案实用场景吞吐量延长RabbitMQ业务消息、延长队列中等(万级/秒)毫秒级Redis Streams轻量场景、已有Redis高(十万级/秒)毫秒级Kafka日志日志流、大数据管道极高(百万级/秒)毫秒~秒级Amazon SQS云原生、无运维高秒级实战一:RabbitMQ + FastAPI

安装设置

让MonkeyCode天生Docker设置:
  1. 为RabbitMQ生成docker-compose配置,包含管理界面,默认用户密码admin/pass
复制代码
  1. # docker-compose.yml
  2. version: '3.8'
  3. services:
  4.   rabbitmq:
  5.     image: rabbitmq:3.12-management
  6.     ports:
  7.       - "5672:5672"   # AMQP端口
  8.       - "15672:15672"  # 管理界面
  9.     environment:
  10.       RABBITMQ_DEFAULT_USER: admin
  11.       RABBITMQ_DEFAULT_PASS: pass
  12.     volumes:
  13.       - rabbitmq_data:/var/lib/rabbitmq
  14. volumes:
  15.   rabbitmq_data:
复制代码
启动后访问 http://localhost:15672,用admin/pass登录。
发布消息(订单服务)
  1. # order-service/producer.py
  2. import aio_pika
  3. import json
  4. from typing import Any
  5. class MessageProducer:
  6.     def __init__(self, amqp_url: str = "amqp://admin:pass@localhost:5672/"):
  7.         self.amqp_url = amqp_url
  8.         self.connection = None
  9.         self.channel = None
  10.    
  11.     async def connect(self):
  12.         self.connection = await aio_pika.connect_robust(self.amqp_url)
  13.         self.channel = await self.connection.channel()
  14.         # 设置QoS,每次最多处理10条消息
  15.         await self.channel.set_qos(prefetch_count=10)
  16.    
  17.     async def publish(self, queue_name: str, message: dict, priority: int = 0):
  18.         if not self.channel:
  19.             await self.connect()
  20.         
  21.         # 声明队列(幂等操作)
  22.         queue = await self.channel.declare_queue(
  23.             queue_name,
  24.             durable=True,       # 队列持久化
  25.             arguments={
  26.                 "x-max-priority": 10  # 支持优先级
  27.             }
  28.         )
  29.         
  30.         await self.channel.default_exchange.publish(
  31.             aio_pika.Message(
  32.                 body=json.dumps(message, ensure_ascii=False).encode(),
  33.                 delivery_mode=aio_pika.DeliveryMode.PERSISTENT,  # 消息持久化
  34.                 priority=priority,
  35.                 headers={"source": "order-service"}
  36.             ),
  37.             routing_key=queue_name
  38.         )
  39. # 使用
  40. producer = MessageProducer()
  41. @app.post("/orders")
  42. async def create_order(req: CreateOrderRequest):
  43.     order_id = await db.create_order(req)
  44.    
  45.     await producer.publish("order.created", {
  46.         "order_id": order_id,
  47.         "user_id": req.user_id,
  48.         "amount": req.amount,
  49.         "items": [{"sku": item.sku, "qty": item.qty} for item in req.items]
  50.     }, priority=5)
  51.    
  52.     return {"order_id": order_id}
复制代码
斲丧消息(库存服务)
  1. # inventory-service/consumer.py
  2. import aio_pika
  3. import asyncio
  4. from typing import Callable
  5. class MessageConsumer:
  6.     def __init__(self, amqp_url: str = "amqp://admin:pass@localhost:5672/"):
  7.         self.amqp_url = amqp_url
  8.    
  9.     async def consume(self, queue_name: str, handler: Callable):
  10.         connection = await aio_pika.connect_robust(self.amqp_url)
  11.         channel = await connection.channel()
  12.         await channel.set_qos(prefetch_count=10)
  13.         
  14.         queue = await channel.declare_queue(queue_name, durable=True)
  15.         
  16.         async def process_message(message: aio_pika.IncomingMessage):
  17.             async with message.process(requeue=False):  # 处理失败不自动重入队
  18.                 try:
  19.                     payload = json.loads(message.body.decode())
  20.                     await handler(payload)
  21.                 except Exception as e:
  22.                     # 发送到死信队列
  23.                     await message.nack(requeue=False)
  24.                     await send_to_dead_letter_queue(queue_name, payload, str(e))
  25.         
  26.         await queue.consume(process_message)
  27.         print(f"[*] Waiting for messages on {queue_name}. To exit press CTRL+C")
  28.         await asyncio.Future()  # 永久运行
  29. # 库存扣减处理器
  30. async def handle_order_created(payload: dict):
  31.     order_id = payload["order_id"]
  32.     items = payload["items"]
  33.    
  34.     async with db.transaction():
  35.         for item in items:
  36.             # 乐观锁扣库存
  37.             result = await db.execute("""
  38.                 UPDATE inventory
  39.                 SET stock = stock - :qty, version = version + 1
  40.                 WHERE sku = :sku AND stock >= :qty AND version = :version
  41.             """, {"sku": item["sku"], "qty": item["qty"], "version": current_version})
  42.             
  43.             if result.rowcount == 0:
  44.                 # 库存不足,触发补偿事件
  45.                 await producer.publish("inventory.insufficient", {
  46.                     "order_id": order_id,
  47.                     "sku": item["sku"]
  48.                 })
  49.                 raise Exception(f"库存不足: {item['sku']}")
  50.         
  51.         # 扣库存成功,触发支付事件
  52.         await producer.publish("inventory.deducted", {
  53.             "order_id": order_id,
  54.             "items": items
  55.         })
  56. # 启动消费者
  57. if __name__ == "__main__":
  58.     consumer = MessageConsumer()
  59.     asyncio.run(consumer.consume("order.created", handle_order_created))
复制代码
实战二:Redis Streams(轻量方案)

如果你已经用了Redis,Streams是零本钱方案:
  1. # 发布消息
  2. import redis.asyncio as redis
  3. async def publish_to_stream(stream_key: str, data: dict):
  4.     r = await redis.from_url("redis://localhost:6379")
  5.     await r.xadd(stream_key, data, id="*")  # * 让Redis自动生成ID
  6.     await r.aclose()
  7. # 消费消息(消费者组模式,支持多实例负载均衡)
  8. async def consume_from_stream(stream_key: str, group_name: str, consumer_name: str):
  9.     r = await redis.from_url("redis://localhost:6379")
  10.    
  11.     # 创建消费者组(幂等)
  12.     try:
  13.         await r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
  14.     except redis.ResponseError:
  15.         pass  # 组已存在
  16.    
  17.     while True:
  18.         # 读取新消息(阻塞式)
  19.         messages = await r.xreadgroup(
  20.             group_name, consumer_name,
  21.             {stream_key: ">"},  # ">" 表示读取未消费的消息
  22.             count=10, block=5000
  23.         )
  24.         
  25.         for stream, msgs in messages:
  26.             for msg_id, fields in msgs:
  27.                 try:
  28.                     await process_message(dict(fields))
  29.                     await r.xack(stream_key, group_name, msg_id)  # 确认消费
  30.                 except Exception as e:
  31.                     print(f"处理失败: {msg_id}, error: {e}")
  32.                     # 可以记录到特殊的处理失败集合
  33.                     await r.hset(f"failed:{stream_key}", msg_id, str(e))
复制代码
Redis Streams vs RabbitMQ:
特性Redis StreamsRabbitMQ运维本钱零(已有Redis)必要独立摆设长期化支持(AOF/RDB)支持斲丧者组支持支持延长队列需zset共同原生支持消息TTL需xtrim原生支持实战三:Kafka(大数据场景)

当消息量到达百万/秒级别,Kafka是不二之选:
  1. # 生产者
  2. from aiokafka import AIOKafkaProducer
  3. import json
  4. async def kafka_producer():
  5.     producer = AIOKafkaProducer(
  6.         bootstrap_servers="localhost:9092",
  7.         value_serializer=lambda v: json.dumps(v).encode(),
  8.         acks="all",  # 所有副本确认,最强可靠性
  9.         retries=3
  10.     )
  11.     await producer.start()
  12.     try:
  13.         await producer.send_and_wait("order-events", {
  14.             "event": "order.created",
  15.             "order_id": "12345",
  16.             "timestamp": datetime.utcnow().isoformat()
  17.         })
  18.     finally:
  19.         await producer.stop()
  20. # 消费者
  21. from aiokafka import AIOKafkaConsumer
  22. async def kafka_consumer():
  23.     consumer = AIOKafkaConsumer(
  24.         "order-events",
  25.         bootstrap_servers="localhost:9092",
  26.         group_id="inventory-service",
  27.         auto_offset_reset="earliest"
  28.     )
  29.     await consumer.start()
  30.     try:
  31.         async for msg in consumer:
  32.             payload = json.loads(msg.value)
  33.             await handle_order_event(payload)
  34.     finally:
  35.         await consumer.stop()
复制代码
变乱驱动架构计划

消息队列的最佳实践是变乱驱动,而不是RPC的变种:
反模式:用消息队列做RPC
  1. # ❌ 错误:把消息队列当HTTP用
  2. await publish("order.create", req)
  3. response = await wait_for_response("order.create.response")  # 同步等待
复制代码
正模式:变乱驱动 + 赔偿
  1. # ✅ 正确:事件驱动
  2. # 订单服务:发布事件
  3. await publish("order.created", { "order_id": order_id })
  4. # 库存服务:订阅事件,处理成功后发布新事件
  5. await subscribe("order.created", async (payload) => {
  6.     await deduct_inventory(payload)
  7.     await publish("inventory.deducted", { "order_id": payload.order_id })
  8. })
  9. # 支付服务:订阅库存扣减事件
  10. await subscribe("inventory.deducted", async (payload) => {
  11.     await process_payment(payload)
  12.     await publish("payment.processed", { "order_id": payload.order_id })
  13. })
  14. # 订单服务:订阅最终完成事件
  15. await subscribe("payment.processed", async (payload) => {
  16.     await update_order_status(payload.order_id, "paid")
  17. })
复制代码
幂等性处置惩罚

网络抖动会导致消息重复投递,斲丧者必须做幂等:
  1. async def handle_order_created(payload: dict):
  2.     order_id = payload["order_id"]
  3.    
  4.     # 方案1:数据库唯一约束
  5.     try:
  6.         await db.execute(
  7.             "INSERT INTO order_events (order_id, event_type, processed_at) VALUES (:oid, 'created', NOW())",
  8.             {"oid": order_id}
  9.         )
  10.     except IntegrityError:
  11.         print(f"订单 {order_id} 已处理,跳过")
  12.         return
  13.    
  14.     # 业务逻辑...
复制代码
  1. # 方案2:Redis分布式锁
  2. async def handle_order_created(payload: dict):
  3.     order_id = payload["order_id"]
  4.     lock_key = f"lock:order:{order_id}"
  5.    
  6.     r = await redis.from_url("redis://localhost:6379")
  7.     acquired = await r.set(lock_key, "1", nx=True, ex=60)  # 60秒过期
  8.     if not acquired:
  9.         print(f"订单 {order_id} 正在处理中,跳过")
  10.         return
  11.    
  12.     try:
  13.         await process_order(order_id)
  14.     finally:
  15.         await r.delete(lock_key)
复制代码
死信队列(DLQ)处置惩罚失败消息
  1. # 定义死信队列
  2. async def setup_dead_letter_queue(channel: aio_pika.Channel):
  3.     # 主队列,绑定死信交换机
  4.     await channel.declare_queue(
  5.         "order.created",
  6.         durable=True,
  7.         arguments={
  8.             "x-dead-letter-exchange": "dlx",
  9.             "x-dead-letter-routing-key": "order.created.dlq"
  10.         }
  11.     )
  12.    
  13.     # 死信交换机
  14.     dlx = await channel.declare_exchange("dlx", aio_pika.ExchangeType.DIRECT)
  15.    
  16.     # 死信队列
  17.     dlq = await channel.declare_queue("order.created.dlq", durable=True)
  18.     await dlq.bind(dlx, "order.created.dlq")
  19. # 消费死信队列(人工介入或延迟重试)
  20. async def consume_dlq():
  21.     connection = await aio_pika.connect_robust("amqp://admin:pass@localhost:5672/")
  22.     channel = await connection.channel()
  23.     dlq = await channel.declare_queue("order.created.dlq", durable=True)
  24.    
  25.     async def handle_failed(msg: aio_pika.IncomingMessage):
  26.         payload = json.loads(msg.body.decode())
  27.         error = msg.headers.get("x-died-reason", "unknown")
  28.         
  29.         # 记录到数据库,等待人工处理
  30.         await db.execute(
  31.             "INSERT INTO failed_messages (payload, error, created_at) VALUES (:p, :e, NOW())",
  32.             {"p": json.dumps(payload), "e": error}
  33.         )
  34.         msg.ack()
  35.    
  36.     await dlq.consume(handle_failed)
复制代码
MonkeyCode Prompt模板
  1. 我有一个[电商/社交/物联网物联网]系统,需要引入消息队列解耦[具体场景]。
  2. 请帮我:
  3. 1. 选型建议(RabbitMQ/Redis/Kafka),给出对比
  4. 2. 设计事件类型和payload结构
  5. 3. 生成生产者和消费者代码(含幂等处理)
  6. 4. 配置死信队列和重试机制
  7. 5. 生成Docker Compose部署配置
  8. 6. 编写单元测试和集成测试
复制代码
踩过的坑


  • 消息丢失:没开长期化,RabbitMQ重启后消息全丢 → 设置delivery_mode=PERSISTENT
  • 重复斲丧:斲丧者没做幂等,网络超时时重试导致重复处置惩罚 → 数据库唯一束缚或Redis锁
  • 队列堆积:斲丧者处置惩罚速率跟不上生产速率 → 增长斲丧者实例 + 监控监控队列长度
  • 内存走漏:RabbitMQ默认不限定内存使用 → 设置vm_memory_high_watermark=0.4
总结

消息队列的核心代价是解耦削峰。选RabbitMQ做业务消息,用Redis Streams做轻量场景,上Kafka处置惩罚大数据流。MonkeyCode能帮你从选型、代码天生到摆设设置一站式搞定。
记取:变乱驱动,不是RPC。消息发出去就别等复兴,用变乱链串起整个业务流程。

免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金.
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表