RabbitMQ

打印 上一主题 下一主题

主题 1016|帖子 1016|积分 3048

RabbitMQ























Messaging that just works — RabbitMQ
案例

pom.xml
  1. <dependencies>
  2.         <dependency>
  3.             <groupId>com.rabbitmq</groupId>
  4.             <artifactId>amqp-client</artifactId>
  5.             <version>5.6.0</version>
  6.         </dependency>
  7.     </dependencies>
  8.     <build>
  9.         <plugins>
  10.             <plugin>
  11.                 <groupId>org.apache.maven.plugins</groupId>
  12.                 <artifactId>maven-compiler-plugin</artifactId>
  13.                 <version>3.8.0</version>
  14.                 <configuration>
  15.                     <source>1.8</source>
  16.                     <target>1.8</target>
  17.                 </configuration>
  18.             </plugin>
  19.         </plugins>
  20.     </build>
复制代码
生产者
  1. package com.www.mq;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 生产者:发送消息
  9. *
  10. * @author Www
  11. * @version 1.8
  12. * @since 2023/2/15  20:41 星期三
  13. */
  14. public class ProducerHelloWorld {
  15.    
  16.     public static void main(String[] args) throws IOException, TimeoutException {
  17.         // 1、创建连接工厂
  18.         ConnectionFactory connectionFactory = new ConnectionFactory();
  19.         // 2、设置参数
  20.         // IP 默认值 localhost
  21.         connectionFactory.setHost("192.168.36.100");
  22.         // 端口 默认 5672
  23.         connectionFactory.setPort(5672);
  24.         // 虚拟机 默认值 /
  25.         connectionFactory.setVirtualHost("/ljt");
  26.         // 用户名 默认值 guest
  27.         connectionFactory.setUsername("ljt");
  28.         // 密码 默认值 guest
  29.         connectionFactory.setPassword("ljt");
  30.         
  31.         
  32.         // 3、创建连接 Connection : 受检异常——> 抛出
  33.         Connection connection = connectionFactory.newConnection();
  34.         
  35.         // 4、创建通道 Channel
  36.         Channel channel = connection.createChannel();
  37.         
  38.         // 5、创建队列 Queue
  39.         /*
  40.          queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  41.          参数:
  42.              1. queue:队列名称
  43.              2. durable:是否持久化,当mq重启之后,还在
  44.              3. exclusive:
  45.                  * 是否独占。只能有一个消费者监听这队列
  46.                  * 当Connection关闭时,是否删除队列
  47.                  *
  48.              4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  49.              5. arguments:参数。
  50.          */
  51.         // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
  52.         channel.queueDeclare(
  53.                 // 队列名称
  54.                 "hello_world",
  55.                 // 是否持久化,当mq重启之后,还在
  56.                 true,
  57.                 // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
  58.                 false,
  59.                 // 是否自动删除。当没有Consumer时,自动删除掉
  60.                 false,
  61.                 // 参数
  62.                 null
  63.         );
  64.         
  65.          /*
  66.              basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  67.             参数:
  68.                 1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
  69.                 2. routingKey:路由名称
  70.                 3. props:配置信息
  71.                 4. body:发送消息数据
  72.          */
  73.         // 发送消息内容
  74.         String body = "HelloWorld~~~~~~~~~~";
  75.         
  76.         // 6、发送消息
  77.         channel.basicPublish(
  78.                 // 交换机名称。简单模式下交换机会使用默认的 ""
  79.                 "",
  80.                 // 路由名称
  81.                 "hello_world",
  82.                 // 配置信息
  83.                 null,
  84.                 // 发送消息数据
  85.                 body.getBytes()
  86.         );
  87.         
  88.         // 释放资源
  89.         channel.close();
  90.         connection.close();
  91.         
  92.     }
  93. }
复制代码
消费者
  1. package com.www.mq;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * 消费者:获取消息
  7. *
  8. * @author Www
  9. * @version 1.8
  10. * @since 2023/2/15  20:41 星期三
  11. */
  12. public class ConsumerHelloWorld {
  13.    
  14.     public static void main(String[] args) throws IOException, TimeoutException {
  15.         // 1、创建连接工厂
  16.         ConnectionFactory connectionFactory = new ConnectionFactory();
  17.         // 2、设置参数
  18.         // IP 默认值 localhost
  19.         connectionFactory.setHost("192.168.36.100");
  20.         // 端口 默认 5672
  21.         connectionFactory.setPort(5672);
  22.         // 虚拟机 默认值 /
  23.         connectionFactory.setVirtualHost("/ljt");
  24.         // 用户名 默认值 guest
  25.         connectionFactory.setUsername("ljt");
  26.         // 密码 默认值 guest
  27.         connectionFactory.setPassword("ljt");
  28.         
  29.         
  30.         // 3、创建连接 Connection : 受检异常——> 抛出
  31.         Connection connection = connectionFactory.newConnection();
  32.         
  33.         // 4、创建通道 Channel
  34.         Channel channel = connection.createChannel();
  35.         
  36.         // 5、创建队列 Queue
  37.         /*
  38.          queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  39.          参数:
  40.              1. queue:队列名称
  41.              2. durable:是否持久化,当mq重启之后,还在
  42.              3. exclusive:
  43.                  * 是否独占。只能有一个消费者监听这队列
  44.                  * 当Connection关闭时,是否删除队列
  45.                  *
  46.              4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  47.              5. arguments:参数。
  48.          */
  49.         // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
  50.         channel.queueDeclare(
  51.                 // 队列名称
  52.                 "hello_world",
  53.                 // 是否持久化,当mq重启之后,还在
  54.                 true,
  55.                 // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
  56.                 false,
  57.                 // 是否自动删除。当没有Consumer时,自动删除掉
  58.                 false,
  59.                 // 参数
  60.                 null);
  61.         
  62.          /*
  63.             basicConsume(String queue, boolean autoAck, Consumer callback)
  64.             参数:
  65.                 1. queue:队列名称
  66.                 2. autoAck:是否自动确认
  67.                 3. callback:回调对象
  68.          */
  69.         // 6、接收消息
  70.         Consumer consumer = new DefaultConsumer(channel) {
  71.             /**
  72.              * <p>
  73.              *   回调方法,当收到消息后,会自动执行该方法
  74.              *      1. consumerTag:标识
  75.              *      2. envelope:获取一些信息,交换机,路由key...
  76.              *      3. properties:配置信息
  77.              *      4. body:数据
  78.              * </p>
  79.              */
  80.             @Override
  81.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  82.                 System.out.println("consumerTag:" + consumerTag);
  83.                 System.out.println("Exchange:" + envelope.getExchange());
  84.                 System.out.println("RoutingKey:" + envelope.getRoutingKey());
  85.                 System.out.println("properties:" + properties);
  86.                 System.out.println("body:" + new String(body));
  87.             }
  88.         };
  89.         channel.basicConsume(
  90.                 // 队列名称
  91.                 "hello_world",
  92.                 // 是否自动确认
  93.                 true,
  94.                 // 回调对象
  95.                 consumer
  96.         );
  97.         
  98.         // 不需要关闭资源
  99.     }
  100. }
复制代码

工作队列


生产者
  1. package com.www.mq;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 生产者:发送消息
  9. *
  10. * @author Www
  11. * @version 1.8
  12. * @since 2023/2/15  20:41 星期三
  13. */
  14. public class ProducerWorkQueues {
  15.    
  16.     public static void main(String[] args) throws IOException, TimeoutException {
  17.         // 1、创建连接工厂
  18.         ConnectionFactory connectionFactory = new ConnectionFactory();
  19.         // 2、设置参数
  20.         // IP 默认值 localhost
  21.         connectionFactory.setHost("192.168.36.100");
  22.         // 端口 默认 5672
  23.         connectionFactory.setPort(5672);
  24.         // 虚拟机 默认值 /
  25.         connectionFactory.setVirtualHost("/ljt");
  26.         // 用户名 默认值 guest
  27.         connectionFactory.setUsername("ljt");
  28.         // 密码 默认值 guest
  29.         connectionFactory.setPassword("ljt");
  30.         
  31.         
  32.         // 3、创建连接 Connection : 受检异常——> 抛出
  33.         Connection connection = connectionFactory.newConnection();
  34.         
  35.         // 4、创建通道 Channel
  36.         Channel channel = connection.createChannel();
  37.         
  38.         // 5、创建队列 Queue
  39.         /*
  40.          queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  41.          参数:
  42.              1. queue:队列名称
  43.              2. durable:是否持久化,当mq重启之后,还在
  44.              3. exclusive:
  45.                  * 是否独占。只能有一个消费者监听这队列
  46.                  * 当Connection关闭时,是否删除队列
  47.                  *
  48.              4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  49.              5. arguments:参数。
  50.          */
  51.         // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
  52.         channel.queueDeclare(
  53.                 // 队列名称
  54.                 "WorkQueue",
  55.                 // 是否持久化,当mq重启之后,还在
  56.                 true,
  57.                 // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
  58.                 false,
  59.                 // 是否自动删除。当没有Consumer时,自动删除掉
  60.                 false,
  61.                 // 参数
  62.                 null
  63.         );
  64.         
  65.          /*
  66.              basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  67.             参数:
  68.                 1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
  69.                 2. routingKey:路由名称
  70.                 3. props:配置信息
  71.                 4. body:发送消息数据
  72.          */
  73.         for (int i = 0; i < 10; i++) {
  74.             // 发送消息内容
  75.             String body = "WorkQueue~~~~~~~~~~" + i;
  76.             
  77.             // 6、发送消息
  78.             channel.basicPublish(
  79.                     // 交换机名称。简单模式下交换机会使用默认的 ""
  80.                     "",
  81.                     // 路由名称
  82.                     "WorkQueue",
  83.                     // 配置信息
  84.                     null,
  85.                     // 发送消息数据
  86.                     body.getBytes()
  87.             );
  88.         }
  89.         
  90.         
  91.         // 释放资源
  92.         channel.close();
  93.         connection.close();
  94.         
  95.     }
  96. }
