RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、长途调用模式(RPC,不常用,不做讲讲授明)
一、简单模式(Simple)
特点:①一个生产者对应一个消耗者,通过队列举行消息传递。
②该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。
图解:
二、工作队列模式(Work Queue)
与简单模式相比,工作队列模式(Work Queue)多了一些消耗者,该模式也使用direct交换机,应用于处置惩罚消息较多的环境。特点如下:
①一个队列对应多个消耗者。
②一条消息只会被一个消耗者消耗。
③消息队列默认采用轮询的方式将消息均匀发送给消耗者。
图解:
三、发布订阅模式(Publish/Subscribe)
在开发过程中,有一些消息需要不同消耗者举行不同的处置惩罚
特点:
①生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
②工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换性能将消息发送给多个队列。发布订阅模式使用fanout交换机。
图解
四、路由模式(Routing)
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时间,不是所有消息都无差异的发布到所有队列中。特点:
①每个队列绑定路由关键字RoutingKey
②生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用direct交换机。
图解:
五、 通配符模式(Topics)
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。
通配符规则:
①消息设置RoutingKey时,RoutingKey由多个单词构成,中央以.分割。
②队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。
六、Java实现五种模式
简单模式生产者
- package com.tmh.mq.simple;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * rabbitmq工作模式:简单模式,消息生产者
- */
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂 指定IP地址、端口、连接的用户名密码、连接的虚拟主机
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、创建连接
- Connection connection = connectionFactory.newConnection();
- //3、创建信道
- Channel channel = connection.createChannel();
- //4、创建队列
- /**
- * 参数一:队列名称
- * 参数二:是否持久化,true表示当MQ重启后队列还在
- * 参数三:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
- * 参数四:是否自动删除
- * 参数五:其他参数
- */
- channel.queueDeclare("simplqueue", false,false,false,null);
- //5、发送消息
- String messges="hello simple queue";
- /**
- * 参数1:交换机名,""表示默认交换机
- * 参数2:路由键,简单模式就是队列名
- * 参数3:其他额外参数
- * 参数4:要传递的消息字节数组
- */
- channel.basicPublish("","simplqueue",null,messges.getBytes());
- //6、关闭信道和连接
- channel.close();
- connection.close();
- System.out.println("消息发送成功");
- }
- }
复制代码 简单模式消耗者
- package com.tmh.mq.simple;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- //simple队列消息消费者
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂 指定IP地址、端口、连接的用户名密码、连接的虚拟主机
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、创建连接
- Connection connection = connectionFactory.newConnection();
- //3、创建信道
- Channel channel = connection.createChannel();
- //4、监听队列
- /**
- * 参数1:监听的队列名
- * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
- * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
- */
- channel.basicConsume("simplqueue",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String messge = new String(body, "UTF-8");
- System.out.println("接收消息,消息为:"+messge);
- }
- });
- }
- }
复制代码 工作队列模式生产者
- package com.tmh.mq.work;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- //工作队列模式
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建工厂连接
- ConnectionFactory connectionFactory=new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //建立连接
- Connection connection = connectionFactory.newConnection();
- //创建信道
- Channel channel = connection.createChannel();
- //创建队列,如果队列已经存在,则使用该队列
- channel.queueDeclare("workqueue",true,false,false,null);
- //发送大量消息
- for (int i = 0; i <100 ; i++) {
- channel.basicPublish("","workqueue",
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- ("hello 这是今天的第"+(i+1)+"条消息").getBytes());
- }
- //关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 工作模式消耗者
- package com.tmh.mq.work;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建工厂连接
- ConnectionFactory connectionFactory=new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //建立连接
- Connection connection = connectionFactory.newConnection();
- //创建信道
- Channel channel = connection.createChannel();
- //监听队列
- channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String s = new String(body, "UTF-8");
- System.out.println("消费者1消费消息:"+s);
- }
- });
- }
- }
- -----------------------------------------------------------------------------------------
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建工厂连接
- ConnectionFactory connectionFactory=new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //建立连接
- Connection connection = connectionFactory.newConnection();
- //创建信道
- Channel channel = connection.createChannel();
- //监听队列
- channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String s = new String(body, "UTF-8");
- System.out.println("消费者2消费消息:"+s);
- }
- });
- }
- }
- -----------------------------------------------------------------------------------------
- public class Consumer3 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建工厂连接
- ConnectionFactory connectionFactory=new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //建立连接
- Connection connection = connectionFactory.newConnection();
- //创建信道
- Channel channel = connection.createChannel();
- //监听队列
- channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String s = new String(body, "UTF-8");
- System.out.println("消费者1消费消息:"+s);
- }
- });
- }
- }
复制代码 发布订阅模式生产者
- package com.tmh.mq.publish;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、创建交换机
- /**
- * 参数一:交换机名
- * 参数二:交换机类型
- * 参数三:交换机持久化
- */
- channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
- //5、创建队列
- channel.queueDeclare("MAIL_QUEUE", true,false,false,null);
- channel.queueDeclare("MESSAGE_QUEUE", true,false,false,null);
- channel.queueDeclare("STATION_QUEUE", true,false,false,null);
- //6、队列绑定交换机
- /**
- * 参数一:队列名
- * 参数二:交换机名字
- * 参数三:路由关键字,发布订阅模式写“”即可
- */
- channel.queueBind("MAIL_QUEUE","exchange_fanout","");
- channel.queueBind("MESSAGE_QUEUE","exchange_fanout","");
- channel.queueBind("STATION_QUEUE","exchange_fanout","");
- //7、发送消息
- for (int i = 1; i <=10 ; i++) {
- channel.basicPublish("exchange_fanout","",null,
- ("你好,发布订阅模式"+i).getBytes(StandardCharsets.UTF_8));
- }
- //8、关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 发布订阅模式消耗者
- package com.tmh.mq.publish;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class ConsumerMail {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、监听队列
- channel.basicConsume("MAIL_QUEUE",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- final String s = new String(body, "UTF-8");
- System.out.println("邮件消息:"+s);
- }
- });
- }
- }
- -----------------------------------------------------------------------------------------
- public class ConsumerMessage {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、监听队列
- channel.basicConsume("MESSAGE_QUEUE",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- final String s = new String(body, "UTF-8");
- System.out.println("短信消息:"+s);
- }
- });
- }
- }
- -----------------------------------------------------------------------------------------
- public class ConsumerStation {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、监听队列
- channel.basicConsume("STATION_QUEUE",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- final String s = new String(body, "UTF-8");
- System.out.println("站内消息:"+s);
- }
- });
- }
- }
复制代码
路由模式生产者
- package com.tmh.mq.routing;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、创建交换机
- /**
- * 参数一:交换机名
- * 参数二:交换机类型
- * 参数三:交换机持久化
- */
- channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
- //5、创建队列
- channel.queueDeclare("MAIL_QUEUE2", true,false,false,null);
- channel.queueDeclare("MESSAGE_QUEUE2", true,false,false,null);
- channel.queueDeclare("STATION_QUEUE2", true,false,false,null);
- //6、队列绑定交换机
- /**
- * 参数一:队列名
- * 参数二:交换机名字
- * 参数三:路由关键字,发布订阅模式写“”即可
- */
- channel.queueBind("MAIL_QUEUE2","exchange_routing","important");
- channel.queueBind("MESSAGE_QUEUE2","exchange_routing","important");
- channel.queueBind("STATION_QUEUE2","exchange_routing","important");
- channel.queueBind("STATION_QUEUE2","exchange_routing","normal");
- //7、发送消息
- channel.basicPublish("exchange_routing","important",null,
- ("你好,路由模式,这是重要消息").getBytes(StandardCharsets.UTF_8));
- channel.basicPublish("exchange_routing","normal",null,
- ("你好,路由模式,这是不太重要的消息").getBytes(StandardCharsets.UTF_8));
- //8、关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 路由模式消耗者
- package com.tmh.mq.routing;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class ConsumerMail {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、监听队列
- channel.basicConsume("MAIL_QUEUE2",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- final String s = new String(body, "UTF-8");
- System.out.println("邮件消息:"+s);
- }
- });
- }
- }
- -----------------------------------------------------------------------------------------
- public class ConsumerMessage {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、监听队列
- channel.basicConsume("MESSAGE_QUEUE2",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- final String s = new String(body, "UTF-8");
- System.out.println("短信消息:"+s);
- }
- });
- }
- }
- -----------------------------------------------------------------------------------------
- public class ConsumerStation {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、监听队列
- channel.basicConsume("STATION_QUEUE2",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- final String s = new String(body, "UTF-8");
- System.out.println("站内消息:"+s);
- }
- });
- }
- }
复制代码 通配符模式 生产者
- package com.tmh.mq.topic;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、创建交换机
- /**
- * 参数一:交换机名
- * 参数二:交换机类型
- * 参数三:交换机持久化
- */
- channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
- //5、创建队列
- channel.queueDeclare("MAIL_QUEUE3", true,false,false,null);
- channel.queueDeclare("MESSAGE_QUEUE3", true,false,false,null);
- channel.queueDeclare("STATION_QUEUE3", true,false,false,null);
- //6、队列绑定交换机
- /**
- * 参数一:队列名
- * 参数二:交换机名字
- * 参数三:路由关键字,发布订阅模式写“”即可
- */
- channel.queueBind("MAIL_QUEUE3","exchange_topic","#.mail.#");
- channel.queueBind("MESSAGE_QUEUE3","exchange_topic","#.message.#");
- channel.queueBind("STATION_QUEUE3","exchange_topic","#.station.#");
- //7、发送消息
- channel.basicPublish("exchange_topic","mail.message.station",null,
- ("你好,通配符模式,这是重要消息").getBytes(StandardCharsets.UTF_8));
- channel.basicPublish("exchange_topic","station",null,
- ("你好,通配符模式,这是不太重要的消息").getBytes(StandardCharsets.UTF_8));
- }
- }
复制代码 通配符模式消耗者
- package com.tmh.mq.topic;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class ConsumerMail {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、监听队列
- channel.basicConsume("MAIL_QUEUE3",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- final String s = new String(body, "UTF-8");
- System.out.println("邮件消息:"+s);
- }
- });
- }
- }
- -----------------------------------------------------------------------------------------
- public class ConsumerMessage {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、监听队列
- channel.basicConsume("MESSAGE_QUEUE3",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- final String s = new String(body, "UTF-8");
- System.out.println("短信消息:"+s);
- }
- });
- }
- }
- -----------------------------------------------------------------------------------------
- public class ConsumerStation {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建连接工厂
- final ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.66.8");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("tangminghao");
- connectionFactory.setPassword("tangminghao");
- connectionFactory.setVirtualHost("/");
- //2、建立连接
- final Connection connection = connectionFactory.newConnection();
- //3、建立信道
- final Channel channel = connection.createChannel();
- //4、监听队列
- channel.basicConsume("STATION_QUEUE3",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- final String s = new String(body, "UTF-8");
- System.out.println("站内消息:"+s);
- }
- });
- }
- }
复制代码 在Java实现中有很多注释没有改过来,由于很多模式之间变化不大,所以就复制粘贴了 |