RabbitMQ深度探索:创建消息队列

打印 上一主题 下一主题

主题 961|帖子 961|积分 2883

快速入门:实现 RabbitMQ 简单队列:


  • 在 RabbitMQ 平台创建 Virtual Hosts 和一个队列
  • /boyaVirtualHosts

    • 订单队列
    • 付出队列

  • 导入依赖:

    1. <dependency>
    2.   <groupId>com.rabbitmq</groupId>
    3.   <artifactId>amqp-client</artifactId>
    4.   <version>3.6.5 </version>
    5. </dependency>
    复制代码
  • 编写毗连类:

    1. public class RabbitMQConnection {
    2.     /**
    3.      * 获取连接
    4.      */
    5.     public static Connection getConnection() throws IOException, TimeoutException {
    6.         // 1.创建连接
    7.         ConnectionFactory connectionFactory = new ConnectionFactory();
    8.         // 2.设置连接地址
    9.         connectionFactory.setHost("127.0.0.1");
    10.         // 3.设置端口号
    11.         connectionFactory.setPort(5672);
    12.         // 4.设置账号和密码
    13.         connectionFactory.setUsername("guest");
    14.         connectionFactory.setPassword("guest");
    15.         // 5.设置 VirtualHost
    16.         connectionFactory.setVirtualHost("/boyaVirtualHostsR");
    17.         return connectionFactory.newConnection();
    18.     }
    19. }
    复制代码
  • 编写生产者代码:

    1. public class Producer {
    2.     private static final String QUEUE_NAME = "BoyatopMamber";
    3.     /**
    4.      * 获取连接
    5.      */
    6.     public static void main(String[] args) throws IOException, TimeoutException {
    7.         while (true){
    8.             // 1.创建连接
    9.             Connection connection = RabbitMQConnection.getConnection();
    10.             // 2.设置通道
    11.             Channel channel = connection.createChannel();
    12.             // 3.设置消息
    13.             String msg = "Hello World";
    14.             System.out.println("msg:" + msg);
    15.             channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    16.             channel.close();
    17.             connection.close();
    18.         }
    19.     }
    20. }
    复制代码
  • 编写消费者代码:

    1. public class Comsumer {
    2.     private static final String QUEUE_NAME = "BoyatopMamber";
    3.     public static void main(String[] args) throws IOException, TimeoutException {
    4.         // 1.创建链接
    5.         Connection connection = RabbitMQConnection.getConnection();
    6.         // 2.设置通道
    7.         Channel channel = connection.createChannel();
    8.         DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    9.             @Override
    10.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    11.                 String msg = new String(body,"UTF-8");
    12.                 System.out.println("消费者获取消息:" + msg);
    13.             }
    14.         };
    15.         // 3.监听队列
    16.         channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    17.     }
    18. }
    复制代码
  • RabbitMQ 怎样包管消息不丢失:

    • 生产者角色:

      • 确保生产者角色投递到 MQ 服务器端乐成
      • Ack 消息确认机制
      • 同步或异步的形式:

        • Confirms
        • 事件消息


    • 消费者角色:

      • 在 RabbitMQ 情况下:

        • 必须要将消息消费乐成之后,才会将消息从 MQ 服务器端移除

      • 在 kafka 中的情况下:

        • 不管是消费乐成照旧消费失败,该消息都不会立即从 MQ 服务器移除


    • MQ 服务器端:

      • 在默认的情况下,都会对队列中的消息长期化,长期化硬盘


  • 利用消息确认机制 + 长期化技术实现:

    • A 消费者确认收到消息机制
      1. channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
      复制代码

      • 第二个参数值为 false,代表关闭 RabbitMQ 的主动应答机制,改为手动应答
      • 在处置惩罚完消息时,返回应答状态,true 表示为主动应答模式
        1. channel.basicAck(envelope.getDeliveryTag(),false);
        复制代码

    • B 生产者确认投递消息乐成,利用 Confirm 机制或者事件消息

    • Confirm 机制,同步或异步的形式

  • RabbitMQ 默认创建是长期化的形式:

    • 将代码中的 durable 设为 true
    • 参数详解:

      • Durability:是否长期化

        • durable:长期化
        • Transient:不长期化

      • Auto delete:是否主动删除

        • 当最后一个消费者断开毗连之后队列是否主动被删除
        • 可以通过 RabbitMQ Management 查看某个队列的消费者数量
        • 当 consumers = 0 时,队列就会主动删除



  • 利用 RabbitMQ 事件:
    1. //设置事务
    2. channel.txSelect();
    3. channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    4. channel.txCommit();
    复制代码

    • 生产者:
      1. public class producer {
      2.     private static final String QUEUE_NAME = "BoyatopMamber";
      3.     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
      4.         // 1.创建新的连接
      5.         Connection connection = RabbitMQConnection.getConnection();
      6.         // 2.设置 channel
      7.         Channel channel = connection.createChannel();
      8.         // 3.发送消息
      9.         String msg = "Hello my Bro";
      10.         channel.confirmSelect();
      11.         channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
      12.         boolean result = channel.waitForConfirms();
      13.         if(result){
      14.             System.out.println("消息投递成功");
      15.         }else {
      16.             System.out.println("消息投递失败");
      17.         }
      18.         // 4.关闭资源
      19.         channel.close();
      20.         connection.close();
      21.     }
      22. }
      复制代码
    • 消费者:
      1. public class Consumer {
      2.     private static final String QUEUE_ANME = "BoyatopMamber";
      3.     public static void main(String[] args) throws IOException, TimeoutException {
      4.         // 1.创建连接
      5.         Connection connection = RabbitMQConnection.getConnection();
      6.         //2.设置通道
      7.         Channel channel = connection.createChannel();
      8.         DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
      9.             @Override
      10.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      11.                 String msg = new String(body,"UTF-8");
      12.                 System.out.println("消费者获取消息:" + msg);
      13.                 //消费者完成 消费该消息
      14.                 channel.basicAck(envelope.getDeliveryTag(),false);
      15.             }
      16.         };
      17.         // 3.监听队列
      18.         channel.basicConsume(QUEUE_ANME,false,defaultConsumer);
      19.     }
      20. }
      复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦见你的名字

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表