尚未崩坏 发表于 2024-8-20 18:58:26

基于RabbitMQ的消息监听器

1. 背景

机构的新增、更新、删除在微服务A中已经完成了(微服务A已经部署,不能修改代码),假如在微服务A中对机构进行新增、更新、删除操作后,需要同步到自己的微服务B中,这里采用MQ消息关照的方式实现。
微服务A中配置如下:
消息发往的交换机为:itcast-auth,交换机的类型为:topic
发送消息的规则如下:
● 消息为json字符串
○ 如:{"type":"ORG","content":[{"managerId":"1","parentId":"0","name":"测试组织","id":"973902113476182273","status":true}],"operation":"UPDATE"}
● type表示变更的对象,比如组织:ORG
● content为更改对象列表
● operation类型列表
○ 新增-ADD
○ 修改-UPDATE
○ 删除-DEL 2. 消息监听器

/**
* 对于微服务A消息的处理
*/
@Slf4j
@Component
public class AuthMQListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT),
            exchange = @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC),
            key = "#"
    ))
    public void listenAgencyMsg(String msg) {
      //{"type":"ORG","operation":"ADD","content":[{"id":"977263044792942657","name":"55","parentId":"0","managerId":null,"status":true}]}
      log.info("接收到消息 -> {}", msg);
      JSONObject jsonObject = JSONUtil.parseObj(msg);
      String type = jsonObject.getStr("type");
      if (!StrUtil.equalsIgnoreCase(type, "ORG")) {
            //非机构消息
            return;
      }
      String operation = jsonObject.getStr("operation");
      JSONObject content = (JSONObject) jsonObject.getJSONArray("content").getObj(0);
      String name = content.getStr("name");
      Long parentId = content.getLong("parentId");

      // 。。。消息处理。。。
    }

} 2.1 标记监听器

@RabbitListener(bindings = @QueueBinding(
      value = @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT),
      exchange = @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC),
      key = "#"
))
public void listenAgencyMsg(String msg) {


[*]@RabbitListener:标记该方法为RabbitMQ的消息监听器。它会监听指定的队列并处置惩罚收到的消息。
[*]@QueueBinding:将队列与交换机绑定。

[*]@Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT):指定要监听的队列的名称,这里在常量类里界说了。
[*]@Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC):指定交换机的名称和类型(Topic),这里在常量类里界说了,与微服务A中配置雷同。
[*]key = "#":路由键,这里#表示匹配所有路由键。

[*]listenAgencyMsg(String msg):当消息队列吸收到消息时,会调用这个方法,并将消息内容转达进来。
2.2 消息剖析

log.info("接收到消息 -> {}", msg);
JSONObject jsonObject = JSONUtil.parseObj(msg);
String type = jsonObject.getStr("type");
if (!StrUtil.equalsIgnoreCase(type, "ORG")) {
    //非机构消息
    return;
}
String operation = jsonObject.getStr("operation");
JSONObject content = (JSONObject) jsonObject.getJSONArray("content").getObj(0);
String name = content.getStr("name");
Long parentId = content.getLong("parentId");

[*]log.info("吸收到消息 -> {}", msg):记录吸收到的消息日志。
[*]JSONObject jsonObject = JSONUtil.parseObj(msg):将消息字符串剖析为JSON对象。
[*]String type = jsonObject.getStr("type"):从消息中提取type字段。
[*]if (!StrUtil.equalsIgnoreCase(type, "ORG")) { return; }:判定消息类型是否为“ORG”,假如不是,直接返回不做处置惩罚。


[*]提取operation字段:操作类型(如ADD、UPDATE、DEL)。
[*]提取content内容:content字段是一个数组,这里取第一个对象。
[*]提取name字段:表示机构的名称。
[*]提取parentId字段:表示父机构的ID。
3. RabbitMQ介绍

RabbitMQ是一种广泛利用的消息队列(Message Queue)系统,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在差别的系统或组件之间转达消息。通过消息队列,系统可以实现解耦、异步处置惩罚、负载平衡等特性,从而进步系统的可扩展性和可靠性。
3.1 RabbitMQ的核心概念


[*] 生产者(Producer)

[*]生产者是消息的发送方。它负责将消息发送到RabbitMQ的交换机中。

[*] 斲丧者(Consumer)

[*]斲丧者是消息的吸收方。它从RabbitMQ的队列中获取并处置惩罚消息。

[*] 队列(Queue)

[*]队列是RabbitMQ内部存储消息的地方。消息从生产者发送到队列中,斲丧者从队列中获取消息。队列类似于一个消息的存储池。

[*] 交换机(Exchange)

[*]交换机负责吸收生产者发送的消息,并根据一定的路由规则将消息路由到一个或多个队列。交换机有差别的类型,常见的有:

[*]Direct Exchange:直接交换机,根据消息的路由键精确匹配队列。
[*]Fanout Exchange:扇出交换机,不考虑路由键,直接将消息广播到所有绑定的队列中。
[*]Topic Exchange:主题交换机,根据路由键的模式匹配(利用通配符)将消息路由到一个或多个队列。
[*]Headers Exchange:头交换机,通过消息的头下属性来路由消息。


[*] 路由键(Routing Key)

[*]路由键是生产者在将消息发送到交换机时指定的一个字符串。交换时机根据这个字符串决定将消息路由到哪个队列。

[*] 绑定(Binding)

[*]绑定是交换机和队列之间的毗连关系。通过绑定,可以将交换机和队列关联起来,并通过路由键决定消息的流向。

3.2 消息的生命周期


[*] 生产者发送消息

[*]生产者将消息发送到交换机,并指定一个路由键。

[*] 交换机路由消息

[*]交换机根据路由键和绑定规则,将消息路由到一个或多个队列中。

[*] 斲丧者吸收消息

[*]斲丧者从队列中取出消息并进行处置惩罚。处置惩罚完成后,斲丧者可以向RabbitMQ发送一个确认消息(ACK),告知RabbitMQ该消息已成功处置惩罚。

[*] 消息确认与重试

[*]假如斲丧者处置惩罚消息失败,可以选择不发送确认消息,RabbitMQ会将消息重新放回队列,等候其他斲丧者处置惩罚,或进行重试。

3.3 RabbitMQ的常见利用场景


[*] 解耦

[*]在复杂系统中,各个组件之间可能有很强的依赖性。通过消息队列,生产者和斲丧者可以实现解耦,生产者只需将消息发送到队列,不需要关心谁会处置惩罚这些消息。

[*] 异步处置惩罚

[*]有些任务可能是耗时操作,比方生成陈诉、图片处置惩罚等。通过消息队列,系统可以将这些耗时操作异步处置惩罚,不会阻塞主流程。

[*] 负载平衡

[*]RabbitMQ可以将消息分发给多个斲丧者,从而实现负载平衡。即使流量高峰期,消息处置惩罚也不会成为系统瓶颈。

[*] 消息广播

[*]通过Fanout Exchange,可以实现消息广播,将同一消息同时发送给多个队列,让多个系统或服务同时收到消息并处置惩罚。

4. 总结

这段代码的主要作用是通过监听RabbitMQ消息队列,处置惩罚微服务A中与机构干系的消息。在微服务B中通过剖析消息内容,动态确定消息的类型和需要实行的操作,并调用相应的服务处置惩罚该消息。这种设计可以有用地处置惩罚异步消息,并将业务逻辑与消息队列解耦。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 基于RabbitMQ的消息监听器