RabbitMQ 生产者与消费者:实现同步消息处理的全面指南 ...

打印 上一主题 下一主题

主题 924|帖子 924|积分 2772

在 RabbitMQ 中,生产者和消费者通常是异步工作的,但如果您盼望实现一种机制,使得生产者在发送下一条消息之前等待消费者处理完当前消息(即实现同步),可以通过以下几种方式来实现。
### 方法 1: 使用确认机制
RabbitMQ 提供了消息确认机制,您可以在生产者中等待消费者确认消息已被处理完再发送下一条消息。以下是实现步骤:
1. **启用消息确认**:在消费者中处理完消息后,发送一个确认。
2. **生产者等待确认**:生产者在发送每条消息后等待消费者的确认。
#### 示例代码
以下是一个简朴的示例,演示如何使用确认机制来实现同步。
##### 生产者代码
```python
import pika
# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='hello')
# 启用消息确认
channel.confirm_select()
# 发送消息并等待确认
for i in range(5):
    message = f'Message {i + 1}'
    channel.basic_publish(exchange='', routing_key='hello', body=message)
    print(f" [x] Sent '{message}'")
    
    # 等待确认
    if channel.is_open and channel.is_confirm_select:
        print("
  • Waiting for acknowledgment...")
            channel.wait_for_confirm()
            print("
  • Message acknowledged.")
    # 关闭连接
    connection.close()
    ```
    ##### 消费者代码
    ```python
    import pika
    import time
    # 连接到 RabbitMQ 服务器
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    # 声明一个队列
    channel.queue_declare(queue='hello')
    # 界说回调函数
    def callback(ch, method, properties, body):
        print(f" [x] Received {body.decode()}")
        time.sleep(1)  # 模拟处理时间
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 发送确认
        print(f" [x] Acknowledged {body.decode()}")
    # 告诉 RabbitMQ 使用 callback 来吸收消息
    channel.basic_consume(queue='hello', on_message_callback=callback)
    print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    ```
    ### 运行示例
    1. **启动 RabbitMQ 服务器**:确保 RabbitMQ 服务器正在运行。
    2. **运行消费者**:起首运行消费者脚本,它会等待吸收消息。
    3. **运行生产者**:然后运行生产者脚本,它会发送多条消息,并在每条消息被确认后再发送下一条。
    ### 方法 2: 使用消息队列的特性
    如果您盼望在消费者处理完当前消息后再发送下一条消息,可以在消费者中添加一个信号机制,例如使用 `queue.Queue` 或 `threading.Event` 来关照生产者。
    ### 方法 3: 使用 RPC(长途过程调用)
    RabbitMQ 还支持 RPC 模式,您可以将哀求发送到队列,消费者处理哀求并返回效果。生产者会在发送哀求后等待消费者的响应。这种方式在某些场景下也可以实现同步。
    #### 示例代码(RPC)
    **生产者(客户端)代码**:
    ```python
    import pika
    import uuid
    class FibonacciRpcClient:
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            self.channel = self.connection.channel()
            result = self.channel.queue_declare(queue='', exclusive=True)
            self.callback_queue = result.method.queue
            self.channel.basic_consume(queue=self.callback_queue,
                                       on_message_callback=self.on_response,
                                       auto_ack=True)
        def on_response(self, ch, method, properties, body):
            self.response = body
        def call(self, n):
            self.response = None
            self.channel.basic_publish(exchange='',
                                        routing_key='rpc_queue',
                                        properties=pika.BasicProperties(
                                            reply_to=self.callback_queue,
                                        ),
                                        body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)
    rpc_client = FibonacciRpcClient()
    for i in range(5):
        print(f" [x] Requesting fib({i})")
        response = rpc_client.call(i)
        print(f" [.] Got {response}")
    rpc_client.connection.close()
    ```
    **消费者(服务端)代码**:
    ```python
    import pika
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)
    def on_request(ch, method, properties, body):
        n = int(body)
        print(f" [.] fib({n})")
        response = fib(n)
        ch.basic_publish(exchange='',
                         routing_key=properties.reply_to,
                         properties=pika.BasicProperties(
                             correlation_id=properties.correlation_id
                         ),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    ```
    ### 总结
    - **确认机制**:通过在生产者中等待消费者确认消息处理完成,可以实现同步发送消息。
    - **RPC 模式**:使用 RabbitMQ 的 RPC 特性可以在发送哀求后等待响应,实现同步处理。
    - **信号机制**:通过使用其他 Python 线程或变乱机制,您可以在消费者处理完消息后关照生产者发送下一条消息。
    这些方法都可以根据您的具体需求举行调解和扩展。选择最恰当您应用场景的方法。

    免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
  • 回复

    使用道具 举报

    0 个回复

    倒序浏览

    快速回复

    您需要登录后才可以回帖 登录 or 立即注册

    本版积分规则

    道家人

    金牌会员
    这个人很懒什么都没写!
    快速回复 返回顶部 返回列表