媒介:
项目需要同步另一个系统的数据,对方系统接纳MQ的发布/订阅模式方便我们同步数据,即当对方系统中的某条数据修改后,会向绑定他们交换机的每一个队列发布消息。消费者(即我们)监听到消息变动,举行信息消费同步至我们库中。
我们需要做的就是:
1、创建一个新队列绑定到对方系统的交换机
2、将监听到的消息举行合明白析,取出消息中的请求头:
请求头信息为:"R" ,则代表该生为入学操作;
请求头信息为:"X" ,则代表该生为休学操作;
请求头信息为:"T" ,则代表该生为退学操作;
3、接下来根据获取到的请求头内容,来对对方系统传来的数据举行对应操作。
上代码,看思路:
实现1:
- /**
- * @Author: 宁兴星
- * @CreateTime: 2026-01-16 14:05
- * @Description: TODO
- */
- @Configuration
- public class RabbitMqConfig extends AbstractRabbitMQConfig {
- /**
- * 创建广播模式交换机(扇形)
- */
- @Bean
- public FanoutExchange fanoutExchange() {
- return new FanoutExchange(EventConstant.STUDENT_EXCHANGE, true, false);
- }
- /**
- * 创建被监听的队列
- */
- @Bean
- public Queue dealerInfoQueue() {
- return new Queue(EventConstant.STUDENT_QUEUE, true, false, false);
- }
- /**
- * 将队列绑定到扇形交换机上,实现广播模式消息接收
- *
- * @param dealerInfoQueue
- * @param fanoutExchange
- * @return
- */
- @Bean
- public Binding binding(Queue dealerInfoQueue, FanoutExchange fanoutExchange) {
- return BindingBuilder.bind(dealerInfoQueue).to(fanoutExchange);
- }
- /**
- * 配置消息监听容器工厂
- *
- * @param connectionFactory
- * @return
- */
- @Bean
- public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- factory.setPrefetchCount(10);
- return factory;
- }
- }
复制代码 实现2:
- /**
- * MQ监听学生数据变更
- *
- * @param message 消息体
- * @param deliveryTag 消息标识
- * @param channel 通道
- * @throws IOException IO异常
- */
- @RabbitListener(queues = EventConstant.STUDENT_QUEUE)
- @Operation(summary = "MQ监听学生数据变更", description = "MQ监听学生数据变更")
- public void handleMessage(Message message,
- @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
- Channel channel) throws IOException {
- try {
- log.info("同步学生数据,接收到MQ消息: {}", message);
-
- // 解析学生信息
- StudentInfo studentInfo = parseStudentInfo(message);
- log.info("解析后的学生数据: {}", studentInfo);
-
- // 获取操作类型并处理
- String action = getActionFromHeaders(message);
- processStudentAction(action, studentInfo);
-
- // 确认消息处理完成
- channel.basicAck(deliveryTag, false);
-
- } catch (Exception e) {
- log.error("处理学生数据消息异常: ", e);
- // 消息处理失败,重新入队
- channel.basicNack(deliveryTag, false, true);
- }
- }
- /**
- * 解析消息中的学生信息
- */
- private StudentInfo parseStudentInfo(Message message) throws IOException {
- ObjectMapper objectMapper = new ObjectMapper();
- return objectMapper.readValue(message.getBody(), StudentInfo.class);
- }
- /**
- * 从消息头获取action
- */
- private String getActionFromHeaders(Message message) {
- Map<String, Object> headers = message.getMessageProperties().getHeaders();
- String action = headers.get("action").toString();
- log.info("操作类型: {}", action);
- return action;
- }
复制代码 实现3:
- /**
- * 根据不同action处理学生数据
- */
- private void processStudentAction(String action, StudentInfo studentInfo) {
- if (action == null) {
- return;
- }
-
- switch (action) {
- case EventConstant.LIGHT_UP:
- // 编写对应录取方法,此处省略具体信息
- handleLightUp(studentInfo);
- break;
- case EventConstant.OFFLINE:
- // 编写对应休学方法,此处省略具体信息
- handleOffline(studentInfo);
- break;
- case EventConstant.DELETE:
- // 编写对应退学方法,此处省略具体信息
- handleDelete(studentInfo);
- break;
- default:
- log.warn("未知的操作类型: {}", action);
- }
- }
复制代码 竣事啦,如有错误,敬请雅正!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |