ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【RabbitMQ】rabbitmq广播模式的使用 [打印本页]

作者: 前进之路    时间: 昨天 23:55
标题: 【RabbitMQ】rabbitmq广播模式的使用
媒介:
        项目需要同步另一个系统的数据,对方系统接纳MQ的发布/订阅模式方便我们同步数据,即当对方系统中的某条数据修改后,会向绑定他们交换机的每一个队列发布消息。消费者(即我们)监听到消息变动,举行信息消费同步至我们库中。
我们需要做的就是:
        1、创建一个新队列绑定到对方系统的交换机
        2、将监听到的消息举行合明白析,取出消息中的请求头:
              请求头信息为:"R"  ,则代表该生为入学操作;
              请求头信息为:"X"  ,则代表该生为休学操作;
              请求头信息为:"T"  ,则代表该生为退学操作;
        3、接下来根据获取到的请求头内容,来对对方系统传来的数据举行对应操作。
上代码,看思路:
    实现1:
  1. /**
  2. * @Author: 宁兴星
  3. * @CreateTime: 2026-01-16  14:05
  4. * @Description: TODO
  5. */
  6. @Configuration
  7. public class RabbitMqConfig extends AbstractRabbitMQConfig {
  8.     /**
  9.      * 创建广播模式交换机(扇形)
  10.      */
  11.     @Bean
  12.     public FanoutExchange fanoutExchange() {
  13.         return new FanoutExchange(EventConstant.STUDENT_EXCHANGE, true, false);
  14.     }
  15.     /**
  16.      * 创建被监听的队列
  17.      */
  18.     @Bean
  19.     public Queue dealerInfoQueue() {
  20.         return new Queue(EventConstant.STUDENT_QUEUE, true, false, false);
  21.     }
  22.     /**
  23.      * 将队列绑定到扇形交换机上,实现广播模式消息接收
  24.      *
  25.      * @param dealerInfoQueue
  26.      * @param fanoutExchange
  27.      * @return
  28.      */
  29.     @Bean
  30.     public Binding binding(Queue dealerInfoQueue, FanoutExchange fanoutExchange) {
  31.         return BindingBuilder.bind(dealerInfoQueue).to(fanoutExchange);
  32.     }
  33.     /**
  34.      * 配置消息监听容器工厂
  35.      *
  36.      * @param connectionFactory
  37.      * @return
  38.      */
  39.     @Bean
  40.     public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  41.         SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  42.         factory.setConnectionFactory(connectionFactory);
  43.         factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  44.         factory.setPrefetchCount(10);
  45.         return factory;
  46.     }
  47. }
复制代码
实现2:
       
  1.     /**
  2.      * MQ监听学生数据变更
  3.      *
  4.      * @param message 消息体
  5.      * @param deliveryTag 消息标识
  6.      * @param channel 通道
  7.      * @throws IOException IO异常
  8.      */
  9.     @RabbitListener(queues = EventConstant.STUDENT_QUEUE)
  10.     @Operation(summary = "MQ监听学生数据变更", description = "MQ监听学生数据变更")
  11.     public void handleMessage(Message message,
  12.                             @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
  13.                             Channel channel) throws IOException {
  14.         try {
  15.             log.info("同步学生数据,接收到MQ消息: {}", message);
  16.             
  17.             // 解析学生信息
  18.             StudentInfo studentInfo = parseStudentInfo(message);
  19.             log.info("解析后的学生数据: {}", studentInfo);
  20.             
  21.             // 获取操作类型并处理
  22.             String action = getActionFromHeaders(message);
  23.             processStudentAction(action, studentInfo);
  24.             
  25.             // 确认消息处理完成
  26.             channel.basicAck(deliveryTag, false);
  27.             
  28.         } catch (Exception e) {
  29.             log.error("处理学生数据消息异常: ", e);
  30.             // 消息处理失败,重新入队
  31.             channel.basicNack(deliveryTag, false, true);
  32.         }
  33.     }
  34.     /**
  35.      * 解析消息中的学生信息
  36.      */
  37.     private StudentInfo parseStudentInfo(Message message) throws IOException {
  38.         ObjectMapper objectMapper = new ObjectMapper();
  39.         return objectMapper.readValue(message.getBody(), StudentInfo.class);
  40.     }
  41.     /**
  42.      * 从消息头获取action
  43.      */
  44.     private String getActionFromHeaders(Message message) {
  45.         Map<String, Object> headers = message.getMessageProperties().getHeaders();
  46.         String action = headers.get("action").toString();
  47.         log.info("操作类型: {}", action);
  48.         return action;
  49.     }
复制代码
实现3:
  1.      /**
  2.      * 根据不同action处理学生数据
  3.      */
  4.     private void processStudentAction(String action, StudentInfo studentInfo) {
  5.         if (action == null) {
  6.             return;
  7.         }
  8.         
  9.         switch (action) {
  10.             case EventConstant.LIGHT_UP:
  11.                 // 编写对应录取方法,此处省略具体信息
  12.                 handleLightUp(studentInfo);
  13.                 break;
  14.             case EventConstant.OFFLINE:
  15.                 // 编写对应休学方法,此处省略具体信息
  16.                 handleOffline(studentInfo);
  17.                 break;
  18.             case EventConstant.DELETE:
  19.                 // 编写对应退学方法,此处省略具体信息
  20.                 handleDelete(studentInfo);
  21.                 break;
  22.             default:
  23.                 log.warn("未知的操作类型: {}", action);
  24.         }
  25.     }
复制代码
竣事啦,如有错误,敬请雅正!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4