提示:RabbitMQ五种工作模式及应用(官方是7种)
媒介
RabbitMQ官方文档
一、RabbitMQ安装(Docker)
- 下载镜像:
- 运行容器:
- docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:latest
复制代码 账号admin,暗码123456 用于登录MQ后台
- 打开访问界面
- 进入容器:
- docker exec -it a64bbac6d26f bash
- 执行:
- rabbitmq-plugins enable rabbitmq_management
- 输出:
- Enabling plugins on node rabbit@a64bbac6d26f:
- rabbitmq_management
- The following plugins have been configured:
- rabbitmq_management
- rabbitmq_management_agent
- rabbitmq_prometheus
- rabbitmq_web_dispatch
- Applying plugin configuration to rabbit@a64bbac6d26f...
- The following plugins have been enabled:
- rabbitmq_management
- started 1 plugins
复制代码 - 即可在浏览器中通过 http://127.0.0.1:15672/ 访问,可通过上面设置的账号暗码进行登录。
二、MQ中相关概念
KEYVALUEBroker接收和分发消息的应用Virtual host出于多租户和安全因素计划的,把 AMQP 的根本组件分别到一个虚拟的分组中,雷同于网络中的 namespace 概念。当多个差别的用户使用同一个 RabbitMQ server 提供的服务时,可以分别出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等Connectionpublisher/consumer 和 broker 之间的 TCP 连接Channel假如每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,服从也较低。Channel 是在 connection 内部建立的逻辑连接,假如应用步伐支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包罗了channel id 帮助客户端和message broker 识别 channel,以是 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作体系建立 TCP connection 的开销Exchangemessage 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的范例有:direct (定向,把消息交给符合指定routing key 的队列), topic (通配符,把消息交给符合routing pattern(路由模式) 的队列) and fanout (广播,将消息交给所有绑定到交换机的队列)Queue消息最终被送到这里等待 consumer 取走Bindingexchange 和 queue 之间的虚拟连接,binding 中可以包罗 routing key。Binding 信息被生存到 exchange 中的查询表中,用于 message 的分发依据 每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头
Exchange(交换机)只负责转发消息,不具备存储消息的本领,因此假如没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
三、工作模式
模式特点场景图解简单 (Simple)一个生产者、一个消耗者,不必要设置交换机(使用默认的交换机)单个生产者和单个消耗者(日志记录,订单处理)工作(Work Queue)一个生产者、多个消耗者(竞争关系),不必要设置交换机(使用默认的交换机)必要多个消耗者共同处理任务(异步处理,短信关照)发布/订阅(Publish/Subscribe)必要设置范例为fanout的交换机,而且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列必要消息被多个消耗者同时接收(实时关照,广播)路由(Routing)必要设置范例为direct的交换机,交换机和队列进行绑定,而且指定routingkey,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列必要根据特定条件匹配路由消息(消息分类处理)通配符(Topics)必要设置范例为topic的交换机,交换机和队列进行绑定,而且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列适用于必要根据模式匹配分发消息的场景 四、Java操作MQ
依赖:
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.21.0</version>
- </dependency>
复制代码 工具类:
- package com.rabbitmq.utils;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Connection;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class ConnectionUntils {
- public static Connection getConnection() throws IOException, TimeoutException {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("127.0.0.1");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setUsername("admin");
- factory.setPassword("123456");
- //创建连接
- Connection connection = factory.newConnection();
- return connection;
- }
- }
复制代码- package com.rabbitmq.simple;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.utils.ConnectionUntils;
- /**
- * 简单模式
- * 生产者
- */
- public class Producer {
- static final String QUEUE_NAME = "simple_name";
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUntils.getConnection();
- Channel channel = connection.createChannel();
- /**
- * 声明队列
- * 参数1:队列名称
- * 参数2:是否定义持久化队列
- * 参数3:是否独占本次连接
- * 参数4:是否在不使用的时候自动删除队列
- * 参数5:队列其它参数
- */
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- /**
- * 发送消息
- * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
- * 参数2:路由key,简单模式可以传递队列名称
- * 参数3:消息其它属性
- * 参数4:消息内容
- */
- String message = "Hello World!7";
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
- //释放资源
- channel.close();
- connection.close();
- }
- }
复制代码- package com.rabbitmq.simple;
- import com.rabbitmq.client.*;
- import com.rabbitmq.utils.ConnectionUntils;
- import java.io.IOException;
- /**
- * 消费者
- */
- public class Consumer {
- public static void main(String[] args) throws Exception {
- Connection conection = ConnectionUntils.getConnection();
- //通道
- Channel channel = conection.createChannel();
- /**
- * 声明队列
- * 参数1:队列名称
- * 参数2:是否定义持久化队列
- * 参数3:是否独占本次连接
- * 参数4:是否在不使用的时候自动删除队列
- * 参数5:队列其它参数
- */
- channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
- //接受消息
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 接受消息 处理消息的回调函数
- * @param consumerTag 消费者标签 channel.basicConsume() 指定使用什么标签的消费者消费消息
- * @param envelope 消息包的信息:消息id routingkey exchange
- * @param properties 属性信息
- * @param body 消息
- * @throws IOException
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("路由:" + envelope.getRoutingKey());
- System.out.println("交换机:" + envelope.getExchange());
- System.out.println("消息ID:" + envelope.getDeliveryTag());
- System.out.println("消息" + new String(body, "UTF-8"));
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
- };
- /**
- * 1.队列名称
- * 2.是否确认,true 消息消费后,mq自动回复消息已消费 mq收到回复后删除消息
- * 3.消费者对象,消息消费的回调函数
- */
- channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
- }
- }
复制代码
- 工作模式:在一个队列中假如有多个消耗者,那么消耗者之间对于同一个消息的关系是竞争的关系。
- package com.rabbitmq.work;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.utils.ConnectionUntils;
- /**
- * 工作模式
- * 生产者
- */
- public class Producer {
- static final String QUEUE_NAME = "work_name";
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUntils.getConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- for (int i = 1; i < 30; i++) {
- String message = "Hello World!" + i;
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
- }
- channel.close();
- connection.close();
- }
- }
复制代码- package com.rabbitmq.work;
- import com.rabbitmq.client.*;
- import com.rabbitmq.utils.ConnectionUntils;
- import java.io.IOException;
- /**
- * 消费者1
- */
- public class Consumer1 {
- public static void main(String[] args) throws Exception {
- Connection conection = ConnectionUntils.getConnection();
- Channel channel = conection.createChannel();
- channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("路由:" + envelope.getRoutingKey());
- System.out.println("交换机:" + envelope.getExchange());
- System.out.println("消息ID:" + envelope.getDeliveryTag());
- System.out.println("消息" + new String(body, "UTF-8"));
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
- };
- channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
- }
- }
复制代码- package com.rabbitmq.work;
- import com.rabbitmq.client.*;
- import com.rabbitmq.utils.ConnectionUntils;
- import java.io.IOException;
- /**
- * 消费者2
- */
- public class Consumer2 {
- public static void main(String[] args) throws Exception {
- Connection conection = ConnectionUntils.getConnection();
- Channel channel = conection.createChannel();
- channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
- //接受消息
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("路由:" + envelope.getRoutingKey());
- System.out.println("交换机:" + envelope.getExchange());
- System.out.println("消息ID:" + envelope.getDeliveryTag());
- System.out.println("消息" + new String(body, "UTF-8"));
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
- };
- channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
- }
- }
复制代码- package com.rabbitmq.ps;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.utils.ConnectionUntils;
- /**
- * 发布订阅者模式
- * 生产者
- */
- public class Producer {
- static final String EXCHANGE_NAME = "fanout_exchange";
- static final String QUEUE_NAME1 = "fanout_name_1";
- static final String QUEUE_NAME2 = "fanout_name_2";
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUntils.getConnection();
- Channel channel = connection.createChannel();
- /**
- * 声明交换机
- * 1 交换机名称
- * 2 交换机类型 fanout
- * direct
- * topic
- */
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
- channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
- channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
- //队列绑定交换机
- channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
- channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
- for (int i = 1; i <= 10; i++) {
- String message = "Hello World!" + i;
- channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
- }
- channel.close();
- connection.close();
- }
- }
复制代码- package com.rabbitmq.ps;
- import com.rabbitmq.client.*;
- import com.rabbitmq.utils.ConnectionUntils;
- import java.io.IOException;
- /**
- * 消费者1
- */
- public class Consumer1 {
- public static void main(String[] args) throws Exception {
- Connection conection = ConnectionUntils.getConnection();
- Channel channel = conection.createChannel();
- channel.queueDeclare(Producer.QUEUE_NAME1, true, false, false, null);
- channel.queueDeclare(Producer.QUEUE_NAME2, true, false, false, null);
- channel.queueBind(Producer.QUEUE_NAME1,Producer.EXCHANGE_NAME,"");
- channel.queueBind(Producer.QUEUE_NAME2,Producer.EXCHANGE_NAME,"");
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("路由:" + envelope.getRoutingKey());
- System.out.println("交换机:" + envelope.getExchange());
- System.out.println("消息ID:" + envelope.getDeliveryTag());
- System.out.println("消息" + new String(body, "UTF-8"));
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
- };
- channel.basicConsume(Producer.QUEUE_NAME1, true, consumer);
- }
- }
复制代码- package com.rabbitmq.ps;
- import com.rabbitmq.client.*;
- import com.rabbitmq.utils.ConnectionUntils;
- import java.io.IOException;
- /**
- * 消费者2
- */
- public class Consumer2 {
- public static void main(String[] args) throws Exception {
- Connection conection = ConnectionUntils.getConnection();
- Channel channel = conection.createChannel();
- channel.exchangeDeclare(Producer.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
- channel.queueDeclare(Producer.QUEUE_NAME1, true, false, false, null);
- channel.queueDeclare(Producer.QUEUE_NAME2, true, false, false, null);
- channel.queueBind(Producer.QUEUE_NAME1,Producer.EXCHANGE_NAME,"");
- channel.queueBind(Producer.QUEUE_NAME2,Producer.EXCHANGE_NAME,"");
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("路由:" + envelope.getRoutingKey());
- System.out.println("交换机:" + envelope.getExchange());
- System.out.println("消息ID:" + envelope.getDeliveryTag());
- System.out.println("消息" + new String(body, "UTF-8"));
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
- };
- channel.basicConsume(Producer.QUEUE_NAME2, true, consumer);
- }
- }
复制代码- package com.rabbitmq.routing;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.utils.ConnectionUntils;
- /**
- * 路由模式
- * 生产者
- */
- public class Producer {
- static final String EXCHANGE_NAME = "direct_exchange";
- static final String QUEUE_NAME1 = "direct_name_insert";
- static final String QUEUE_NAME2 = "direct_name_update";
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUntils.getConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
- channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
- channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"insert");
- channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"update");
- String message = "Hello World! - 新增 insert";
- channel.basicPublish(EXCHANGE_NAME,"insert",null,message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
- String message2 = "Hello World! - 修改 update";
- channel.basicPublish(EXCHANGE_NAME,"update",null,message2.getBytes());
- System.out.println(" [x] Sent '" + message2 + "'");
- channel.close();
- connection.close();
- }
- }
复制代码- package com.rabbitmq.routing;
- import com.rabbitmq.client.*;
- import com.rabbitmq.utils.ConnectionUntils;
- import java.io.IOException;
- public class Consumer1 {
- public static void main(String[] args) throws Exception {
- Connection conection = ConnectionUntils.getConnection();
- Channel channel = conection.createChannel();
- channel.queueDeclare(Producer.QUEUE_NAME1, true, false, false, null);
- channel.queueDeclare(Producer.QUEUE_NAME2, true, false, false, null);
- channel.queueBind(Producer.QUEUE_NAME1,Producer.EXCHANGE_NAME,"insert");
- channel.queueBind(Producer.QUEUE_NAME2,Producer.EXCHANGE_NAME,"update");
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("路由:" + envelope.getRoutingKey());
- System.out.println("交换机:" + envelope.getExchange());
- System.out.println("消息ID:" + envelope.getDeliveryTag());
- System.out.println("消息" + new String(body, "UTF-8"));
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
- };
- channel.basicConsume(Producer.QUEUE_NAME1, true, consumer);
- }
- }
复制代码- package com.rabbitmq.routing;
- import com.rabbitmq.client.*;
- import com.rabbitmq.utils.ConnectionUntils;
- import java.io.IOException;
- public class Consumer2 {
- public static void main(String[] args) throws Exception {
- Connection conection = ConnectionUntils.getConnection();
- Channel channel = conection.createChannel();
- channel.queueDeclare(Producer.QUEUE_NAME1, true, false, false, null);
- channel.queueDeclare(Producer.QUEUE_NAME2, true, false, false, null);
- channel.queueBind(Producer.QUEUE_NAME1,Producer.EXCHANGE_NAME,"insert");
- channel.queueBind(Producer.QUEUE_NAME2,Producer.EXCHANGE_NAME,"update");
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("路由:" + envelope.getRoutingKey());
- System.out.println("交换机:" + envelope.getExchange());
- System.out.println("消息ID:" + envelope.getDeliveryTag());
- System.out.println("消息" + new String(body, "UTF-8"));
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
- };
- channel.basicConsume(Producer.QUEUE_NAME2, true, consumer);
- }
- }
复制代码- package com.rabbitmq.topic;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.utils.ConnectionUntils;
- /**
- * 通配符模式
- * 生产者
- */
- public class Producer {
- static final String EXCHANGE_NAME = "topic_exchange";
- static final String QUEUE_NAME1 = "topic_queue_1";
- static final String QUEUE_NAME2 = "topic_queue_2";
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUntils.getConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
- channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
- channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"item.*");
- channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"item.insert");
- channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"item.update");
- String message = "Hello World! - 新增 insert";
- channel.basicPublish(EXCHANGE_NAME,"item.insert",null,message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
- String message2 = "Hello World! - 修改 update";
- channel.basicPublish(EXCHANGE_NAME,"item.update",null,message2.getBytes());
- System.out.println(" [x] Sent '" + message2 + "'");
- String message3 = "Hello World! - 修改 delete";
- channel.basicPublish(EXCHANGE_NAME,"item.delete",null,message3.getBytes());
- System.out.println(" [x] Sent '" + message3 + "'");
- channel.close();
- connection.close();
- }
- }
复制代码- package com.rabbitmq.topic;
- import com.rabbitmq.client.*;
- import com.rabbitmq.utils.ConnectionUntils;
- import java.io.IOException;
- public class Consumer1 {
- public static void main(String[] args) throws Exception {
- Connection conection = ConnectionUntils.getConnection();
- Channel channel = conection.createChannel();
- channel.queueDeclare(Producer.QUEUE_NAME1, true, false, false, null);
- channel.queueDeclare(Producer.QUEUE_NAME2, true, false, false, null);
- channel.queueBind(Producer.QUEUE_NAME1,Producer.EXCHANGE_NAME,"item.*");
- channel.queueBind(Producer.QUEUE_NAME2,Producer.EXCHANGE_NAME,"item.insert");
- channel.queueBind(Producer.QUEUE_NAME2,Producer.EXCHANGE_NAME,"item.update");
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("路由:" + envelope.getRoutingKey());
- System.out.println("交换机:" + envelope.getExchange());
- System.out.println("消息ID:" + envelope.getDeliveryTag());
- System.out.println("消息" + new String(body, "UTF-8"));
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
- };
- channel.basicConsume(Producer.QUEUE_NAME1, true, consumer);
- }
- }
复制代码- package com.rabbitmq.topic;
- import com.rabbitmq.client.*;
- import com.rabbitmq.utils.ConnectionUntils;
- import java.io.IOException;
- public class Consumer2 {
- public static void main(String[] args) throws Exception {
- Connection conection = ConnectionUntils.getConnection();
- Channel channel = conection.createChannel();
- channel.queueDeclare(Producer.QUEUE_NAME1, true, false, false, null);
- channel.queueDeclare(Producer.QUEUE_NAME2, true, false, false, null);
- channel.queueBind(Producer.QUEUE_NAME1,Producer.EXCHANGE_NAME,"item.*");
- channel.queueBind(Producer.QUEUE_NAME2,Producer.EXCHANGE_NAME,"item.insert");
- channel.queueBind(Producer.QUEUE_NAME2,Producer.EXCHANGE_NAME,"item.update");
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("路由:" + envelope.getRoutingKey());
- System.out.println("交换机:" + envelope.getExchange());
- System.out.println("消息ID:" + envelope.getDeliveryTag());
- System.out.println("消息" + new String(body, "UTF-8"));
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
- };
- channel.basicConsume(Producer.QUEUE_NAME2, true, consumer);
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |