实践环境
JDK 1.8.0_121
amqp-client 5.16.0
附:查看差别版本的amqp-client客户端支持的Java JDK版本
https://www.rabbitmq.com/client-libraries/java-versions
mavn settings.xml
- <?xml version="1.0" encoding="UTF-8" ?>
- <settings xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.1.0 http://maven.apache.org/xsd/settings-1.1.0.xsd" xmlns="http://maven.apache.org/SETTINGS/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <localRepository>D:\maven-repo</localRepository>
- <mirrors>
- <mirror>
- <id>aliyunmaven</id>
- <name>阿里云公共仓库</name>
- <url>https://maven.aliyun.com/repository/public</url>
- <mirrorOf>*</mirrorOf>
- </mirror>
- </mirrors>
- <profiles>
- <profile>
- <repositories>
- </repositories>
- <pluginRepositories>
- </pluginRepositories>
- <id>artifactory</id>
- </profile>
- <profile>
- <id>jdk-1.8</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- <jdk>1.8</jdk>
- </activation>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
- <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
- </properties>
- </profile>
- </profiles>
- <activeProfiles>
- <activeProfile>artifactory</activeProfile>
- </activeProfiles>
- </settings>
复制代码 pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>ore.example</groupId>
- <artifactId>rabbitMQStudy</artifactId>
- <version>1.0-SNAPSHOT</version>
- <properties>
- <maven.compiler.source>3.5.4</maven.compiler.source>
- <maven.compiler.target>3.5.4</maven.compiler.target>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <amqp.client.version>5.16.0</amqp.client.version>
- <slf4j.api.version>1.7.36</slf4j.api.version>
- </properties>
- <dependencies>
-
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>${amqp.client.version}</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.api.version}</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- <version>${slf4j.api.version}</version>
- </dependency>
- </dependencies>
- </project>
复制代码 Hello World
场景:生产者 -> hello队列 -> 消费者
Sent.java- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Channel;
- import java.nio.charset.StandardCharsets;
- public class Sent {
- private final static String QUEUE_NAME = "hello";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.88.128"); // 设置RabbitMQ服务器
- factory.setPort(5672); // 默认端口为 5672
- factory.setUsername("testacc"); // 设置连接登录用户
- factory.setPassword("test1234"); // 设置用户访问密码
- factory.setAutomaticRecoveryEnabled(true); // 开启Connection自动恢复功能,这意味着如果连接丢失,客户端将尝试重新连接到 RabbitMQ 服务器。
- factory.setNetworkRecoveryInterval(5000); // 尝试重连时间间隔 // 设置为 5000:如果RabbitMQ客户端失去连接后,每5秒自动尝试重连一次
- factory.setVirtualHost("/"); // 设置虚拟主机,默认 /
- factory.setConnectionTimeout(30 * 1000); // 设置TCP连接超时时间 默认 60000(60秒)
- factory.setHandshakeTimeout(30 * 1000); // 设置SSL握手超时时间 默认 10000(10秒)
- factory.setShutdownTimeout(0); // 设置客户端关闭前等待操作完成的最大时间 默认 10000(10秒)
- // 因为Connection和Channel都实现了java.lang.AutoCloseable,使用try-with-resources语句,可以在代码中显示关闭连接和信道
- try (Connection connection = factory.newConnection(); // 创建连接
- Channel channel = connection.createChannel()) { // 创建信道
- channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明一个队列(等幂操作),如果队列不存在,自动创建,Routing Key: hello 交换机:(AMQP default)
- String message = "Hello World!";
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
- }
复制代码 说明:
basicPublish函数说明- void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;
- void basicPublish(String var1, String var2, boolean var3, BasicProperties var4, byte[] var5) throws IOException;
- void basicPublish(String var1, String var2, boolean var3, boolean var4, BasicProperties var5, byte[] var6) throws IOException;
复制代码
- var1 : 指定要发送的交换机的名称, 假如设置为空字符串, 那么消息会被发送到RabbitMQ的默认交换机.
- var2 : 路由键, 用于指定消息要路由到的队列
- var3:是否强制路由,假如设置为true,并且消息无法路由到任何队列(没有匹配的绑定),那么RabbitMQ会返回一个错误给生产者。假如设置为false,则消息将被丢弃
- var4:是否立即发布(immediate flag)。假如设置为true,并且消息无法路由到任何消费者(没有匹配的队列或消费者不在线),那么RabbitMQ会返回一个错误给生产者。假如设置为false,消息将被存储在队列中等候消费者。
- BasicProperties 可以使用PERSISTENT_TEXT_PLAIN表示发送的是需要持久化的消息,其实也就是将BasicProperties中的deliveryMode设置为2
- props : 消息的属性, 这是一个可选参数, 里面有: 消息类型, 格式, 优先级, 过期时间等等
- body : 消息体, 也就是要发送的消息本身
- var5 : 消息属性,同props
- var6:消息体,同body。
queueDeclare函数说明- com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException;
- com.rabbitmq.client.AMQP.Queue.DeclareOk (String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
复制代码 说明:
- 当调用第一个不带参数的queueDeclare()时,RabbitMQ 会自动创建一个新的队列,该队列的名称将由 RabbitMQ 自动生成,并且这个队列黑白持久的、排他的、自动删除的,且不带任何额外的参数。
由于没有指定队列名称,你通常无法预先知道队列的确切名称,这可能会在某些场景下造成不便,比如当你需要多个消费者共享同一个队列时。此外,由于队列黑白持久的,假如 RabbitMQ 服务器重启,这个队列将会丢失,所有在队列中的消息也会丢失。
该方法适用于那些不需要复杂队列设置的场景,比如暂时测试或简单应用,可能不适用于需要持久化存储或明确指定队列名称的场景。
- 第二个方法答应更细致地设置队列的属性,参数说明如下:
- var1:队列的名称,不能为空,且要求在 RabbitMQ 服务器上是唯一的。
- var2:是否持久化队列。true -- 持久化,即 RabbitMQ 服务器重启后依然存在。false,非持久化的,服务器重启后队列将不存在。
- var3:是否排他。true--是,队列只能被声明它的连接使用,并且当连接关闭时,队列会被自动删除。这通常用于暂时队列。false -- 否
- var4:是否自动删除。true,当最后一个消费者断开连接后,队列会自动删除。假如设置为 false,则不会自动删除队列。
- var5:一组额外的队列参数,可以用来设置队列的更多高级特性。例如,队列的最大长度、消息生存时间等。
该方法适用于那些需要复杂队列设置和高级特性的场景。
- 当调用第二个方法时,RabbitMQ会检查是否已经存在具有类似名称的队列,假如假如队列不存在,则根据提供的参数创建一个新的队列。假如已存在,则不再创建
basicConsume 函数说明
basicConsume 有20个重载函数,这里就不一一列出了,常用方法如下:- String basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4) throws IOException;
复制代码 说明:
- var1:消费者要从中接收消息的队列名称
- var2:设置是否自动确认消息。
true 自动确认消息--一旦消息被交付给消费者,RabbitMQ 会自动将其标志为已确认,消息就从队列中移除,纵然消费者还没有实际处理完这条消息。这种模式下,假如消费者在处理消息时瓦解或发生错误,那么这条消息就会丢失,因为 RabbitMQ 以为它已经被成功处理了。
false 不启动确认消息。消费者需要显式地调用 basicAck 方法来确认消息已被成功处理。这样,假如消费者在处理消息时瓦解,RabbitMQ 会重新将这条消息放回队列中,等候其他消费者处理,从而保证了消息的可靠性。
- var3:一个回调函数,当 RabbitMQ 向消费者发送消息,切消费被消费者成功消息后,会自动调用这个回调。开发者可以在该回调函数中处理接收到的消息,比如打印消息内容大概进行其他业务逻辑。
- var4:可选的回调函数,当消费者取消订阅时会自动调用这个回调。这个回调可以用于实行清理工作,比如释放资源、记录日记等。
Recv.java- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.DeliverCallback;
- import java.nio.charset.StandardCharsets;
- public class Recv {
- private final static String QUEUE_NAME = "hello";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.88.128"); // 设置RabbitMQ服务器IP
- factory.setUsername("testacc"); // 设置连接登录用户
- factory.setPassword("test1234"); // 设置用户访问密码
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel(); // 创建信道
- channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明需要消费的队列,如果队列不存在则创建
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> { // 定义一个回调函数用于缓冲服务器推送的消息
- String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
- System.out.println(" [x] Received '" + message + "'");
- };
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); // 启动一个消费者,并返回服务端生成的消费者标识
- }
- }
复制代码 说明:这里为啥不用类似 生产者代码中的try-with-resources语句,因为这里想让消费者持续异步监听队列消息,而不是消费完一条消息后马上退出。
运行测试
先运行Recv,控制台输出:- [*] Waiting for messages. To exit press CTRL+C
复制代码 再运行Sent,控制台输出:运行Recv的控制台输出:- [x] Received 'Hello World!'
复制代码 此外,运行Recv后, 查看RabbitMQ管理界面,可以看到Channels Tab页新增显示一条信道,Connections Tab页新增显示一条连接,Queues界面新增一个名为hello的队列
参考链接:
https://www.rabbitmq.com/tutorials/tutorial-one-java
工作队列(使命队列)
在本节中,将创建一个工作队列,用于在多个woker之间分配耗时的使命。
工作队列(又名:使命队列)背后的主要头脑是避免立即实行资源密集型使命,并等候其完成。而是把使命安排在以后完成。将使命封装为消息并将其发送到队列。在背景运行的工作进程将pop出使命并最终实行作业。当你运行多个worker时,使命将在它们之间共享。
这个概念在web应用程序中特别有用,因为在短HTTP哀求窗口内无法处理复杂的使命。
场景--轮询(round-robin):
说明:P 代表生产者,Queue为队列, C 代表 消费者
NewTask.java- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.MessageProperties;
- public class NewTask {
- private final static String TASK_QUEUE_NAME = "task_queue";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.88.128");
- factory.setPort(5672);
- factory.setUsername("testacc");
- factory.setPassword("test1234");
- factory.setShutdownTimeout(0);
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- String[] msgs = {"First message...", "Second message", "Third message...", "Fourth message", "Fifth message..."};
- for (int i = 0; i < msgs.length; i++) {
- channel.basicPublish("", TASK_QUEUE_NAME, // 第一个参数代表交换机名称,设置为空,表示使用默认交换机
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- msgs[i].getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + msgs[i] + "'");
- }
- }
- }
- }
复制代码 Worker.java- import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.88.128"); factory.setPort(5672); factory.setUsername("testacc"); factory.setPassword("test1234"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 设置第2个参数为True,设置队列为持久化队列 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 将第二个参数设置为false,即不自动应答,保证消息处理的可靠性 channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { /*模拟实行使命耗时*/ for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(2000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } }}
复制代码 先运行Worker,开启两个Worker进程,然后运行NewTask 五次,查看控制台输出
NewTask运行输出- [x] Sent 'First message...'
- [x] Sent 'Second message'
- [x] Sent 'Third message...'
- [x] Sent 'Fourth message'
- [x] Sent 'Fifth message...'
复制代码 第一Worker输出- [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message...' [x] Done [x] Received 'Third message...' [x] Done [x] Received 'Fifth message...' [x] Done
复制代码 第二个Worker输出- [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message' [x] Done [x] Received 'Fourth message' [x] Done
复制代码 说明:
默认环境下,RabbitMQ将按次序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到类似数量的消息。这种分发消息的方式称为轮询。
场景-公中分发(Fair dispatch):
你可能已经注意到,分发仍然没有完全按照我们的要求工作。例如,在有两个worker的环境下,当所有奇数消息都很重(消息处理比较耗时),偶数消息都很轻时(消息处理比较简单,不怎么耗时),一个worker会一直很忙,另一个几乎不做任何工作。好吧,RabbitMQ对此一无所知,仍然会匀称地发送消息。
这是因为RabbitMQ只是在消息进入队列时分发消息。它不思量消费者未确认的消息数量。它只是盲目地将每第n条消息分派给第n个消费者
为了克服这一点,可使用带参数prefetchCount = 1的basicQos方法。这告诉RabbitMQ一次不要给一个worker发送多条消息。大概,换句话说,在处理完并确认前一条消息之前,不要向worker发送新消息。取而代之,将消息发送给下一个不忙的worker。- int prefetchCount = 1;
- channel.basicQos(prefetchCount);
复制代码注意队列大小
假如所有的worker都很忙,队列可能会排满。需要密切关注这一点,也许可以增加更多的worker,大概采取其他策略。
- import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.88.128"); factory.setPort(5672); factory.setUsername("testacc"); factory.setPassword("test1234"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 设置第2个参数为True,设置队列为持久化队列 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 将第二个参数设置为false,即不自动应答,保证消息处理的可靠性 channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { /*模拟实行使命耗时*/ for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(2000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } }}
复制代码 先运行Worker,开启两个Worker进程,然后运行NewTask,查看控制台输出
第一Worker输出- [x] Received 'First message...'
- [x] Done
- [x] Received 'Fourth message'
- [x] Done
复制代码 第二个Worker输出- [x] Received 'Second message'
- [x] Done
- [x] Received 'Third message...'
- [x] Done
- [x] Received 'Fifth message...'
- [x] Done
复制代码 参考链接:
https://www.rabbitmq.com/tutorials/tutorial-two-java
发布和订阅
上节示例中,我们创建了一个工作队列。工作队列背后的假设是,每个使命只传递给一个工作者。本例将实现向多个消费者传递一个信息。这种模式被称为“发布/订阅”。
场景:
说明:P 代表生产者,X 代表 交换机,Q 代表队列
EmitLog.java- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class EmitLog {
- private static final String EXCHANGE_NAME = "logs";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.88.128");
- factory.setPort(5672);
- factory.setUsername("testacc");
- factory.setPassword("test1234");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 声明名为 logs,类型为 fanout交换机
- String message = argv.length < 1 ? "info: Hello World!" :
- String.join(" ", argv);
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
- }
复制代码 ReceiveLogs.java- import com.rabbitmq.client.*;public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.88.128"); factory.setPort(5672); factory.setUsername("testacc"); factory.setPassword("test1234"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 假设需求是:1,无论何时,消费者连接到RabbitMQ,都需要一个新的空的队列 // 2,端开消费者时在,自动删除队列 String queueName = channel.queueDeclare().getQueue(); // channel.queueDeclare() 定义一个非持久,排他的,自动删除的,名称随机生成且保持唯一的队列 channel.queueBind(queueName, EXCHANGE_NAME, ""); // 绑定队列和交换机,以告知交换机需要发送消息到哪个队列 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}
复制代码 先运行 ReceiveLogs(开两个进程),再运行 EmitLog(不带参数运行)
EmitLog 运行输出:- [x] Sent 'info: Hello World!'
复制代码 两ReceiveLogs运行控制台输出:- [*] Waiting for messages. To exit press CTRL+C [x] Received 'info: Hello World!'
复制代码 参考链接:
https://www.rabbitmq.com/tutorials/tutorial-three-java
路由
上节示例中,实现了多个消费者订阅所有队列消息,本节示例中,将实现仅订阅消息子集,即订阅部门消息。
直接交换机(Direct exchange)
The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
上一节示例中实现了将所有消息广播给所有消费者,本节希望在此基础上,以答应根据消息的严峻性对其进行过滤,实现差别消费者接收差别级别的日记
上节使用的扇出交换机(fanout),没有太多的机动性——它只能进行无意识的广播。所以,本节示例中将使用直接交换机(direct)。直接交换机背后的路由算法很简单——队列的绑定键和消息的路由键完全匹配,则将消息进入到该队列。
为了说明这一点,假设有以下设置:
这里,我们可以看到直接交换机X绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定键black,另一个绑定键 green。
在这种设置下,使用orange 路由键发布到交换机的消息将被路由到队列Q1。使用路由键为black或green的发布的消息件将路由到Q2。所有其他消息都将被丢弃。
多个绑定
使用类似的绑定键绑定多个队列是完全合法的。以下示例中,使用绑定键black在X和Q1之间添加绑定。在这种环境下,direct交换机将表现得像fanout交换机,将消息广播到所有匹配的队列。拥有路由键为black的消息将同时发送到Q1和Q2。
本节示例中实现的场景:
EmitLogDirect.java- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class EmitLogDirect {
- private static final String EXCHANGE_NAME = "direct_logs";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.88.128");
- factory.setPort(5672);
- factory.setUsername("testacc");
- factory.setPassword("test1234");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- String severity = getSeverity(argv);
- String message = getMessage(argv);
- channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
- }
- }
- private static String getSeverity(String[] strings) {
- if (strings.length < 1)
- return "info";
- return strings[0];
- }
- private static String getMessage(String[] strings) {
- if (strings.length < 2)
- return "Hello World!";
- return joinStrings(strings, " ", 1);
- }
- private static String joinStrings(String[] strings, String delimiter, int startIndex) {
- int length = strings.length;
- if (length == 0) return "";
- if (length <= startIndex) return "";
- StringBuilder words = new StringBuilder(strings[startIndex]);
- for (int i = startIndex + 1; i < length; i++) {
- words.append(delimiter).append(strings[i]);
- }
- return words.toString();
- }
- }
复制代码 函数说明:- import com.rabbitmq.client.*;
- public class ReceiveLogsDirect {
- private static final String EXCHANGE_NAME = "direct_logs";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.88.128");
- factory.setPort(5672);
- factory.setUsername("testacc");
- factory.setPassword("test1234");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- String queueName = channel.queueDeclare().getQueue();
- if (argv.length < 1) {
- System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
- System.exit(1);
- }
- for (String severity : argv) {
- channel.queueBind(queueName, EXCHANGE_NAME, severity); // 第三个参数为绑定键
- }
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
- };
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
- });
- }
- }
复制代码 说明:
- var1:队列名称
- var2: 交换机名称
- var3:用于绑定交换机和队列的路由键,为了同basic_publish routingKey参数混淆,称之为 绑定建(bindingKey)
- var4:一些额外参数
先运行ReceiveLogsDirect(开两个进程,分别携带info warning error,warning error参数运行),再运行EmitLogDirect,查看控制台输出。
第一次运行EmitLogDirect时携带以下参数- com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String var1, String var2, String var3) throws IOException;
- com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String var1, String var2, String var3, Map<String, Object> var4) throws IOException;
复制代码 运行ReceiveLogsDirect的两个控制台都输出以下内容- [*] Waiting for messages. To exit press CTRL+C [x] Received 'error':'Run. Run. Or it will explode.'
复制代码 第2次,去掉运行参数,直接运行运行EmitLogDirect,结果仅带info warning error参数运行ReceiveLogsDirect的控制台增加输出以下内容:- [*] Waiting for messages. To exit press CTRL+C
- [x] Received 'error':'Run. Run. Or it will explode.'
复制代码 参考链接:
https://www.rabbitmq.com/tutorials/tutorial-four-java
主题
上节示例中,采用了direct交换机,实现了选择性接收消息,虽然有所改进,单仍然有局限性,不能基于多个标准进行路由,比如纪要根据日记严峻级别来订阅日记,同时还要根据日记消息产生源订阅日记,为此还需要了解更复杂的主题交换机。
主题交换机(Topic exchange)
发送到主题交换机的消息不能是任意的路由键——它必须是一个由点分隔的单词列表。单词可以是任意的,通常是与消息相关的一些特征。几个有用的路由键示例:"stock.usd.nyse"、“nyse.vmw”、“quick.ornge.rabbit”。如你喜欢,路由键可以包含任意多个单词,但是最多不能超过255个字节。
绑定键也必须采用类似的情势。主题交换机背后的逻辑类似于直接交换机——使用特定路由键发送的消息将被传递到使用匹配绑定键绑定的所有队列。但是,对绑定键来说,有两个重要的特殊环境:
- * 可以匹配一个单词。
- # 开匹配0个或更多个单次。
用一个例子来解释这一点:
在这个例子中,我们将发送描述动物的消息。消息将使用由三个单词(两点)构成的路由键进行发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:“..”。
我们创建了三个绑定:Q1用绑定键“*.ornge.*”绑定,Q2用“*.*.rabit”和“lazy.#”绑定。
这些绑定可以概括为:
- Q1对所有橙色的动物都感爱好。
- Q2想听听关于兔子的一切,以及关于懒惰动物的一切。
消息路由示例:
- 带有路由键“quick.ornge.robit”、 "lazy.orange.elephant"的消息将会被发送给所有队列。
- 带有路由键“quick.orange.fox”的消息将仅被投放入Q1队列中。
- 带有路由键"lazy.pink.rabbit"的消息的将仅被投放入Q2队列中,且只会放入一次,虽然匹配两个绑定键。
- 带有路由键"quick.brown.fox、 orange、quick.orange.new.rabbit的消息将不会被投放入任何队列中,会被丢弃。
- 带有路由键lazy.orange.new.rabbit消息将被投放入Q2队列中
说明:
主题交换j机器功能强大,可以像其他交换一样运行。
- 当队列使用“#” 绑定键绑定时,它将接收所有消息,而不管路由键如何,就像fanout交换机一样。
- 当绑定中不使用特殊字符“*”和“#”时,主题交换机的行为就像direct交换机一样。
EmitLogTopic- [x] Received 'info':'Hello World!'
复制代码 ReceiveLogsTopic- import com.rabbitmq.client.*;public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.88.128"); factory.setPort(5672); factory.setUsername("testacc"); factory.setPassword("test1234"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}
复制代码 分别使用以下参数先运行ReceiveLogsTopic(开4个进程),再运行EmitLogTopic(带参数"kern.critical" "A critical kernel error"运行),查看控制台输出。- import com.rabbitmq.client.*;
- public class ReceiveLogsTopic {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.88.128");
- factory.setPort(5672);
- factory.setUsername("testacc");
- factory.setPassword("test1234");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- String queueName = channel.queueDeclare().getQueue();
- if (argv.length < 1) {
- System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
- System.exit(1);
- }
- for (String bindingKey : argv) {
- channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
- }
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
- };
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
- }
- }
复制代码 结果,运行ReceiveLogsTopic的四个控制台输出:- [*] Waiting for messages. To exit press CTRL+C [x] Received 'kern.critical':'A critical kernel error'
复制代码 运行EmitLogTopic的控制台输出:- [*] Waiting for messages. To exit press CTRL+C
- [x] Received 'kern.critical':'A critical kernel error'
复制代码 参考链接:https://www.rabbitmq.com/tutorials/tutorial-five-java
参考链接
https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/java
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |