IT评测·应用市场-qidao123.com技术社区

标题: 操纵018:Stream Queue [打印本页]

作者: 愛在花開的季節    时间: 2025-1-3 15:24
标题: 操纵018:Stream Queue
操纵018:Stream Queue

一、启用插件

   说明:只有启用了Stream插件,才华使用流式队列的完整功能
  在集群每个节点中依次执行如下操纵:
  1. # 启用Stream插件
  2. rabbitmq-plugins enable rabbitmq_stream
  3. # 重启rabbit应用
  4. rabbitmqctl stop_app
  5. rabbitmqctl start_app
  6. # 查看插件状态
  7. rabbitmq-plugins list
复制代码

二、负载平衡

在文件/etc/haproxy/haproxy.cfg末了追加:
  1. frontend rabbitmq_stream_frontend
  2. bind 192.168.200.100:33333
  3. mode tcp
  4. default_backend rabbitmq_stream_backend
  5. backend rabbitmq_stream_backend
  6. mode tcp
  7. balance roundrobin
  8. server rabbitmq1 192.168.200.100:5552 check
  9. server rabbitmq2 192.168.200.150:5552 check
  10. server rabbitmq3 192.168.200.200:5552 check
复制代码
三、Java代码

1、引入依赖

Stream 专属 Java 客户端官方网址:https://github.com/rabbitmq/rabbitmq-stream-java-client

Stream 专属 Java 客户端官方文档网址:https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>com.rabbitmq</groupId>
  4.         <artifactId>stream-client</artifactId>
  5.         <version>0.15.0</version>
  6.     </dependency>
  7.     <dependency>
  8.         <groupId>org.slf4j</groupId>
  9.         <artifactId>slf4j-api</artifactId>
  10.         <version>1.7.30</version>
  11.     </dependency>
  12.     <dependency>
  13.         <groupId>ch.qos.logback</groupId>
  14.         <artifactId>logback-classic</artifactId>
  15.         <version>1.2.3</version>
  16.     </dependency>
  17. </dependencies>
复制代码
2、创建Stream

   说明:不需要创建交换机
  ①代码方式创建

  1. Environment environment = Environment.builder()
  2.         .host("192.168.200.100")
  3.         .port(33333)
  4.         .username("atguigu")
  5.         .password("123456")
  6.         .build();
  7. environment.streamCreator().stream("stream.atguigu.test2").create();
  8. environment.close();
复制代码
②ManagementUI创建


3、生产者端步调

①内部机制说明

[1]官方文档

   Internally, the Environment will query the broker to find out about the topology of the stream and will create or re-use a connection to publish to the leader node of the stream.
  翻译:
   在内部,Environment将查询broker以了解流的拓扑结构,并将创建或重用毗连以发布到流的 leader 节点。
  [2]剖析



[3]配置

为了让本机的应用步调知道 Leader 节点名称对应的 IP 地址,我们需要在本地配置 hosts 文件,创建从节点名称到 IP 地址的映射关系

②示例代码

  1. Environment environment = Environment.builder()
  2.         .host("192.168.200.100")
  3.         .port(33333)
  4.         .username("atguigu")
  5.         .password("123456")
  6.         .build();
  7. Producer producer = environment.producerBuilder()
  8.         .stream("stream.atguigu.test")
  9.         .build();
  10. byte[] messagePayload = "hello rabbit stream".getBytes(StandardCharsets.UTF_8);
  11. CountDownLatch countDownLatch = new CountDownLatch(1);
  12. producer.send(
  13.         producer.messageBuilder().addData(messagePayload).build(),
  14.         confirmationStatus -> {
  15.             if (confirmationStatus.isConfirmed()) {
  16.                 System.out.println("[生产者端]the message made it to the broker");
  17.             } else {
  18.                 System.out.println("[生产者端]the message did not make it to the broker");
  19.             }
  20.             countDownLatch.countDown();
  21.         });
  22. countDownLatch.await();
  23. producer.close();
  24. environment.close();
复制代码
4、消耗端步调

  1. Environment environment = Environment.builder()
  2.         .host("192.168.200.100")
  3.         .port(33333)
  4.         .username("atguigu")
  5.         .password("123456")
  6.         .build();
  7. environment.consumerBuilder()
  8.         .stream("stream.atguigu.test")
  9.         .name("stream.atguigu.test.consumer")
  10.         .autoTrackingStrategy()
  11.         .builder()
  12.         .messageHandler((offset, message) -> {
  13.             byte[] bodyAsBinary = message.getBodyAsBinary();
  14.             String messageContent = new String(bodyAsBinary);
  15.             System.out.println("[消费者端]messageContent = " + messageContent + " Offset=" + offset.offset());
  16.         })
  17.         .build();
复制代码
四、指定偏移量消耗

1、偏移量


2、官方文档说明

   The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following:
  
  3、指定Offset消耗

  1. Environment environment = Environment.builder()
  2.         .host("192.168.200.100")
  3.         .port(33333)
  4.         .username("atguigu")
  5.         .password("123456")
  6.         .build();
  7. CountDownLatch countDownLatch = new CountDownLatch(1);
  8. Consumer consumer = environment.consumerBuilder()
  9.         .stream("stream.atguigu.test")
  10.         .offset(OffsetSpecification.first())
  11.         .messageHandler((offset, message) -> {
  12.             byte[] bodyAsBinary = message.getBodyAsBinary();
  13.             String messageContent = new String(bodyAsBinary);
  14.             System.out.println("[消费者端]messageContent = " + messageContent);
  15.             countDownLatch.countDown();
  16.         })
  17.         .build();
  18. countDownLatch.await();
  19. consumer.close();
复制代码
4、对比



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4