复制代码
消费者: 两个
  1. package com.www.mq;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * 消费者:获取消息
  7. *
  8. * @author Www
  9. * @version 1.8
  10. * @since 2023/2/15  20:41 星期三
  11. */
  12. public class ConsumerWorkQueues1 {
  13.    
  14.     public static void main(String[] args) throws IOException, TimeoutException {
  15.         // 1、创建连接工厂
  16.         ConnectionFactory connectionFactory = new ConnectionFactory();
  17.         // 2、设置参数
  18.         // IP 默认值 localhost
  19.         connectionFactory.setHost("192.168.36.100");
  20.         // 端口 默认 5672
  21.         connectionFactory.setPort(5672);
  22.         // 虚拟机 默认值 /
  23.         connectionFactory.setVirtualHost("/ljt");
  24.         // 用户名 默认值 guest
  25.         connectionFactory.setUsername("ljt");
  26.         // 密码 默认值 guest
  27.         connectionFactory.setPassword("ljt");
  28.         
  29.         
  30.         // 3、创建连接 Connection : 受检异常——> 抛出
  31.         Connection connection = connectionFactory.newConnection();
  32.         
  33.         // 4、创建通道 Channel
  34.         Channel channel = connection.createChannel();
  35.         
  36.         // 5、创建队列 Queue
  37.         /*
  38.          queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  39.          参数:
  40.              1. queue:队列名称
  41.              2. durable:是否持久化,当mq重启之后,还在
  42.              3. exclusive:
  43.                  * 是否独占。只能有一个消费者监听这队列
  44.                  * 当Connection关闭时,是否删除队列
  45.                  *
  46.              4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  47.              5. arguments:参数。
  48.          */
  49.         // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
  50.         channel.queueDeclare(
  51.                 // 队列名称
  52.                 "WorkQueue",
  53.                 // 是否持久化,当mq重启之后,还在
  54.                 true,
  55.                 // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
  56.                 false,
  57.                 // 是否自动删除。当没有Consumer时,自动删除掉
  58.                 false,
  59.                 // 参数
  60.                 null);
  61.         
  62.          /*
  63.             basicConsume(String queue, boolean autoAck, Consumer callback)
  64.             参数:
  65.                 1. queue:队列名称
  66.                 2. autoAck:是否自动确认
  67.                 3. callback:回调对象
  68.          */
  69.         // 6、接收消息
  70.         Consumer consumer = new DefaultConsumer(channel) {
  71.             /**
  72.              * <p>
  73.              *   回调方法,当收到消息后,会自动执行该方法
  74.              *      1. consumerTag:标识
  75.              *      2. envelope:获取一些信息,交换机,路由key...
  76.              *      3. properties:配置信息
  77.              *      4. body:数据
  78.              * </p>
  79.              */
  80.             @Override
  81.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  82.                 System.out.println("consumerTag:" + consumerTag);
  83.                 System.out.println("Exchange:" + envelope.getExchange());
  84.                 System.out.println("RoutingKey:" + envelope.getRoutingKey());
  85.                 System.out.println("properties:" + properties);
  86.                 System.out.println("body:" + new String(body));
  87.             }
  88.         };
  89.         channel.basicConsume(
  90.                 // 队列名称
  91.                 "WorkQueue",
  92.                 // 是否自动确认
  93.                 true,
  94.                 // 回调对象
  95.                 consumer
  96.         );
  97.         
  98.         // 不需要关闭资源
  99.     }
  100. }
复制代码
订阅模式


生产者
  1. package com.www.mq;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * 生产者:发送消息
  10. *
  11. * @author Www
  12. * @version 1.8
  13. * @since 2023/2/15  20:41 星期三
  14. */
  15. public class ProducerPubSub {
  16.    
  17.     public static void main(String[] args) throws IOException, TimeoutException {
  18.         // 1、创建连接工厂
  19.         ConnectionFactory connectionFactory = new ConnectionFactory();
  20.         // 2、设置参数
  21.         // IP 默认值 localhost
  22.         connectionFactory.setHost("192.168.36.100");
  23.         // 端口 默认 5672
  24.         connectionFactory.setPort(5672);
  25.         // 虚拟机 默认值 /
  26.         connectionFactory.setVirtualHost("/ljt");
  27.         // 用户名 默认值 guest
  28.         connectionFactory.setUsername("ljt");
  29.         // 密码 默认值 guest
  30.         connectionFactory.setPassword("ljt");
  31.         
  32.         
  33.         // 3、创建连接 Connection : 受检异常——> 抛出
  34.         Connection connection = connectionFactory.newConnection();
  35.         
  36.         // 4、创建通道 Channel
  37.         Channel channel = connection.createChannel();
  38.         
  39.         // 5、创建交换机
  40.         /*
  41.            exchangeDeclare(
  42.             String exchange,BuiltinExchangeType type,
  43.             boolean durable, boolean autoDelete,
  44.             boolean internal, Map<String, Object> arguments
  45.             )
  46.            参数:
  47.             1. exchange:交换机名称
  48.             2. type:交换机类型
  49.                 DIRECT("direct"),:定向
  50.                 FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
  51.                 TOPIC("topic"),通配符的方式
  52.                 HEADERS("headers");参数匹配
  53.             3. durable:是否持久化
  54.             4. autoDelete:自动删除
  55.             5. internal:内部使用。 一般false
  56.             6. arguments:参数
  57.         */
  58.         String exchangeName = "test_fanout";
  59.         channel.exchangeDeclare(
  60.                 // 交换机名称
  61.                 exchangeName,
  62.                 // type:交换机类型
  63.                 //   DIRECT("direct"),:定向
  64.                 //   FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
  65.                 //   TOPIC("topic"),通配符的方式
  66.                 //   HEADERS("headers");参数匹配
  67.                 BuiltinExchangeType.FANOUT,
  68.                 // 是否持久化
  69.                 true,
  70.                 // 内部使用
  71.                 false,
  72.                 // 参数
  73.                 null
  74.         
  75.         );
  76.         
  77.         // 6、创建队列
  78.         String queue1Name = "test_fanout_queue1";
  79.         String queue2Name = "test_fanout_queue2";
  80.         channel.queueDeclare(
  81.                 // 队列名
  82.                 queue1Name,
  83.                 // 是否持久
  84.                 true,
  85.                 // 是否独占
  86.                 false,
  87.                 //是否自动删除
  88.                 false,
  89.                 // 参数
  90.                 null
  91.         );
  92.         channel.queueDeclare(
  93.                 // 队列名
  94.                 queue2Name,
  95.                 // 是否持久
  96.                 true,
  97.                 // 是否独占
  98.                 false,
  99.                 //是否自动删除
  100.                 false,
  101.                 // 参数
  102.                 null
  103.         );
  104.         
  105.         // 7、绑定队列和交换机
  106.          /*
  107.             queueBind(String queue, String exchange, String routingKey)
  108.             参数:
  109.                 1. queue:队列名称
  110.                 2. exchange:交换机名称
  111.                 3. routingKey:路由键,绑定规则
  112.                     如果交换机的类型为fanout ,routingKey设置为""
  113.          */
  114.         channel.queueBind(
  115.                 // 队列名
  116.                 queue1Name,
  117.                 // 交换机名
  118.                 exchangeName,
  119.                 //  routingKey:路由键,绑定规则
  120.                 //    如果交换机的类型为fanout ,routingKey设置为""
  121.                 ""
  122.         );
  123.         channel.queueBind(
  124.                 // 队列名
  125.                 queue2Name,
  126.                 // 交换机名
  127.                 exchangeName,
  128.                 //  routingKey:路由键,绑定规则
  129.                 //    如果交换机的类型为fanout ,routingKey设置为""
  130.                 ""
  131.         );
  132.         
  133.          /*
  134.              basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  135.             参数:
  136.                 1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
  137.                 2. routingKey:路由名称
  138.                 3. props:配置信息
  139.                 4. body:发送消息数据
  140.          */
  141.         // 发送消息内容
  142.         String body = "日志信息:张三调用了findAll方法...日志级别:info...";
  143.         // 8、发送消息
  144.         channel.basicPublish(
  145.                 // 交换机名称。简单模式下交换机会使用默认的 ""
  146.                 exchangeName,
  147.                 // 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
  148.                 "",
  149.                 // 配置信息
  150.                 null,
  151.                 // 发送消息数据
  152.                 body.getBytes()
  153.         );
  154.         
  155.         
  156.         // 9、释放资源
  157.         channel.close();
  158.         connection.close();
  159.         
  160.     }
  161. }
复制代码
消费者1
  1. package com.www.mq;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * 消费者:获取消息
  7. *
  8. * @author Www
  9. * @version 1.8
  10. * @since 2023/2/15  20:41 星期三
  11. */
  12. public class ConsumerPubSub1 {
  13.    
  14.     public static void main(String[] args) throws IOException, TimeoutException {
  15.         // 1、创建连接工厂
  16.         ConnectionFactory connectionFactory = new ConnectionFactory();
  17.         // 2、设置参数
  18.         // IP 默认值 localhost
  19.         connectionFactory.setHost("192.168.36.100");
  20.         // 端口 默认 5672
  21.         connectionFactory.setPort(5672);
  22.         // 虚拟机 默认值 /
  23.         connectionFactory.setVirtualHost("/ljt");
  24.         // 用户名 默认值 guest
  25.         connectionFactory.setUsername("ljt");
  26.         // 密码 默认值 guest
  27.         connectionFactory.setPassword("ljt");
  28.         
  29.         
  30.         // 3、创建连接 Connection : 受检异常——> 抛出
  31.         Connection connection = connectionFactory.newConnection();
  32.         
  33.         // 4、创建通道 Channel
  34.         Channel channel = connection.createChannel();
  35.         
  36.         // 队列名
  37.         String queue1Name = "test_fanout_queue1";
  38.       
  39.         
  40.          /*
  41.             basicConsume(String queue, boolean autoAck, Consumer callback)
  42.             参数:
  43.                 1. queue:队列名称
  44.                 2. autoAck:是否自动确认
  45.                 3. callback:回调对象
  46.          */
  47.         // 5、接收消息
  48.         Consumer consumer = new DefaultConsumer(channel) {
  49.             /**
  50.              * <p>
  51.              *   回调方法,当收到消息后,会自动执行该方法
  52.              *      1. consumerTag:标识
  53.              *      2. envelope:获取一些信息,交换机,路由key...
  54.              *      3. properties:配置信息
  55.              *      4. body:数据
  56.              * </p>
  57.              */
  58.             @Override
  59.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  60.                 System.out.println("consumerTag:" + consumerTag);
  61.                 System.out.println("Exchange:" + envelope.getExchange());
  62.                 System.out.println("RoutingKey:" + envelope.getRoutingKey());
  63.                 System.out.println("properties:" + properties);
  64.                 System.out.println("body:" + new String(body));
  65.             }
  66.         };
  67.         channel.basicConsume(
  68.                 // 队列名称
  69.                 queue1Name,
  70.                 // 是否自动确认
  71.                 true,
  72.                 // 回调对象
  73.                 consumer
  74.         );
  75.         
  76.         // 不需要关闭资源
  77.     }
  78. }
复制代码
消费者2
  1. package com.www.mq;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * 消费者:获取消息
  7. *
  8. * @author Www
  9. * @version 1.8
  10. * @since 2023/2/15  20:41 星期三
  11. */
  12. public class ConsumerPubSub2 {
  13.    
  14.     public static void main(String[] args) throws IOException, TimeoutException {
  15.         // 1、创建连接工厂
  16.         ConnectionFactory connectionFactory = new ConnectionFactory();
  17.         // 2、设置参数
  18.         // IP 默认值 localhost
  19.         connectionFactory.setHost("192.168.36.100");
  20.         // 端口 默认 5672
  21.         connectionFactory.setPort(5672);
  22.         // 虚拟机 默认值 /
  23.         connectionFactory.setVirtualHost("/ljt");
  24.         // 用户名 默认值 guest
  25.         connectionFactory.setUsername("ljt");
  26.         // 密码 默认值 guest
  27.         connectionFactory.setPassword("ljt");
  28.         
  29.         
  30.         // 3、创建连接 Connection : 受检异常——> 抛出
  31.         Connection connection = connectionFactory.newConnection();
  32.         
  33.         // 4、创建通道 Channel
  34.         Channel channel = connection.createChannel();
  35.         
  36.         // 队列名
  37.         String queue2Name = "test_fanout_queue2";
  38.       
  39.         
  40.          /*
  41.             basicConsume(String queue, boolean autoAck, Consumer callback)
  42.             参数:
  43.                 1. queue:队列名称
  44.                 2. autoAck:是否自动确认
  45.                 3. callback:回调对象
  46.          */
  47.         // 5、接收消息
  48.         Consumer consumer = new DefaultConsumer(channel) {
  49.             /**
  50.              * <p>
  51.              *   回调方法,当收到消息后,会自动执行该方法
  52.              *      1. consumerTag:标识
  53.              *      2. envelope:获取一些信息,交换机,路由key...
  54.              *      3. properties:配置信息
  55.              *      4. body:数据
  56.              * </p>
  57.              */
  58.             @Override
  59.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  60.                 System.out.println("consumerTag:" + consumerTag);
  61.                 System.out.println("Exchange:" + envelope.getExchange());
  62.                 System.out.println("RoutingKey:" + envelope.getRoutingKey());
  63.                 System.out.println("properties:" + properties);
  64.                 System.out.println("body:" + new String(body));
  65.             }
  66.         };
  67.         channel.basicConsume(
  68.                 // 队列名称
  69.                 queue2Name,
  70.                 // 是否自动确认
  71.                 true,
  72.                 // 回调对象
  73.                 consumer
  74.         );
  75.         
  76.         // 不需要关闭资源
  77.     }
  78. }
复制代码
Routing 路由模式



生产者
  1. package com.www.mq;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * Routing 工作模式
  10. * <p>
  11. * 生产者:发送消息
  12. *
  13. * @author Www
  14. * @version 1.8
  15. * @since 2023/2/15  20:41 星期三
  16. */
  17. public class ProducerRouting {
  18.    
  19.     public static void main(String[] args) throws IOException, TimeoutException {
  20.         // 1、创建连接工厂
  21.         ConnectionFactory connectionFactory = new ConnectionFactory();
  22.         // 2、设置参数
  23.         // IP 默认值 localhost
  24.         connectionFactory.setHost("192.168.36.100");
  25.         // 端口 默认 5672
  26.         connectionFactory.setPort(5672);
  27.         // 虚拟机 默认值 /
  28.         connectionFactory.setVirtualHost("/ljt");
  29.         // 用户名 默认值 guest
  30.         connectionFactory.setUsername("ljt");
  31.         // 密码 默认值 guest
  32.         connectionFactory.setPassword("ljt");
  33.         
  34.         
  35.         // 3、创建连接 Connection : 受检异常——> 抛出
  36.         Connection connection = connectionFactory.newConnection();
  37.         
  38.         // 4、创建通道 Channel
  39.         Channel channel = connection.createChannel();
  40.         
  41.         // 5、创建交换机
  42.         /*
  43.            exchangeDeclare(
  44.             String exchange,BuiltinExchangeType type,
  45.             boolean durable, boolean autoDelete,
  46.             boolean internal, Map<String, Object> arguments
  47.             )
  48.            参数:
  49.             1. exchange:交换机名称
  50.             2. type:交换机类型
  51.                 DIRECT("direct"),:定向
  52.                 FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
  53.                 TOPIC("topic"),通配符的方式
  54.                 HEADERS("headers");参数匹配
  55.             3. durable:是否持久化
  56.             4. autoDelete:自动删除
  57.             5. internal:内部使用。 一般false
  58.             6. arguments:参数
  59.         */
  60.         String exchangeName = "test_direct";
  61.         channel.exchangeDeclare(
  62.                 // 交换机名称
  63.                 exchangeName,
  64.                 // type:交换机类型
  65.                 //   DIRECT("direct"),:定向
  66.                 //   FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
  67.                 //   TOPIC("topic"),通配符的方式
  68.                 //   HEADERS("headers");参数匹配
  69.                 BuiltinExchangeType.DIRECT,
  70.                 // 是否持久化
  71.                 true,
  72.                 // 内部使用
  73.                 false,
  74.                 // 参数
  75.                 null
  76.         
  77.         );
  78.         
  79.         // 6、创建队列
  80.         String queue1Name = "test_direct_queue1";
  81.         String queue2Name = "test_direct_queue2";
  82.         channel.queueDeclare(
  83.                 // 队列名
  84.                 queue1Name,
  85.                 // 是否持久
  86.                 true,
  87.                 // 是否独占
  88.                 false,
  89.                 //是否自动删除
  90.                 false,
  91.                 // 参数
  92.                 null
  93.         );
  94.         channel.queueDeclare(
  95.                 // 队列名
  96.                 queue2Name,
  97.                 // 是否持久
  98.                 true,
  99.                 // 是否独占
  100.                 false,
  101.                 //是否自动删除
  102.                 false,
  103.                 // 参数
  104.                 null
  105.         );
  106.         
  107.         // 7、绑定队列和交换机
  108.          /*
  109.             queueBind(String queue, String exchange, String routingKey)
  110.             参数:
  111.                 1. queue:队列名称
  112.                 2. exchange:交换机名称
  113.                 3. routingKey:路由键,绑定规则
  114.                     如果交换机的类型为fanout ,routingKey设置为""
  115.          */
  116.         // 队列1
  117.         channel.queueBind(
  118.                 // 队列名
  119.                 queue1Name,
  120.                 // 交换机名
  121.                 exchangeName,
  122.                 //  routingKey:路由键,绑定规则
  123.                 //    如果交换机的类型为fanout ,routingKey设置为""
  124.                 "error"
  125.         );
  126.         // 队列2
  127.         channel.queueBind(
  128.                 // 队列名
  129.                 queue2Name,
  130.                 // 交换机名
  131.                 exchangeName,
  132.                 //  routingKey:路由键,绑定规则
  133.                 //    如果交换机的类型为fanout ,routingKey设置为""
  134.                 "info"
  135.         );
  136.         channel.queueBind(
  137.                 // 队列名
  138.                 queue2Name,
  139.                 // 交换机名
  140.                 exchangeName,
  141.                 //  routingKey:路由键,绑定规则
  142.                 //    如果交换机的类型为fanout ,routingKey设置为""
  143.                 "error"
  144.         );
  145.         channel.queueBind(
  146.                 // 队列名
  147.                 queue2Name,
  148.                 // 交换机名
  149.                 exchangeName,
  150.                 //  routingKey:路由键,绑定规则
  151.                 //    如果交换机的类型为fanout ,routingKey设置为""
  152.                 "waring"
  153.         );
  154.         
  155.          /*
  156.              basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  157.             参数:
  158.                 1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
  159.                 2. routingKey:路由名称
  160.                 3. props:配置信息
  161.                 4. body:发送消息数据
  162.          */
  163.         // 发送消息内容
  164.         String body = "日志信息:张三调用了delete方法...警告。。。日志级别:waring...";
  165.         // 8、发送消息
  166.         channel.basicPublish(
  167.                 // 交换机名称。简单模式下交换机会使用默认的 ""
  168.                 exchangeName,
  169.                 // 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
  170.                 "waring",
  171.                 // 配置信息
  172.                 null,
  173.                 // 发送消息数据
  174.                 body.getBytes()
  175.         );
  176.         
  177.         
  178.         // 9、释放资源
  179.         channel.close();
  180.         connection.close();
  181.         
  182.     }
  183. }
