什么是消息队列,为什么要利用消息队列
消息队列(Message Queue,简称MQ)是一种在不同应用程序之间的通信的方式。它答应应用程序发送消息到队列中,无需立即处理,而是可以存储起来,直到有其他应用程序准备好去处理它们,这种异步通信的方式可以在比较复杂的业务逻辑中实现各个应用程序之间有条不紊的通信,增加整个体系的稳固性和可维护性。
MQ基本概念
Broker:消息的处理中央,可以明白为一个RabbitMQ服务或是服务集群
Producer:消息生产者,即将消息推送到Broker中的脚色
Consumer:消息消费者,即从 Broker中获取消息的脚色
CentOS安装RabbitMQ
RabbitMQ依赖于Erlang,执行yum -y install erlang安装并查察自己的Erlang版本
如果yum安装不上的话可以去此地址下载对应版本的rpm包进行安装
rabbitmq/erlang - Packages · packagecloud
到此地址查察那个RabbitMQ版本的RPM安装包适配26.2x的Erlang
Releases · rabbitmq/rabbitmq-server
发现4.0.3版本是兼容26.2xErlang环境的
下载第一个RPM安装包并上传到服务器
执行命令安装RPM包
- rpm -ivh rabbitmq-server-4.0.3-1.el8.noarch.rpm
复制代码
设置开机自启并启动RabbitMQ服务
- systemctl enable rabbitmq-server
- systemctl start rabbitmq-server
复制代码 启用RabbitMQ可视化web管理页面
- rabbitmq-plugins enable rabbitmq_management
复制代码 添加RabbitMQ管理端用户dovir,密码为123
- rabbitmqctl add_user dovir 123
- rabbitmqctl set_permissions -p / dovir ".*" ".*" ".*"
- rabbitmqctl set_user_tags dovir administrator
- rabbitmqctl list_users
复制代码 确保防火墙为关闭状态
输入http://服务器ip地址:15672/访问RabbitMQ可视化web管理页面用自己添加的账号登录
可以看到如今没有任何队列Queue
利用Python向消息队列中推送数据
RabbitMQ有多种工作模式,这里仅树模最简单的模式——Hello world模式,即一个producer发送message,另一个consumer吸收message
producer发送message
- import pika
- # 连接到RabbitMQ服务器
- connection = pika.BlockingConnection(
- pika.ConnectionParameters(
- host = '192.168.160.132',
- credentials = pika.PlainCredentials('dovir', '123')
- )
- )
- # 向消息队列推送数据和从中拉取都是通过通道进行的,要先创建通道,用channel去推拉数据
- channel = connection.channel()
- # 声明(创建)一个名为mq_test的消息队列,已经存在则不会重复创建
- channel.queue_declare(queue = 'mq_test')
- # 向消息队列推送数据
- channel.basic_publish(
- # 指定通过那个交换机去推送,为空则使用默认交换机
- exchange = '',
- # 指定数据推送到mq_test队列
- routing_key = 'mq_test',
- # 消息的内容
- body = 'This is a RabbitMQ test action'
- )
- # 推送结束后关闭与RabbitMQ服务器的连接,释放资源
- connection.close()
复制代码 执行后发现有了一个名为mq_test的队列

点击队列,用Get messages查察队列中的内容

利用Python从消息队列中拉取数据
consumer吸收message
- import pika
- # 连接到RabbitMQ服务器
- connection = pika.BlockingConnection(
- pika.ConnectionParameters(
- host = '192.168.160.132',
- credentials = pika.PlainCredentials('dovir', '123')
- )
- )
- # 向消息队列推送数据和从中拉取都是通过通道进行的,要先创建通道,用channel去推拉数据
- channel = connection.channel()
- # 声明(创建)一个名为mq_test的消息队列,已经存在则不会重复创建
- channel.queue_declare(queue = 'mq_test')
- # 定义成功拉取到消息后的回调函数
- def callback(ch, method, properties, body):
- print('成功拉取消息:',body)
- # 拉取消息
- channel.basic_consume(
- # 拉取mq_test消息队列里面的消息
- queue = 'mq_test',
- # 成功拉取到消息后要执行的回调函数
- on_message_callback = callback,
- # 成功接收到
- auto_ack = True
- )
- # 开始循环等待,一直处于等待接收消息的状态
- print('等待消息入队列中......')
- channel.start_consuming()
复制代码 运行后成功拉取到消息
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |