rabbitMQ消息转换器

打印 上一主题 下一主题

主题 1020|帖子 1020|积分 3060

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
消息转换器

Spring的消息发送代码吸取的消息体是一个Object:


而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,吸取消息的时间,还会把字节反序列化为Java对象。
只不外,默认环境下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:


  • 数据体积过大
  • 有安全漏洞
  • 可读性差
我们来测试一下。

测试默认转换器

创建测试队列

我们在consumer服务中声明一个设置类,MessageConfig,里面编写方法创建队列。
  1. @Configuration
  2. public class MessageConfig {
  3.     @Bean
  4.     public Queue objectQueue(){
  5.         return new Queue("object.queue");
  6.     }
  7. }
复制代码


重启consumer服务,看看rabbit控制台。


发送map消息

在consumer服务中,新增ObjectQueueTest类,发送消息。
  1. @SpringBootTest
  2. public class ObjectQueueTest {
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     @Test
  6.     public void testSendObjectQueue(){
  7.         // 1 队列
  8.         String queueName = "object.queue";
  9.         // 2 消息
  10.         Map<String,Object> map = new HashMap<>();
  11.         map.put("name","sde");
  12.         map.put("age",18);
  13.         // 3 发送消息
  14.         rabbitTemplate.convertAndSend(queueName,map);
  15.     }
  16. }
复制代码


看看控制台是否有消息


查看消息



可以看到消息格式非常不友好。

设置JSON转换器

添加依赖

   显然,JDK序列化方式并不合适。我们盼望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
  在publisher和consumer两个服务中都引入依赖
  1. <dependency>
  2.   <groupId>com.fasterxml.jackson.core</groupId>
  3.   <artifactId>jackson-databind</artifactId>
  4. </dependency>
复制代码


注意:如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
设置消息转换器

   设置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可。
  在PublisherApplication启动类中添加
  1. @SpringBootApplication
  2. public class PublisherApplication {
  3.     public static void main(String[] args) {
  4.         SpringApplication.run(PublisherApplication.class);
  5.     }
  6.     @Bean
  7.     public MessageConverter messageConverter(){
  8.         // 1 定义消息转换器
  9.         Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
  10.         //2、配置每条消息自动创建id;用于识别不同消息,也可以在页面中基于id判断是否是重复消息
  11.         jjmc.setCreateMessageIds(true);
  12.         return jjmc;
  13.     }
  14. }
复制代码


在ConsumerApplication 启动类添加如下
  1. @SpringBootApplication
  2. public class ConsumerApplication {
  3.     public static void main(String[] args) {
  4.         SpringApplication.run(ConsumerApplication.class, args);
  5.     }
  6.     @Bean
  7.     public MessageConverter messageConverter(){
  8.         // 1 定义消息转换器
  9.         Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
  10.         //2、配置每条消息自动创建id;用于识别不同消息,也可以在页面中基于id判断是否是重复消息
  11.         jjmc.setCreateMessageIds(true);
  12.         return jjmc;
  13.     }
  14. }
复制代码



测试

① 在rabbitMQ的控制台中删除 object.queue 队列中的消息;重新启动 consumer
② 执行 com.sde.publisher.ObjectQueueTest类里面的方法,发送消息。
③ 在rabbitMQ的控制台中;查看消息

删除队列里面的消息


重启了 consumer服务


再次发送了一条消息


控制台查看消息


消费者吸取Object

我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map吸取。
新创建 ObjectQueueListener 类,编写代码吸取消息。

  1. @Slf4j
  2. @Component
  3. public class ObjectQueueListener {
  4.     /**
  5.      * 监听 object.queue 队列中的消息。
  6.      */
  7.     @RabbitListener(queues = "object.queue")
  8.     public void listenObjectQueue(Map<String,Object> map){
  9.         System.out.println("【消费者】监听到 object.queue 队列的消息:"+map);
  10.     }
  11. }
复制代码


控制台



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立聪堂德州十三局店

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表