1.弁言
1.1什么是rabbitmq
RabbitMQ 是一个消息代理,它可以接受并转发消息。RabbitMQ集接受,存储转发为一体。TODO…
1.2消息队列的基本概念
可以把消息队列看为一个消息容器,我们可以按需生产消息向容器中发送或者从容器中消费消息,尽管这听起来想“一个人”完成的操作,但现实生产和消费的角色并不相同,而是两个步调,并且这两个步调每每不在同一呆板上。
生产消息的一方我们通常称为“生产者”:
消费消息的一方通常称为“消费者”:
2.RabbitMQ 基础
2.1安装
我们将使用 Pika 1.0.0,它是 RabbitMQ 团队推荐的 Python 客户端。 要安装它,您可以使用 pip 软件包管理工具:
- python -m pip install pika --upgrade
复制代码 2.2"holle world"示例
我们将写两个简单的代码,一个作为生产者发送“holle world”字符串,一个作为消费者从队列接受字符串,并在屏幕上打印出来。
起首先来编写我们的生产者”sending.py“
- #!/usr/bin/env python
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
- channel = connection.channel()
复制代码 我们这里是毗连的本地的服务,如果想毗连其他呆板上的服务只需要更改“localhost”即可。
有了生产者(后续统一用P表现)之后我们就需要有一个队列来作为和消费者(后续统一用C表现)之间通信的媒介。
- channel.queue_declare(queue='hello')
复制代码 如果P向一个不存在的队列发送消息,那么该消息就会被丢弃。
需要注意的是Rabbitmq不会直接向队列发送消息,而是将消息发送给exchange(互换机),由exchange来通过绑定的规则向一个或多个队列发送消息。这部门的内容在3.1会有更多先容。
我们现在只需要知道我们需要向默认的exchange发送消息即可,默认的exchange用“”字符串来表现。
- channel.basic_publish(exchange='', #交换机类型,这里采用默认的使用空字符串表示
- routing_key='hello', # 队列名
- body='Hello World!') #消息内容
- print(" [x] Sent 'Hello World!'")
- connection.close() #关闭连接
复制代码 现在我们就可以向hello队列发送消息了,下面我们就来完成C的代码“receiving”。
同样的我们需要先和Rabbitmq取得毗连
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='hello')
复制代码 这里我们再一次创建了hello队列,不消担心调用多次queue_declare函数只会创建一个同名的队列,这样的好处就是当我们无法确定是sending.py在receiving.py之前运行的时间代码也不会报错,这时重复的创建队列就是一个好的方法,也推荐在现实应用中这样做。
接下来就是为receiving写一个回调函数(callback),该函数的作用就是在C取得队列中的消息之后,用过callback处理,我们这里比较简单就是打印接收的消息。
- def callback(ch, method, properties, body):
- print(f" [x] Received {body}")
复制代码 接下来就需要告诉这个callback函数从那个队列去接收消息。
- channel.basic_consume(queue='hello',
- auto_ack=True,
- on_message_callback=callback)
- print(' [*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
复制代码 对于auto_ack=True参数,其实是Rabbitmq简直认应答机制,Rabbitmq需要知道C取得的消息是否被正确的处理了,就是通过该参数来控制,后序在3.2会有更多的先容。
整合起来如下:
sending.py
- #!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='hello')
- channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print(" [x] Sent 'Hello World!'")connection.close()
复制代码 receiving
- #!/usr/bin/env pythonimport pika, sys, osdef main(): connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello')
- def callback(ch, method, properties, body): print(f" [x] Received {body}") channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) print('
- [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()if __name__ == '__main__': try: main() except KeyboardInterrupt: #通过ctrl-c来中断步调 print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0)
复制代码 可以开启两个终端分别运行两个文件来检察效果。
3.RabbitMQ 高级特性
3.1Exchange 类型
通过前面的学习我们已经对RabbitMQ的基本工作流程有了一些熟悉,接下来就来学习更多高级内容。
前面在2.2末节也提到过,P不直接向队列发送消息,而是向exchange发送消息,exchange通过不同的绑定规则绑定到不同的队列中,不同的绑定规则就对应了不同类型的exchange,RabbitMQ一同有四种,分别是
direct,
topic,
headers,# 本文不做先容,后续大概会更新
fanout,
接下来分别先容这三种exchange的特性:
direct
只有设置的RoutingKey和BindingKey完全匹配的时间,消息队列才可以获取消息。
如图所示,info类型的消息只会发送到第一个队列中去,warn类型的消息只会发送到第二个队列中去…。
代码中
sending.py
- #!/usr/bin/env python
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
- RoutingKey="info"
- channel.basic_publish(exchange='direct_logs', routing_key=RoutingKey, body=message)
- print(f" [x] Sent {severity}:{message}")
- connection.close()
复制代码 receiving.py
- #!/usr/bin/env python
- import pika
- import sys
- connection = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
- #这里我们没有给出队列的具体名,而是通过空字符串和exclusive=True参数来生成一个唯一的队列。
- result = channel.queue_declare(queue='', exclusive=True)
- queue_name = result.method.queue #获取唯一队列名
- BindingKey="info"
- channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=BindingKey)
复制代码 topic
topic和direct较为相似,不外它更较为复杂。它使用通配符来绑定特定的队列。
*:可以完全替换一个单词。
#:可以替换零个或多个单词。
如上图所示:
第一个队列将匹配任意三个单词,此中第二个必须为orange。
而第二个队列有两个通配符规则第一个是匹配任意一个以lazy开头的单词组合(或者lazy,因为#也可以为0)。
第二个是可以匹配三个单词组合,但是第三个必须为rabbit。
如果没有一个可以匹配到对应的队列中,那么该消息将被丢弃。
sending.py
- #!/usr/bin/env python
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
- channel.basic_publish(exchange='topic_logs', routing_key="dag.orange.cat", body="hello world")
- print(f" [x] Sent {routing_key}:{message}")
- connection.close()
复制代码 receiving.py
- #!/usr/bin/env python
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
- result = channel.queue_declare('', exclusive=True)
- queue_name = result.method.queue
- channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key="*.orange.*")
- def callback(ch, method, properties, body):
- print(f" [x] {method.routing_key}:{body}")
- channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
- channel.start_consuming()
复制代码 fanout
fanout就很简单了,它会向它知道的全部队列广播发送消息。
sending.py
- #!/usr/bin/env python
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='logs', exchange_type='fanout')
- channel.basic_publish(exchange='logs', routing_key='', body="hello world")
- print(f" [x] Sent {message}")
- connection.close()
复制代码 receiving.py
- #!/usr/bin/env python
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='logs', exchange_type='fanout')
- result = channel.queue_declare(queue='', exclusive=True)
- queue_name = result.method.queue
- # 将队列和exchange进行绑定,多次调用queue_bind即可将多个不同的队列绑定到同一个exchange上,具体策略需根据需求而定
- channel.queue_bind(exchange='logs', queue=queue_name)
- def callback(ch, method, properties, body):
- print(f" [x] {body}")
- channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
- channel.start_consuming()
复制代码 注意在receiving中我们并没有对队列名举行定义,而是使用空字符串和exclusive=True让Rabbitmq来天生唯一的一个队列。这一点我们前面也有提及。
3.2消息确认机制
这一节我们将继承学习在2.2中提高的消息确认机制也就是auto_ack=True参数。
在正式先容前我们先要讨论一种场景,当我们有多个消费者并且这些消费者都从同一个队列中拿取消息,那么这时Rabbitmq就会有一个分配的题目,起首你大概想以轮询的方式每个C取得一个,那么就会引发这样的一个题目,此中一个C在处理某个耗时消息(比如I/O麋集型)的时间,别的一个C不绝能很快的处理完消息,这就会造成一些某个C的“压力”越来越大,而其他C却大部门时间都处于空闲的状态。
别的如果图中第一个C在处理1消息的时间,被意外停止了,那么该消息就会丢失,因为消息一旦从队列中取走就会被”标志“为删除,如果你不希望这样的事发生就需要通过auto_ack=False来开启任务简直认应答机制,这样在任务的意外中断时Rabbitmq就会将该消息分配给别的的C去处理。
上述解决了消息在被消费时意外丢失的题目,但是如果Rabbitmq如果意外中断了那么全部的消息就会全部丢失,为了解决这种题目我们还需要在创建队列的时间设置一个参数,那就是exclusive=True。该参数的意义在于将消息持久化写入磁盘中。
- channel.queue_declare(queue='task_queue', durable=True)
复制代码 除了设置auto_ack=False来解决耗时题目(即负载不均衡)外,Rabbitmq还提供别的两种机制。
- #消息预取(Prefetch):
- #RabbitMQ 提供了 basic.qos 方法,可以用来设置每个消费者在处理完前一个消息之前最多能接收多少个未确认的消息。这可以防止某个消费者一次性接收过多的消息,从而导致负载不均衡。
- channel.basic_qos(prefetch_count=1)
- #通过设置 prefetch_count=1,RabbitMQ 会确保每个消费者在处理完并确认前一个消息之前不会接收新的消息。这有助于更均匀地分配消息,避免某个消费者过载。
复制代码- #手动消息确认(Manual Acknowledgment):
- #使用手动消息确认机制,消费者在处理完消息后显式地向 RabbitMQ 发送确认。这可以确保消息在被成功处理之前不会被RabbitMQ 从队列中删除。
- def callback(ch, method, properties, body):
- # 处理消息
- print(f"Received {body}")
- # 手动确认消息
- ch.basic_ack(delivery_tag=method.delivery_tag)
-
- channel.basic_consume(queue='your_queue', on_message_callback=callback, auto_ack=False)
复制代码 3.3消息优先级
如果你想某个消息应该较早的被C消费,那么就可以通过设置优先级来实现。
优先级范围:
RabbitMQ 支持的优先级范围是 0 到 255,此中 0 是最低优先级,255 是最高优先级。你可以在声明队列时指定最大优先级。
要实现优先级起首需要在创建队列的时间名声该队列为优先级队列,我们通过arguments参数来设置。
sending.py
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- # 声明一个优先级队列,最大优先级为 10
- args = {'x-max-priority': 10}
- channel.queue_declare(queue='priority_queue', arguments=args)
- # 发送消息
- channel.basic_publish(
- exchange='',
- routing_key='priority_queue',
- body='Low priority message',
- properties=pika.BasicProperties(priority=1) # 设置消息优先级为 1
- )
- channel.basic_publish(
- exchange='',
- routing_key='priority_queue',
- body='High priority message',
- properties=pika.BasicProperties(priority=10) # 设置消息优先级为 10
- )
- print("Messages sent")
- connection.close()
复制代码 receiving.py
消费者代码不需要做任何特殊处理,RabbitMQ 会根据消息的优先级自动排序并分发消息。
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- def callback(ch, method, properties, body):
- print(f"Received {body}")
-
- # 消费消息
- channel.basic_consume(queue='priority_queue', on_message_callback=callback, auto_ack=True)
- print('Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
复制代码 3.4死信队列(DLQ)
RabbitMQ 的死信队列(Dead Letter Queue, DLQ)是一个用于处理无法被正常消费的消息的机制。
无法被正常消费的消息有以下几种环境。
1.消息被拒绝(Rejected):
消费者显式地拒绝消息,并且不要求重新入队(requeue)。
2.消息过期(TTL, Time-To-Live):
消息在队列中存活的时间凌驾了设置的 TTL。
3.队列长度限制(Max Length):
队列到达最大长度,新的消息无法入队。
以上几种类型的消息都会被放到私信队列中去。
要使用死信队列,你需要在声明队列时设置一些参数,包罗 x-dead-letter-exchange 和(可选的)x-dead-letter-routing-key。
sending.py
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- # 声明死信交换机
- channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')
- # 声明死信队列
- channel.queue_declare(queue='dlx_queue')
- # 将死信队列绑定到死信交换机
- channel.queue_bind(exchange='dlx_exchange', queue='dlx_queue', routing_key='dlx_routing_key')
- # 声明原始队列,并设置死信交换机和路由键
- args = {
- 'x-dead-letter-exchange': 'dlx_exchange',
- 'x-dead-letter-routing-key': 'dlx_routing_key'
- }
- channel.queue_declare(queue='original_queue', arguments=args)
- # 发送消息到原始队列
- channel.basic_publish(
- exchange='',
- routing_key='original_queue',
- body='Test Message'
- )
- print("Message sent to original queue")
- connection.close()
复制代码 receiving.py
- import pika
- import threading
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- # 消费原始队列中的消息并触发死信机制
- def original_callback(ch, method, properties, body):
- print(f"Received from original queue: {body}")
- # 拒绝消息并不重新入队
- ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
- channel.basic_consume(queue='original_queue', on_message_callback=original_callback, auto_ack=False)
- # 启动一个新的线程来消费死信队列中的消息
- def consume_dlx():
- dlx_connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- dlx_channel = dlx_connection.channel()
- def dlx_callback(ch, method, properties, body):
- print(f"DLX Received: {body}")
- ch.basic_ack(delivery_tag=method.delivery_tag)
- dlx_channel.basic_consume(queue='dlx_queue', on_message_callback=dlx_callback, auto_ack=False)
- dlx_channel.start_consuming()
- dlx_thread = threading.Thread(target=consume_dlx)
- dlx_thread.start()
- # 开始消费原始队列中的消息
- channel.start_consuming()
复制代码 4.参考资料
官网:Rabbitmq官网(需要科学上网)
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |