RabbitMQ






















Messaging that just works — RabbitMQ
案例
pom.xml
- <dependencies>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.6.0</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.8.0</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
复制代码 生产者
- package com.www.mq;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 生产者:发送消息
- *
- * @author Www
- * @version 1.8
- * @since 2023/2/15 20:41 星期三
- */
- public class ProducerHelloWorld {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1、创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2、设置参数
- // IP 默认值 localhost
- connectionFactory.setHost("192.168.36.100");
- // 端口 默认 5672
- connectionFactory.setPort(5672);
- // 虚拟机 默认值 /
- connectionFactory.setVirtualHost("/ljt");
- // 用户名 默认值 guest
- connectionFactory.setUsername("ljt");
- // 密码 默认值 guest
- connectionFactory.setPassword("ljt");
-
-
- // 3、创建连接 Connection : 受检异常——> 抛出
- Connection connection = connectionFactory.newConnection();
-
- // 4、创建通道 Channel
- Channel channel = connection.createChannel();
-
- // 5、创建队列 Queue
- /*
- queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
- 参数:
- 1. queue:队列名称
- 2. durable:是否持久化,当mq重启之后,还在
- 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- *
- 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
- 5. arguments:参数。
- */
- // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
- channel.queueDeclare(
- // 队列名称
- "hello_world",
- // 是否持久化,当mq重启之后,还在
- true,
- // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
- false,
- // 是否自动删除。当没有Consumer时,自动删除掉
- false,
- // 参数
- null
- );
-
- /*
- basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
- 参数:
- 1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
- 2. routingKey:路由名称
- 3. props:配置信息
- 4. body:发送消息数据
- */
- // 发送消息内容
- String body = "HelloWorld~~~~~~~~~~";
-
- // 6、发送消息
- channel.basicPublish(
- // 交换机名称。简单模式下交换机会使用默认的 ""
- "",
- // 路由名称
- "hello_world",
- // 配置信息
- null,
- // 发送消息数据
- body.getBytes()
- );
-
- // 释放资源
- channel.close();
- connection.close();
-
- }
- }
复制代码 消费者
- package com.www.mq;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 消费者:获取消息
- *
- * @author Www
- * @version 1.8
- * @since 2023/2/15 20:41 星期三
- */
- public class ConsumerHelloWorld {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1、创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2、设置参数
- // IP 默认值 localhost
- connectionFactory.setHost("192.168.36.100");
- // 端口 默认 5672
- connectionFactory.setPort(5672);
- // 虚拟机 默认值 /
- connectionFactory.setVirtualHost("/ljt");
- // 用户名 默认值 guest
- connectionFactory.setUsername("ljt");
- // 密码 默认值 guest
- connectionFactory.setPassword("ljt");
-
-
- // 3、创建连接 Connection : 受检异常——> 抛出
- Connection connection = connectionFactory.newConnection();
-
- // 4、创建通道 Channel
- Channel channel = connection.createChannel();
-
- // 5、创建队列 Queue
- /*
- queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
- 参数:
- 1. queue:队列名称
- 2. durable:是否持久化,当mq重启之后,还在
- 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- *
- 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
- 5. arguments:参数。
- */
- // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
- channel.queueDeclare(
- // 队列名称
- "hello_world",
- // 是否持久化,当mq重启之后,还在
- true,
- // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
- false,
- // 是否自动删除。当没有Consumer时,自动删除掉
- false,
- // 参数
- null);
-
- /*
- basicConsume(String queue, boolean autoAck, Consumer callback)
- 参数:
- 1. queue:队列名称
- 2. autoAck:是否自动确认
- 3. callback:回调对象
- */
- // 6、接收消息
- Consumer consumer = new DefaultConsumer(channel) {
- /**
- * <p>
- * 回调方法,当收到消息后,会自动执行该方法
- * 1. consumerTag:标识
- * 2. envelope:获取一些信息,交换机,路由key...
- * 3. properties:配置信息
- * 4. body:数据
- * </p>
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("consumerTag:" + consumerTag);
- System.out.println("Exchange:" + envelope.getExchange());
- System.out.println("RoutingKey:" + envelope.getRoutingKey());
- System.out.println("properties:" + properties);
- System.out.println("body:" + new String(body));
- }
- };
- channel.basicConsume(
- // 队列名称
- "hello_world",
- // 是否自动确认
- true,
- // 回调对象
- consumer
- );
-
- // 不需要关闭资源
- }
- }
复制代码
工作队列

生产者
- package com.www.mq;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 生产者:发送消息
- *
- * @author Www
- * @version 1.8
- * @since 2023/2/15 20:41 星期三
- */
- public class ProducerWorkQueues {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1、创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2、设置参数
- // IP 默认值 localhost
- connectionFactory.setHost("192.168.36.100");
- // 端口 默认 5672
- connectionFactory.setPort(5672);
- // 虚拟机 默认值 /
- connectionFactory.setVirtualHost("/ljt");
- // 用户名 默认值 guest
- connectionFactory.setUsername("ljt");
- // 密码 默认值 guest
- connectionFactory.setPassword("ljt");
-
-
- // 3、创建连接 Connection : 受检异常——> 抛出
- Connection connection = connectionFactory.newConnection();
-
- // 4、创建通道 Channel
- Channel channel = connection.createChannel();
-
- // 5、创建队列 Queue
- /*
- queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
- 参数:
- 1. queue:队列名称
- 2. durable:是否持久化,当mq重启之后,还在
- 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- *
- 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
- 5. arguments:参数。
- */
- // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
- channel.queueDeclare(
- // 队列名称
- "WorkQueue",
- // 是否持久化,当mq重启之后,还在
- true,
- // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
- false,
- // 是否自动删除。当没有Consumer时,自动删除掉
- false,
- // 参数
- null
- );
-
- /*
- basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
- 参数:
- 1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
- 2. routingKey:路由名称
- 3. props:配置信息
- 4. body:发送消息数据
- */
- for (int i = 0; i < 10; i++) {
- // 发送消息内容
- String body = "WorkQueue~~~~~~~~~~" + i;
-
- // 6、发送消息
- channel.basicPublish(
- // 交换机名称。简单模式下交换机会使用默认的 ""
- "",
- // 路由名称
- "WorkQueue",
- // 配置信息
- null,
- // 发送消息数据
- body.getBytes()
- );
- }
-
-
- // 释放资源
- channel.close();
- connection.close();
-
- }
- }
复制代码 消费者: 两个
- package com.www.mq;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 消费者:获取消息
- *
- * @author Www
- * @version 1.8
- * @since 2023/2/15 20:41 星期三
- */
- public class ConsumerWorkQueues1 {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1、创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2、设置参数
- // IP 默认值 localhost
- connectionFactory.setHost("192.168.36.100");
- // 端口 默认 5672
- connectionFactory.setPort(5672);
- // 虚拟机 默认值 /
- connectionFactory.setVirtualHost("/ljt");
- // 用户名 默认值 guest
- connectionFactory.setUsername("ljt");
- // 密码 默认值 guest
- connectionFactory.setPassword("ljt");
-
-
- // 3、创建连接 Connection : 受检异常——> 抛出
- Connection connection = connectionFactory.newConnection();
-
- // 4、创建通道 Channel
- Channel channel = connection.createChannel();
-
- // 5、创建队列 Queue
- /*
- queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
- 参数:
- 1. queue:队列名称
- 2. durable:是否持久化,当mq重启之后,还在
- 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- *
- 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
- 5. arguments:参数。
- */
- // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
- channel.queueDeclare(
- // 队列名称
- "WorkQueue",
- // 是否持久化,当mq重启之后,还在
- true,
- // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
- false,
- // 是否自动删除。当没有Consumer时,自动删除掉
- false,
- // 参数
- null);
-
- /*
- basicConsume(String queue, boolean autoAck, Consumer callback)
- 参数:
- 1. queue:队列名称
- 2. autoAck:是否自动确认
- 3. callback:回调对象
- */
- // 6、接收消息
- Consumer consumer = new DefaultConsumer(channel) {
- /**
- * <p>
- * 回调方法,当收到消息后,会自动执行该方法
- * 1. consumerTag:标识
- * 2. envelope:获取一些信息,交换机,路由key...
- * 3. properties:配置信息
- * 4. body:数据
- * </p>
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("consumerTag:" + consumerTag);
- System.out.println("Exchange:" + envelope.getExchange());
- System.out.println("RoutingKey:" + envelope.getRoutingKey());
- System.out.println("properties:" + properties);
- System.out.println("body:" + new String(body));
- }
- };
- channel.basicConsume(
- // 队列名称
- "WorkQueue",
- // 是否自动确认
- true,
- // 回调对象
- consumer
- );
-
- // 不需要关闭资源
- }
- }
复制代码 订阅模式

生产者
- package com.www.mq;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 生产者:发送消息
- *
- * @author Www
- * @version 1.8
- * @since 2023/2/15 20:41 星期三
- */
- public class ProducerPubSub {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1、创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2、设置参数
- // IP 默认值 localhost
- connectionFactory.setHost("192.168.36.100");
- // 端口 默认 5672
- connectionFactory.setPort(5672);
- // 虚拟机 默认值 /
- connectionFactory.setVirtualHost("/ljt");
- // 用户名 默认值 guest
- connectionFactory.setUsername("ljt");
- // 密码 默认值 guest
- connectionFactory.setPassword("ljt");
-
-
- // 3、创建连接 Connection : 受检异常——> 抛出
- Connection connection = connectionFactory.newConnection();
-
- // 4、创建通道 Channel
- Channel channel = connection.createChannel();
-
- // 5、创建交换机
- /*
- exchangeDeclare(
- String exchange,BuiltinExchangeType type,
- boolean durable, boolean autoDelete,
- boolean internal, Map<String, Object> arguments
- )
- 参数:
- 1. exchange:交换机名称
- 2. type:交换机类型
- DIRECT("direct"),:定向
- FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
- TOPIC("topic"),通配符的方式
- HEADERS("headers");参数匹配
- 3. durable:是否持久化
- 4. autoDelete:自动删除
- 5. internal:内部使用。 一般false
- 6. arguments:参数
- */
- String exchangeName = "test_fanout";
- channel.exchangeDeclare(
- // 交换机名称
- exchangeName,
- // type:交换机类型
- // DIRECT("direct"),:定向
- // FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
- // TOPIC("topic"),通配符的方式
- // HEADERS("headers");参数匹配
- BuiltinExchangeType.FANOUT,
- // 是否持久化
- true,
- // 内部使用
- false,
- // 参数
- null
-
- );
-
- // 6、创建队列
- String queue1Name = "test_fanout_queue1";
- String queue2Name = "test_fanout_queue2";
- channel.queueDeclare(
- // 队列名
- queue1Name,
- // 是否持久
- true,
- // 是否独占
- false,
- //是否自动删除
- false,
- // 参数
- null
- );
- channel.queueDeclare(
- // 队列名
- queue2Name,
- // 是否持久
- true,
- // 是否独占
- false,
- //是否自动删除
- false,
- // 参数
- null
- );
-
- // 7、绑定队列和交换机
- /*
- queueBind(String queue, String exchange, String routingKey)
- 参数:
- 1. queue:队列名称
- 2. exchange:交换机名称
- 3. routingKey:路由键,绑定规则
- 如果交换机的类型为fanout ,routingKey设置为""
- */
- channel.queueBind(
- // 队列名
- queue1Name,
- // 交换机名
- exchangeName,
- // routingKey:路由键,绑定规则
- // 如果交换机的类型为fanout ,routingKey设置为""
- ""
- );
- channel.queueBind(
- // 队列名
- queue2Name,
- // 交换机名
- exchangeName,
- // routingKey:路由键,绑定规则
- // 如果交换机的类型为fanout ,routingKey设置为""
- ""
- );
-
- /*
- basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
- 参数:
- 1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
- 2. routingKey:路由名称
- 3. props:配置信息
- 4. body:发送消息数据
- */
- // 发送消息内容
- String body = "日志信息:张三调用了findAll方法...日志级别:info...";
- // 8、发送消息
- channel.basicPublish(
- // 交换机名称。简单模式下交换机会使用默认的 ""
- exchangeName,
- // 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
- "",
- // 配置信息
- null,
- // 发送消息数据
- body.getBytes()
- );
-
-
- // 9、释放资源
- channel.close();
- connection.close();
-
- }
- }
复制代码 消费者1
- package com.www.mq;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 消费者:获取消息
- *
- * @author Www
- * @version 1.8
- * @since 2023/2/15 20:41 星期三
- */
- public class ConsumerPubSub1 {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1、创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2、设置参数
- // IP 默认值 localhost
- connectionFactory.setHost("192.168.36.100");
- // 端口 默认 5672
- connectionFactory.setPort(5672);
- // 虚拟机 默认值 /
- connectionFactory.setVirtualHost("/ljt");
- // 用户名 默认值 guest
- connectionFactory.setUsername("ljt");
- // 密码 默认值 guest
- connectionFactory.setPassword("ljt");
-
-
- // 3、创建连接 Connection : 受检异常——> 抛出
- Connection connection = connectionFactory.newConnection();
-
- // 4、创建通道 Channel
- Channel channel = connection.createChannel();
-
- // 队列名
- String queue1Name = "test_fanout_queue1";
-
-
- /*
- basicConsume(String queue, boolean autoAck, Consumer callback)
- 参数:
- 1. queue:队列名称
- 2. autoAck:是否自动确认
- 3. callback:回调对象
- */
- // 5、接收消息
- Consumer consumer = new DefaultConsumer(channel) {
- /**
- * <p>
- * 回调方法,当收到消息后,会自动执行该方法
- * 1. consumerTag:标识
- * 2. envelope:获取一些信息,交换机,路由key...
- * 3. properties:配置信息
- * 4. body:数据
- * </p>
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("consumerTag:" + consumerTag);
- System.out.println("Exchange:" + envelope.getExchange());
- System.out.println("RoutingKey:" + envelope.getRoutingKey());
- System.out.println("properties:" + properties);
- System.out.println("body:" + new String(body));
- }
- };
- channel.basicConsume(
- // 队列名称
- queue1Name,
- // 是否自动确认
- true,
- // 回调对象
- consumer
- );
-
- // 不需要关闭资源
- }
- }
复制代码 消费者2
- package com.www.mq;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 消费者:获取消息
- *
- * @author Www
- * @version 1.8
- * @since 2023/2/15 20:41 星期三
- */
- public class ConsumerPubSub2 {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1、创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2、设置参数
- // IP 默认值 localhost
- connectionFactory.setHost("192.168.36.100");
- // 端口 默认 5672
- connectionFactory.setPort(5672);
- // 虚拟机 默认值 /
- connectionFactory.setVirtualHost("/ljt");
- // 用户名 默认值 guest
- connectionFactory.setUsername("ljt");
- // 密码 默认值 guest
- connectionFactory.setPassword("ljt");
-
-
- // 3、创建连接 Connection : 受检异常——> 抛出
- Connection connection = connectionFactory.newConnection();
-
- // 4、创建通道 Channel
- Channel channel = connection.createChannel();
-
- // 队列名
- String queue2Name = "test_fanout_queue2";
-
-
- /*
- basicConsume(String queue, boolean autoAck, Consumer callback)
- 参数:
- 1. queue:队列名称
- 2. autoAck:是否自动确认
- 3. callback:回调对象
- */
- // 5、接收消息
- Consumer consumer = new DefaultConsumer(channel) {
- /**
- * <p>
- * 回调方法,当收到消息后,会自动执行该方法
- * 1. consumerTag:标识
- * 2. envelope:获取一些信息,交换机,路由key...
- * 3. properties:配置信息
- * 4. body:数据
- * </p>
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("consumerTag:" + consumerTag);
- System.out.println("Exchange:" + envelope.getExchange());
- System.out.println("RoutingKey:" + envelope.getRoutingKey());
- System.out.println("properties:" + properties);
- System.out.println("body:" + new String(body));
- }
- };
- channel.basicConsume(
- // 队列名称
- queue2Name,
- // 是否自动确认
- true,
- // 回调对象
- consumer
- );
-
- // 不需要关闭资源
- }
- }
复制代码 Routing 路由模式


生产者
- package com.www.mq;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * Routing 工作模式
- * <p>
- * 生产者:发送消息
- *
- * @author Www
- * @version 1.8
- * @since 2023/2/15 20:41 星期三
- */
- public class ProducerRouting {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1、创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2、设置参数
- // IP 默认值 localhost
- connectionFactory.setHost("192.168.36.100");
- // 端口 默认 5672
- connectionFactory.setPort(5672);
- // 虚拟机 默认值 /
- connectionFactory.setVirtualHost("/ljt");
- // 用户名 默认值 guest
- connectionFactory.setUsername("ljt");
- // 密码 默认值 guest
- connectionFactory.setPassword("ljt");
-
-
- // 3、创建连接 Connection : 受检异常——> 抛出
- Connection connection = connectionFactory.newConnection();
-
- // 4、创建通道 Channel
- Channel channel = connection.createChannel();
-
- // 5、创建交换机
- /*
- exchangeDeclare(
- String exchange,BuiltinExchangeType type,
- boolean durable, boolean autoDelete,
- boolean internal, Map<String, Object> arguments
- )
- 参数:
- 1. exchange:交换机名称
- 2. type:交换机类型
- DIRECT("direct"),:定向
- FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
- TOPIC("topic"),通配符的方式
- HEADERS("headers");参数匹配
- 3. durable:是否持久化
- 4. autoDelete:自动删除
- 5. internal:内部使用。 一般false
- 6. arguments:参数
- */
- String exchangeName = "test_direct";
- channel.exchangeDeclare(
- // 交换机名称
- exchangeName,
- // type:交换机类型
- // DIRECT("direct"),:定向
- // FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
- // TOPIC("topic"),通配符的方式
- // HEADERS("headers");参数匹配
- BuiltinExchangeType.DIRECT,
- // 是否持久化
- true,
- // 内部使用
- false,
- // 参数
- null
-
- );
-
- // 6、创建队列
- String queue1Name = "test_direct_queue1";
- String queue2Name = "test_direct_queue2";
- channel.queueDeclare(
- // 队列名
- queue1Name,
- // 是否持久
- true,
- // 是否独占
- false,
- //是否自动删除
- false,
- // 参数
- null
- );
- channel.queueDeclare(
- // 队列名
- queue2Name,
- // 是否持久
- true,
- // 是否独占
- false,
- //是否自动删除
- false,
- // 参数
- null
- );
-
- // 7、绑定队列和交换机
- /*
- queueBind(String queue, String exchange, String routingKey)
- 参数:
- 1. queue:队列名称
- 2. exchange:交换机名称
- 3. routingKey:路由键,绑定规则
- 如果交换机的类型为fanout ,routingKey设置为""
- */
- // 队列1
- channel.queueBind(
- // 队列名
- queue1Name,
- // 交换机名
- exchangeName,
- // routingKey:路由键,绑定规则
- // 如果交换机的类型为fanout ,routingKey设置为""
- "error"
- );
- // 队列2
- channel.queueBind(
- // 队列名
- queue2Name,
- // 交换机名
- exchangeName,
- // routingKey:路由键,绑定规则
- // 如果交换机的类型为fanout ,routingKey设置为""
- "info"
- );
- channel.queueBind(
- // 队列名
- queue2Name,
- // 交换机名
- exchangeName,
- // routingKey:路由键,绑定规则
- // 如果交换机的类型为fanout ,routingKey设置为""
- "error"
- );
- channel.queueBind(
- // 队列名
- queue2Name,
- // 交换机名
- exchangeName,
- // routingKey:路由键,绑定规则
- // 如果交换机的类型为fanout ,routingKey设置为""
- "waring"
- );
-
- /*
- basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
- 参数:
- 1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
- 2. routingKey:路由名称
- 3. props:配置信息
- 4. body:发送消息数据
- */
- // 发送消息内容
- String body = "日志信息:张三调用了delete方法...警告。。。日志级别:waring...";
- // 8、发送消息
- channel.basicPublish(
- // 交换机名称。简单模式下交换机会使用默认的 ""
- exchangeName,
- // 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
- "waring",
- // 配置信息
- null,
- // 发送消息数据
- body.getBytes()
- );
-
-
- // 9、释放资源
- channel.close();
- connection.close();
-
- }
- }
复制代码 消费者 1
- package com.www.mq;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 消费者:获取消息
- *
- * @author Www
- * @version 1.8
- * @since 2023/2/15 20:41 星期三
- */
- public class ConsumerRouting1 {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1、创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2、设置参数
- // IP 默认值 localhost
- connectionFactory.setHost("192.168.36.100");
- // 端口 默认 5672
- connectionFactory.setPort(5672);
- // 虚拟机 默认值 /
- connectionFactory.setVirtualHost("/ljt");
- // 用户名 默认值 guest
- connectionFactory.setUsername("ljt");
- // 密码 默认值 guest
- connectionFactory.setPassword("ljt");
-
-
- // 3、创建连接 Connection : 受检异常——> 抛出
- Connection connection = connectionFactory.newConnection();
-
- // 4、创建通道 Channel
- Channel channel = connection.createChannel();
-
- // 队列名
- String queue2Name = "test_direct_queue1";
- /*
- basicConsume(String queue, boolean autoAck, Consumer callback)
- 参数:
- 1. queue:队列名称
- 2. autoAck:是否自动确认
- 3. callback:回调对象
- */
- // 5、接收消息
- Consumer consumer = new DefaultConsumer(channel) {
- /**
- * <p>
- * 回调方法,当收到消息后,会自动执行该方法
- * 1. consumerTag:标识
- * 2. envelope:获取一些信息,交换机,路由key...
- * 3. properties:配置信息
- * 4. body:数据
- * </p>
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("consumerTag:" + consumerTag);
- System.out.println("Exchange:" + envelope.getExchange());
- System.out.println("RoutingKey:" + envelope.getRoutingKey());
- System.out.println("properties:" + properties);
- System.out.println("body:" + new String(body));
- }
- };
- channel.basicConsume(
- // 队列名称
- queue2Name,
- // 是否自动确认
- true,
- // 回调对象
- consumer
- );
-
- // 不需要关闭资源
- }
- }
复制代码 消费者2
- package com.www.mq;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 消费者:获取消息
- *
- * @author Www
- * @version 1.8
- * @since 2023/2/15 20:41 星期三
- */
- public class ConsumerRouting2 {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1、创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2、设置参数
- // IP 默认值 localhost
- connectionFactory.setHost("192.168.36.100");
- // 端口 默认 5672
- connectionFactory.setPort(5672);
- // 虚拟机 默认值 /
- connectionFactory.setVirtualHost("/ljt");
- // 用户名 默认值 guest
- connectionFactory.setUsername("ljt");
- // 密码 默认值 guest
- connectionFactory.setPassword("ljt");
-
-
- // 3、创建连接 Connection : 受检异常——> 抛出
- Connection connection = connectionFactory.newConnection();
-
- // 4、创建通道 Channel
- Channel channel = connection.createChannel();
-
- // 队列名
- String queue2Name = "test_direct_queue2";
- /*
- basicConsume(String queue, boolean autoAck, Consumer callback)
- 参数:
- 1. queue:队列名称
- 2. autoAck:是否自动确认
- 3. callback:回调对象
- */
- // 5、接收消息
- Consumer consumer = new DefaultConsumer(channel) {
- /**
- * <p>
- * 回调方法,当收到消息后,会自动执行该方法
- * 1. consumerTag:标识
- * 2. envelope:获取一些信息,交换机,路由key...
- * 3. properties:配置信息
- * 4. body:数据
- * </p>
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("consumerTag:" + consumerTag);
- System.out.println("Exchange:" + envelope.getExchange());
- System.out.println("RoutingKey:" + envelope.getRoutingKey());
- System.out.println("properties:" + properties);
- System.out.println("body:" + new String(body));
- }
- };
- channel.basicConsume(
- // 队列名称
- queue2Name,
- // 是否自动确认
- true,
- // 回调对象
- consumer
- );
-
- // 不需要关闭资源
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |