愛在花開的季節 发表于 2025-1-3 15:24:57

操纵018:Stream Queue

操纵018:Stream Queue

一、启用插件

   说明:只有启用了Stream插件,才华使用流式队列的完整功能
在集群每个节点中依次执行如下操纵:
# 启用Stream插件
rabbitmq-plugins enable rabbitmq_stream

# 重启rabbit应用
rabbitmqctl stop_app
rabbitmqctl start_app

# 查看插件状态
rabbitmq-plugins list
https://i-blog.csdnimg.cn/direct/81cecb36a14045b6b7ea950c1917690c.png
二、负载平衡

在文件/etc/haproxy/haproxy.cfg末了追加:
frontend rabbitmq_stream_frontend
bind 192.168.200.100:33333
mode tcp
default_backend rabbitmq_stream_backend

backend rabbitmq_stream_backend
mode tcp
balance roundrobin
server rabbitmq1 192.168.200.100:5552 check
server rabbitmq2 192.168.200.150:5552 check
server rabbitmq3 192.168.200.200:5552 check
三、Java代码

1、引入依赖

Stream 专属 Java 客户端官方网址:https://github.com/rabbitmq/rabbitmq-stream-java-client

Stream 专属 Java 客户端官方文档网址:https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/
<dependencies>
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>stream-client</artifactId>
      <version>0.15.0</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.30</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.3</version>
    </dependency>
</dependencies>
2、创建Stream

   说明:不需要创建交换机
①代码方式创建

Environment environment = Environment.builder()
      .host("192.168.200.100")
      .port(33333)
      .username("atguigu")
      .password("123456")
      .build();

environment.streamCreator().stream("stream.atguigu.test2").create();

environment.close();
②ManagementUI创建

https://i-blog.csdnimg.cn/direct/8ad60d3bd7604897b9809fa282e3d6a4.png
3、生产者端步调

①内部机制说明

官方文档

   Internally, the Environment will query the broker to find out about the topology of the stream and will create or re-use a connection to publish to the leader node of the stream.
翻译:
   在内部,Environment将查询broker以了解流的拓扑结构,并将创建或重用毗连以发布到流的 leader 节点。
剖析



[*]在 Environment 中封装的毗连信息仅负责毗连到 broker
[*]Producer 在构建对象时会访问 broker 拉取集群中 Leader 的毗连信息
[*]未来实际访问的是集群中的 Leader 节点
[*]Leader 的毗连信息格式是:节点名称:端口号
https://i-blog.csdnimg.cn/direct/fe96f95b22c54a1b81000a63abc66dc7.png
配置

为了让本机的应用步调知道 Leader 节点名称对应的 IP 地址,我们需要在本地配置 hosts 文件,创建从节点名称到 IP 地址的映射关系
https://i-blog.csdnimg.cn/direct/64f3d2e88450484d99db865f13c12287.png
②示例代码

Environment environment = Environment.builder()
      .host("192.168.200.100")
      .port(33333)
      .username("atguigu")
      .password("123456")
      .build();

Producer producer = environment.producerBuilder()
      .stream("stream.atguigu.test")
      .build();

byte[] messagePayload = "hello rabbit stream".getBytes(StandardCharsets.UTF_8);

CountDownLatch countDownLatch = new CountDownLatch(1);

producer.send(
      producer.messageBuilder().addData(messagePayload).build(),
      confirmationStatus -> {
            if (confirmationStatus.isConfirmed()) {
                System.out.println("[生产者端]the message made it to the broker");
            } else {
                System.out.println("[生产者端]the message did not make it to the broker");
            }

            countDownLatch.countDown();
      });

countDownLatch.await();

producer.close();

environment.close();
4、消耗端步调

Environment environment = Environment.builder()
      .host("192.168.200.100")
      .port(33333)
      .username("atguigu")
      .password("123456")
      .build();

environment.consumerBuilder()
      .stream("stream.atguigu.test")
      .name("stream.atguigu.test.consumer")
      .autoTrackingStrategy()
      .builder()
      .messageHandler((offset, message) -> {
            byte[] bodyAsBinary = message.getBodyAsBinary();
            String messageContent = new String(bodyAsBinary);
            System.out.println("[消费者端]messageContent = " + messageContent + " Offset=" + offset.offset());
      })
      .build();
四、指定偏移量消耗

1、偏移量

https://i-blog.csdnimg.cn/direct/a72ae86f06c94249a587608aa69cf03c.png
2、官方文档说明

   The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following:


[*]OffsetSpecification.first(): starting from the first available offset. If the stream has not been truncated, this means the beginning of the stream (offset 0).
[*]OffsetSpecification.last(): starting from the end of the stream and returning the last chunk of messages immediately (if the stream is not empty).
[*]OffsetSpecification.next(): starting from the next offset to be written. Contrary to OffsetSpecification.last(), consuming with OffsetSpecification.next() will not return anything if no-one is publishing to the stream. The broker will start sending messages to the consumer when messages are published to the stream.
[*]OffsetSpecification.offset(offset): starting from the specified offset. 0 means consuming from the beginning of the stream (first messages). The client can also specify any number, for example the offset where it left off in a previous incarnation of the application.
[*]OffsetSpecification.timestamp(timestamp): starting from the messages stored after the specified timestamp. Note consumers can receive messages published a bit before the specified timestamp. Application code can filter out those messages if necessary.
3、指定Offset消耗

Environment environment = Environment.builder()
      .host("192.168.200.100")
      .port(33333)
      .username("atguigu")
      .password("123456")
      .build();

CountDownLatch countDownLatch = new CountDownLatch(1);

Consumer consumer = environment.consumerBuilder()
      .stream("stream.atguigu.test")
      .offset(OffsetSpecification.first())
      .messageHandler((offset, message) -> {
            byte[] bodyAsBinary = message.getBodyAsBinary();
            String messageContent = new String(bodyAsBinary);
            System.out.println("[消费者端]messageContent = " + messageContent);
            countDownLatch.countDown();
      })
      .build();

countDownLatch.await();

consumer.close();
4、对比



[*]autoTrackingStrategy 方式:始终监听Stream中的新消息(狗狗看家,忠于职守)
[*]指定偏移量方式:针对指定偏移量的消息消耗之后就制止(狗狗叼飞盘,叼回来就完)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 操纵018:Stream Queue