复制代码
消费者 1
  1. package com.www.mq;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * 消费者:获取消息
  7. *
  8. * @author Www
  9. * @version 1.8
  10. * @since 2023/2/15  20:41 星期三
  11. */
  12. public class ConsumerRouting1 {
  13.    
  14.     public static void main(String[] args) throws IOException, TimeoutException {
  15.         // 1、创建连接工厂
  16.         ConnectionFactory connectionFactory = new ConnectionFactory();
  17.         // 2、设置参数
  18.         // IP 默认值 localhost
  19.         connectionFactory.setHost("192.168.36.100");
  20.         // 端口 默认 5672
  21.         connectionFactory.setPort(5672);
  22.         // 虚拟机 默认值 /
  23.         connectionFactory.setVirtualHost("/ljt");
  24.         // 用户名 默认值 guest
  25.         connectionFactory.setUsername("ljt");
  26.         // 密码 默认值 guest
  27.         connectionFactory.setPassword("ljt");
  28.         
  29.         
  30.         // 3、创建连接 Connection : 受检异常——> 抛出
  31.         Connection connection = connectionFactory.newConnection();
  32.         
  33.         // 4、创建通道 Channel
  34.         Channel channel = connection.createChannel();
  35.         
  36.         // 队列名
  37.         String queue2Name = "test_direct_queue1";
  38.          /*
  39.             basicConsume(String queue, boolean autoAck, Consumer callback)
  40.             参数:
  41.                 1. queue:队列名称
  42.                 2. autoAck:是否自动确认
  43.                 3. callback:回调对象
  44.          */
  45.         // 5、接收消息
  46.         Consumer consumer = new DefaultConsumer(channel) {
  47.             /**
  48.              * <p>
  49.              *   回调方法,当收到消息后,会自动执行该方法
  50.              *      1. consumerTag:标识
  51.              *      2. envelope:获取一些信息,交换机,路由key...
  52.              *      3. properties:配置信息
  53.              *      4. body:数据
  54.              * </p>
  55.              */
  56.             @Override
  57.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  58.                 System.out.println("consumerTag:" + consumerTag);
  59.                 System.out.println("Exchange:" + envelope.getExchange());
  60.                 System.out.println("RoutingKey:" + envelope.getRoutingKey());
  61.                 System.out.println("properties:" + properties);
  62.                 System.out.println("body:" + new String(body));
  63.             }
  64.         };
  65.         channel.basicConsume(
  66.                 // 队列名称
  67.                 queue2Name,
  68.                 // 是否自动确认
  69.                 true,
  70.                 // 回调对象
  71.                 consumer
  72.         );
  73.         
  74.         // 不需要关闭资源
  75.     }
  76. }
复制代码
消费者2
  1. package com.www.mq;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * 消费者:获取消息
  7. *
  8. * @author Www
  9. * @version 1.8
  10. * @since 2023/2/15  20:41 星期三
  11. */
  12. public class ConsumerRouting2 {
  13.    
  14.     public static void main(String[] args) throws IOException, TimeoutException {
  15.         // 1、创建连接工厂
  16.         ConnectionFactory connectionFactory = new ConnectionFactory();
  17.         // 2、设置参数
  18.         // IP 默认值 localhost
  19.         connectionFactory.setHost("192.168.36.100");
  20.         // 端口 默认 5672
  21.         connectionFactory.setPort(5672);
  22.         // 虚拟机 默认值 /
  23.         connectionFactory.setVirtualHost("/ljt");
  24.         // 用户名 默认值 guest
  25.         connectionFactory.setUsername("ljt");
  26.         // 密码 默认值 guest
  27.         connectionFactory.setPassword("ljt");
  28.         
  29.         
  30.         // 3、创建连接 Connection : 受检异常——> 抛出
  31.         Connection connection = connectionFactory.newConnection();
  32.         
  33.         // 4、创建通道 Channel
  34.         Channel channel = connection.createChannel();
  35.         
  36.         // 队列名
  37.         String queue2Name = "test_direct_queue2";
  38.          /*
  39.             basicConsume(String queue, boolean autoAck, Consumer callback)
  40.             参数:
  41.                 1. queue:队列名称
  42.                 2. autoAck:是否自动确认
  43.                 3. callback:回调对象
  44.          */
  45.         // 5、接收消息
  46.         Consumer consumer = new DefaultConsumer(channel) {
  47.             /**
  48.              * <p>
  49.              *   回调方法,当收到消息后,会自动执行该方法
  50.              *      1. consumerTag:标识
  51.              *      2. envelope:获取一些信息,交换机,路由key...
  52.              *      3. properties:配置信息
  53.              *      4. body:数据
  54.              * </p>
  55.              */
  56.             @Override
  57.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  58.                 System.out.println("consumerTag:" + consumerTag);
  59.                 System.out.println("Exchange:" + envelope.getExchange());
  60.                 System.out.println("RoutingKey:" + envelope.getRoutingKey());
  61.                 System.out.println("properties:" + properties);
  62.                 System.out.println("body:" + new String(body));
  63.             }
  64.         };
  65.         channel.basicConsume(
  66.                 // 队列名称
  67.                 queue2Name,
  68.                 // 是否自动确认
  69.                 true,
  70.                 // 回调对象
  71.                 consumer
  72.         );
  73.         
  74.         // 不需要关闭资源
  75.     }
  76. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

盛世宏图

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表