马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
同步调用是微服务耦合的根源。消息队列解耦了服务,也救了你的相应时间。用MonkeyCode,从选型到实现一把梭。
为什么必要消息队列?
一个典范的电商下单流程,同步调用长如许:- # 同步调用:创建订单要等所有下游返回
- @app.post("/orders")
- async def create_order(req: CreateOrderRequest):
- user = await get_user(req.user_id) # 50ms
- payment = await process_payment(req) # 200ms
- inventory = await deduct_inventory(req) # 100ms
- notify = await send_notification(req) # 80ms
- log = await write_order_log(req) # 30ms
- return {"status": "ok"} # 总共 460ms
复制代码 用户点击"下单"后,要等460ms才气看到结果。而且任何一个服务挂了,下单就失败了。
引入消息队列后:- # 异步处理:下单只管写队列,20ms内返回
- @app.post("/orders")
- async def create_order(req: CreateOrderRequest):
- order_id = await save_order(req) # 写数据库 20ms
- await publish_event("order.created", { # 发消息 5ms
- "order_id": order_id,
- "user_id": req.user_id,
- "amount": req.amount
- })
- return {"order_id": order_id, "status": "processing"} # 25ms
复制代码 鄙俚的付出、扣库存、发关照,全部异步处置惩罚。用户体验快了18倍。
主流消息队列选型
MonkeyCode帮你选:
方案实用场景吞吐量延长RabbitMQ业务消息、延长队列中等(万级/秒)毫秒级Redis Streams轻量场景、已有Redis高(十万级/秒)毫秒级Kafka日志 流、大数据管道极高(百万级/秒)毫秒~秒级Amazon SQS云原生、无运维高秒级实战一:RabbitMQ + FastAPI
安装设置
让MonkeyCode天生Docker设置:- 为RabbitMQ生成docker-compose配置,包含管理界面,默认用户密码admin/pass
复制代码- # docker-compose.yml
- version: '3.8'
- services:
- rabbitmq:
- image: rabbitmq:3.12-management
- ports:
- - "5672:5672" # AMQP端口
- - "15672:15672" # 管理界面
- environment:
- RABBITMQ_DEFAULT_USER: admin
- RABBITMQ_DEFAULT_PASS: pass
- volumes:
- - rabbitmq_data:/var/lib/rabbitmq
- volumes:
- rabbitmq_data:
复制代码 启动后访问 http://localhost:15672,用admin/pass登录。
发布消息(订单服务)
- # order-service/producer.py
- import aio_pika
- import json
- from typing import Any
- class MessageProducer:
- def __init__(self, amqp_url: str = "amqp://admin:pass@localhost:5672/"):
- self.amqp_url = amqp_url
- self.connection = None
- self.channel = None
-
- async def connect(self):
- self.connection = await aio_pika.connect_robust(self.amqp_url)
- self.channel = await self.connection.channel()
- # 设置QoS,每次最多处理10条消息
- await self.channel.set_qos(prefetch_count=10)
-
- async def publish(self, queue_name: str, message: dict, priority: int = 0):
- if not self.channel:
- await self.connect()
-
- # 声明队列(幂等操作)
- queue = await self.channel.declare_queue(
- queue_name,
- durable=True, # 队列持久化
- arguments={
- "x-max-priority": 10 # 支持优先级
- }
- )
-
- await self.channel.default_exchange.publish(
- aio_pika.Message(
- body=json.dumps(message, ensure_ascii=False).encode(),
- delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 消息持久化
- priority=priority,
- headers={"source": "order-service"}
- ),
- routing_key=queue_name
- )
- # 使用
- producer = MessageProducer()
- @app.post("/orders")
- async def create_order(req: CreateOrderRequest):
- order_id = await db.create_order(req)
-
- await producer.publish("order.created", {
- "order_id": order_id,
- "user_id": req.user_id,
- "amount": req.amount,
- "items": [{"sku": item.sku, "qty": item.qty} for item in req.items]
- }, priority=5)
-
- return {"order_id": order_id}
复制代码 斲丧消息(库存服务)
- # inventory-service/consumer.py
- import aio_pika
- import asyncio
- from typing import Callable
- class MessageConsumer:
- def __init__(self, amqp_url: str = "amqp://admin:pass@localhost:5672/"):
- self.amqp_url = amqp_url
-
- async def consume(self, queue_name: str, handler: Callable):
- connection = await aio_pika.connect_robust(self.amqp_url)
- channel = await connection.channel()
- await channel.set_qos(prefetch_count=10)
-
- queue = await channel.declare_queue(queue_name, durable=True)
-
- async def process_message(message: aio_pika.IncomingMessage):
- async with message.process(requeue=False): # 处理失败不自动重入队
- try:
- payload = json.loads(message.body.decode())
- await handler(payload)
- except Exception as e:
- # 发送到死信队列
- await message.nack(requeue=False)
- await send_to_dead_letter_queue(queue_name, payload, str(e))
-
- await queue.consume(process_message)
- print(f"[*] Waiting for messages on {queue_name}. To exit press CTRL+C")
- await asyncio.Future() # 永久运行
- # 库存扣减处理器
- async def handle_order_created(payload: dict):
- order_id = payload["order_id"]
- items = payload["items"]
-
- async with db.transaction():
- for item in items:
- # 乐观锁扣库存
- result = await db.execute("""
- UPDATE inventory
- SET stock = stock - :qty, version = version + 1
- WHERE sku = :sku AND stock >= :qty AND version = :version
- """, {"sku": item["sku"], "qty": item["qty"], "version": current_version})
-
- if result.rowcount == 0:
- # 库存不足,触发补偿事件
- await producer.publish("inventory.insufficient", {
- "order_id": order_id,
- "sku": item["sku"]
- })
- raise Exception(f"库存不足: {item['sku']}")
-
- # 扣库存成功,触发支付事件
- await producer.publish("inventory.deducted", {
- "order_id": order_id,
- "items": items
- })
- # 启动消费者
- if __name__ == "__main__":
- consumer = MessageConsumer()
- asyncio.run(consumer.consume("order.created", handle_order_created))
复制代码 实战二:Redis Streams(轻量方案)
如果你已经用了Redis,Streams是零本钱方案:- # 发布消息
- import redis.asyncio as redis
- async def publish_to_stream(stream_key: str, data: dict):
- r = await redis.from_url("redis://localhost:6379")
- await r.xadd(stream_key, data, id="*") # * 让Redis自动生成ID
- await r.aclose()
- # 消费消息(消费者组模式,支持多实例负载均衡)
- async def consume_from_stream(stream_key: str, group_name: str, consumer_name: str):
- r = await redis.from_url("redis://localhost:6379")
-
- # 创建消费者组(幂等)
- try:
- await r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
- except redis.ResponseError:
- pass # 组已存在
-
- while True:
- # 读取新消息(阻塞式)
- messages = await r.xreadgroup(
- group_name, consumer_name,
- {stream_key: ">"}, # ">" 表示读取未消费的消息
- count=10, block=5000
- )
-
- for stream, msgs in messages:
- for msg_id, fields in msgs:
- try:
- await process_message(dict(fields))
- await r.xack(stream_key, group_name, msg_id) # 确认消费
- except Exception as e:
- print(f"处理失败: {msg_id}, error: {e}")
- # 可以记录到特殊的处理失败集合
- await r.hset(f"failed:{stream_key}", msg_id, str(e))
复制代码 Redis Streams vs RabbitMQ:
特性Redis StreamsRabbitMQ运维本钱零(已有Redis)必要独立摆设长期化支持(AOF/RDB)支持斲丧者组支持支持延长队列需zset共同原生支持消息TTL需xtrim原生支持实战三:Kafka(大数据场景)
当消息量到达百万/秒级别,Kafka是不二之选:- # 生产者
- from aiokafka import AIOKafkaProducer
- import json
- async def kafka_producer():
- producer = AIOKafkaProducer(
- bootstrap_servers="localhost:9092",
- value_serializer=lambda v: json.dumps(v).encode(),
- acks="all", # 所有副本确认,最强可靠性
- retries=3
- )
- await producer.start()
- try:
- await producer.send_and_wait("order-events", {
- "event": "order.created",
- "order_id": "12345",
- "timestamp": datetime.utcnow().isoformat()
- })
- finally:
- await producer.stop()
- # 消费者
- from aiokafka import AIOKafkaConsumer
- async def kafka_consumer():
- consumer = AIOKafkaConsumer(
- "order-events",
- bootstrap_servers="localhost:9092",
- group_id="inventory-service",
- auto_offset_reset="earliest"
- )
- await consumer.start()
- try:
- async for msg in consumer:
- payload = json.loads(msg.value)
- await handle_order_event(payload)
- finally:
- await consumer.stop()
复制代码 变乱驱动架构计划
消息队列的最佳实践是变乱驱动,而不是RPC的变种:
反模式:用消息队列做RPC
- # ❌ 错误:把消息队列当HTTP用
- await publish("order.create", req)
- response = await wait_for_response("order.create.response") # 同步等待
复制代码 正模式:变乱驱动 + 赔偿
- # ✅ 正确:事件驱动
- # 订单服务:发布事件
- await publish("order.created", { "order_id": order_id })
- # 库存服务:订阅事件,处理成功后发布新事件
- await subscribe("order.created", async (payload) => {
- await deduct_inventory(payload)
- await publish("inventory.deducted", { "order_id": payload.order_id })
- })
- # 支付服务:订阅库存扣减事件
- await subscribe("inventory.deducted", async (payload) => {
- await process_payment(payload)
- await publish("payment.processed", { "order_id": payload.order_id })
- })
- # 订单服务:订阅最终完成事件
- await subscribe("payment.processed", async (payload) => {
- await update_order_status(payload.order_id, "paid")
- })
复制代码 幂等性处置惩罚
网络抖动会导致消息重复投递,斲丧者必须做幂等:- async def handle_order_created(payload: dict):
- order_id = payload["order_id"]
-
- # 方案1:数据库唯一约束
- try:
- await db.execute(
- "INSERT INTO order_events (order_id, event_type, processed_at) VALUES (:oid, 'created', NOW())",
- {"oid": order_id}
- )
- except IntegrityError:
- print(f"订单 {order_id} 已处理,跳过")
- return
-
- # 业务逻辑...
复制代码- # 方案2:Redis分布式锁
- async def handle_order_created(payload: dict):
- order_id = payload["order_id"]
- lock_key = f"lock:order:{order_id}"
-
- r = await redis.from_url("redis://localhost:6379")
- acquired = await r.set(lock_key, "1", nx=True, ex=60) # 60秒过期
- if not acquired:
- print(f"订单 {order_id} 正在处理中,跳过")
- return
-
- try:
- await process_order(order_id)
- finally:
- await r.delete(lock_key)
复制代码 死信队列(DLQ)处置惩罚失败消息
- # 定义死信队列
- async def setup_dead_letter_queue(channel: aio_pika.Channel):
- # 主队列,绑定死信交换机
- await channel.declare_queue(
- "order.created",
- durable=True,
- arguments={
- "x-dead-letter-exchange": "dlx",
- "x-dead-letter-routing-key": "order.created.dlq"
- }
- )
-
- # 死信交换机
- dlx = await channel.declare_exchange("dlx", aio_pika.ExchangeType.DIRECT)
-
- # 死信队列
- dlq = await channel.declare_queue("order.created.dlq", durable=True)
- await dlq.bind(dlx, "order.created.dlq")
- # 消费死信队列(人工介入或延迟重试)
- async def consume_dlq():
- connection = await aio_pika.connect_robust("amqp://admin:pass@localhost:5672/")
- channel = await connection.channel()
- dlq = await channel.declare_queue("order.created.dlq", durable=True)
-
- async def handle_failed(msg: aio_pika.IncomingMessage):
- payload = json.loads(msg.body.decode())
- error = msg.headers.get("x-died-reason", "unknown")
-
- # 记录到数据库,等待人工处理
- await db.execute(
- "INSERT INTO failed_messages (payload, error, created_at) VALUES (:p, :e, NOW())",
- {"p": json.dumps(payload), "e": error}
- )
- msg.ack()
-
- await dlq.consume(handle_failed)
复制代码 MonkeyCode Prompt模板
- 我有一个[电商/社交/物联网
]系统,需要引入消息队列解耦[具体场景]。 - 请帮我:
- 1. 选型建议(RabbitMQ/Redis/Kafka),给出对比
- 2. 设计事件类型和payload结构
- 3. 生成生产者和消费者代码(含幂等处理)
- 4. 配置死信队列和重试机制
- 5. 生成Docker Compose部署配置
- 6. 编写单元测试和集成测试
复制代码 踩过的坑
- 消息丢失:没开长期化,RabbitMQ重启后消息全丢 → 设置delivery_mode=PERSISTENT
- 重复斲丧:斲丧者没做幂等,网络超时时重试导致重复处置惩罚 → 数据库唯一束缚或Redis锁
- 队列堆积:斲丧者处置惩罚速率跟不上生产速率 → 增长斲丧者实例 + 监控
队列长度
- 内存走漏:RabbitMQ默认不限定内存使用 → 设置vm_memory_high_watermark=0.4
总结
消息队列的核心代价是解耦和削峰。选RabbitMQ做业务消息,用Redis Streams做轻量场景,上Kafka处置惩罚大数据流。MonkeyCode能帮你从选型、代码天生到摆设设置一站式搞定。
记取:变乱驱动,不是RPC。消息发出去就别等复兴,用变乱链串起整个业务流程。
免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金. |