尚品汇-商品上下架完善(更新ES)、延迟消息(四十四) ...

打印 上一主题 下一主题

主题 573|帖子 573|积分 1719

目次:
(1)改造商品搜刮上下架
(2)延迟消息
(1)改造商品搜刮上下架



定义商品上下架常量
  1. 在rabbit-util模块中导入常量类MqConst。
复制代码
  1. /**
  2. * 商品上下架.
  3. */
  4. public static final String EXCHANGE_DIRECT_GOODS = "exchange.direct.goods";
  5. public static final String ROUTING_GOODS_UPPER = "goods.upper";
  6. public static final String ROUTING_GOODS_LOWER = "goods.lower";
  7. //队列
  8. public static final String QUEUE_GOODS_UPPER  = "queue.goods.upper";
  9. public static final String QUEUE_GOODS_LOWER  = "queue.goods.lower";
复制代码
service-list与service-product引入依赖与配置
  1. <!--rabbitmq消息队列-->
  2. <dependency>
  3.    <groupId>com.atguigu.gmall</groupId>
  4.    <artifactId>rabbit-util</artifactId>
  5.    <version>1.0</version>
  6. </dependency>
复制代码
service-product发送消息
我在商品上架与商品添加时发送消息

商品上架
实现类

  1. @Override
  2. @Transactional
  3. public void onSale(Long skuId) {
  4.     // 更改销售状态
  5.     SkuInfo skuInfoUp = new SkuInfo();
  6.     skuInfoUp.setId(skuId);
  7.     skuInfoUp.setIsSale(1);
  8.     skuInfoMapper.updateById(skuInfoUp);
  9.     //商品上架 交换机 路由key 队列
  10.     rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_UPPER, skuId);
  11. }
复制代码
商品下架
  1. @Override
  2. @Transactional
  3. public void cancelSale(Long skuId) {
  4.     // 更改销售状态
  5.     SkuInfo skuInfoUp = new SkuInfo();
  6.     skuInfoUp.setId(skuId);
  7.     skuInfoUp.setIsSale(0);
  8.     skuInfoMapper.updateById(skuInfoUp);
  9.     //商品下架  交换机 路由key 队列
  10.     rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_LOWER, skuId);
  11. }
复制代码
service-list消费消息


  1. package com.atguigu.gmall.list.receiver;
  2. import com.atguigu.gmall.constant.MqConst;
  3. import com.atguigu.gmall.list.service.SearchService;
  4. import com.rabbitmq.client.Channel;
  5. import lombok.SneakyThrows;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.rabbit.annotation.Exchange;
  8. import org.springframework.amqp.rabbit.annotation.Queue;
  9. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  10. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. @Component
  14. public class ListReceiver {
  15.     @Autowired
  16.     private SearchService searchService;
  17.     //监听上架队列
  18.     @SneakyThrows
  19.     @RabbitListener(bindings = @QueueBinding(
  20.             value =@Queue(value = MqConst.QUEUE_GOODS_UPPER,durable = "true",autoDelete = "false"),
  21.             exchange =@Exchange(value = MqConst.EXCHANGE_DIRECT_GOODS,autoDelete = "false"),
  22.             key = {MqConst.ROUTING_GOODS_UPPER}
  23.     ))
  24.     public void upperGoodsToEs(Long skuId, Message message, Channel channel){
  25.         try {
  26.             //判断
  27.             if(skuId!=null){
  28.                 //操作搜索模块操作ES的上架方法
  29.                 searchService.upperGoods(skuId);
  30.             }
  31.         } catch (Exception e) {
  32.             //写入日志文件 ,,写入数据库, 对接程序员手机短信
  33.             e.printStackTrace();
  34.         }
  35.        //消息确认    // 参数一:消息的唯一标识,参数二:是否批量确认 false 确认一个消息,true 批量确认
  36.         channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  37.     }
  38.     //监听下架队列
  39.     //监听上架队列
  40.     @SneakyThrows
  41.     @RabbitListener(bindings = @QueueBinding(
  42.             value =@Queue(value = MqConst.QUEUE_GOODS_LOWER,durable = "true",autoDelete = "false"),
  43.             exchange =@Exchange(value = MqConst.EXCHANGE_DIRECT_GOODS,autoDelete = "false"),
  44.             key = {MqConst.ROUTING_GOODS_LOWER}
  45.     ))
  46.     public void lowerGoodsToEs(Long skuId, Message message, Channel channel){
  47.         try {
  48.             //判断
  49.             if(skuId!=null){
  50.                 //操作搜索模块操作ES的下架方法
  51.                 searchService.lowerGoods(skuId);
  52.             }
  53.         } catch (Exception e) {
  54.             //写入日志文件 ,,写入数据库, 对接程序员手机短信
  55.             e.printStackTrace();
  56.         }
  57.            //消息确认    // 参数一:消息的唯一标识,参数二:是否批量确认 false 确认一个消息,true 批量确认
  58.         channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  59.     }
  60. }
复制代码
测试
启动后台管理页面
http://localhost:8888/#/product/sku/list
操作商品的上架,下架。动态更改es中的数据。

全部下架,页面查看:


上架几个:




(2)延迟消息

前面解决了搜刮与商品服务的题目,下面解决这个题目订单取消题目:

 
延迟消息有两种实现方案:

  • 基于死信队列
  • 集成延迟插件
1.基于死信实现延迟消息

使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现延迟队列
消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。凌驾了这个时间,我们认为这个消息就死了,称之为死信。
怎样设置TTL:
我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在压在这个队列的消息在5秒后会消散。
死信互换机  Dead Letter Exchanges


一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
(1) 一个消息被Consumer拒收了,而且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
(2)上面的消息的TTL到了,消息逾期了。
(3)队列的长度限制满了。排在前面的消息会被抛弃大概扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息逾期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
 

我们现在可以测试一下延迟队列。
(1)创建死信队列
(2)创建互换机
(3)创建互换器与队列之间的绑定
(4)创建队列
代码实现


 
在service-mq 中添加配置类:前面我们使用注解的方式举行创建互换机队列,下面我们用配置类举行创建 

  1. package com.atguigu.gmall.mq.config;
  2. import org.springframework.amqp.core.DirectExchange;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class DeadLetterMqConfig {
  8.     // 声明一些变量
  9.        public static final String exchange_dead = "exchange.dead";
  10.     public static final String routing_dead_1 = "routing.dead.1";
  11.     public static final String routing_dead_2 = "routing.dead.2";
  12.     public static final String queue_dead_1 = "queue.dead.1";
  13.     public static final String queue_dead_2 = "queue.dead.2";
  14.     // 定义交换机
  15.     @Bean
  16.     public DirectExchange exchange(){
  17.         return new DirectExchange(exchange_dead,true,false,null);
  18.     }
  19.     @Bean
  20.     public Queue queue1(){
  21.         // 设置如果队列一 出现问题,则通过参数转到exchange_dead,routing_dead_2 上!
  22.         HashMap<String, Object> map = new HashMap<>();
  23.         // 参数绑定 此处的key 固定值,不能随意写
  24.         map.put("x-dead-letter-exchange",exchange_dead);
  25.         map.put("x-dead-letter-routing-key",routing_dead_2);
  26.         // 设置延迟时间
  27.         map.put("x-message-ttl", 10 * 1000);
  28.         // 队列名称,是否持久化,是否独享、排外的【true:只可以在本次连接中访问】,是否自动删除,队列的其他属性参数
  29.         return new Queue(queue_dead_1,true,false,false,map);
  30.     }
  31.     @Bean
  32.     public Binding binding(){
  33.         // 将队列一 通过routing_dead_1 key 绑定到exchange_dead 交换机上
  34.         return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);
  35.     }
  36.     // 这个队列二就是一个普通队列
  37.     @Bean
  38.     public Queue queue2(){
  39.         return new Queue(queue_dead_2,true,false,false,null);
  40.     }
  41.     // 设置队列二的绑定规则
  42.     @Bean
  43.     public Binding binding2(){
  44.         // 将队列二通过routing_dead_2 key 绑定到exchange_dead交换机上!
  45.         return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);
  46.     }
  47. }
复制代码
配置发送消息
  1. package com.atguigu.gmall.mq.controller;
  2. @RestController
  3. @RequestMapping("/mq")
  4. @Slf4j
  5. public class MqController {
  6.    @Autowired
  7.    private RabbitTemplate rabbitTemplate;
  8.    @Autowired
  9.    private RabbitService rabbitService;
  10. @GetMapping("sendDeadLettle")
  11.    public Result sendDeadLettle() {
  12.       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  13.      this.rabbitService.convertAndSend(DeadLetterMqConfig.exchange_dead, DeadLetterMqConfig.routing_dead_1, "我是延迟消息");
  14.       System.out.println(" 消息发送时间:"+sdf.format(new Date()) );
  15.       return Result.ok();
  16.    }
  17. }
复制代码
 
消息接收方
  1. package com.atguigu.gmall.mq.receiver;
  2. @Component
  3. @Configuration
  4. public class DeadLetterReceiver {
  5.     //消费的是队列2,队列1,没有人消息,超过超时时间后会变成死信
  6.     @RabbitListener(queues = DeadLetterMqConfig.queue_dead_2)
  7.     public void get(String msg) {
  8.         System.out.println("Receive:" + msg);
  9.         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  10.         System.out.println("消息接收时间: " + sdf.format(new Date()));
  11.          System.out.println("消息内容为:"+msg)
  12.     }
  13. }
复制代码



 
2.基于延迟插件实现延迟消息

Rabbitmq实现了一个插件x-delay-message来实现延时队列
插件安装
1. 首先我们将刚下载下来的rabbitmq_delayed_message_exchange-3.9.0.ez文件上传到RabbitMQ所在服务器,下载地址:https://www.rabbitmq.com/community-plugins.html
2. 切换到插件所在目次,执行 docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins 下令,将刚插件拷贝到容器内plugins目次下
3. 执行 docker exec -it rabbitmq /bin/bash 下令进入到容器内部,并 cd plugins 进入plugins目次
4. 执行 ls -l|grep delay  下令查看插件是否copy成功
5. 在容器内plugins目次下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange  下令启用插件
6. exit下令退出RabbitMQ容器内部,然后执行 docker restart rabbitmq 下令重启RabbitMQ容器
代码实现

在service-mq 中添加类

配置队列
  1. package com.atguigu.gmall.mq.config;
  2. @Configuration
  3. public class DelayedMqConfig {
  4.     public static final String exchange_delay = "exchange.delay";
  5.     public static final String routing_delay = "routing.delay";
  6.     public static final String queue_delay_1 = "queue.delay.1";
  7.      @Bean
  8.     public Queue delayQeue1() {
  9.         // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
  10.         return new Queue(queue_delay_1, true);
  11.     }
  12.     @Bean
  13.     public CustomExchange delayExchange() {
  14.         Map<String, Object> args = new HashMap<String, Object>();
  15.         args.put("x-delayed-type", "direct");
  16.         return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args);
  17.     }
  18.     @Bean
  19.     public Binding delayBbinding1() {
  20.         return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();
  21.     }
  22. }
复制代码
发送消息:这个延迟插件,需要使用rabbitTemplate,上面的可以使用rabbitService封装的service
  1. @GetMapping("sendelay")
  2. public Result sendDelay() {
  3.    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  4.    this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, "我是延迟插件的消息", new MessagePostProcessor() {
  5.       @Override
  6.       public Message postProcessMessage(Message message) throws AmqpException {
  7.          message.getMessageProperties().setDelay(10 * 1000);//消息的延迟时间
  8.          System.out.println("延迟插件消息发送时间:"+sdf.format(new Date()) );
  9.          return message;//放行消息
  10.       }
  11.    });
  12.    return Result.ok();
  13. }
复制代码
接收消息

  1. package com.atguigu.gmall.mq.receiver;
  2. @Component
  3. public class DelayReceiver {
  4.     @RabbitListener(queues = DelayedMqConfig.queue_delay_1)
  5.     public void get(String msg) {
  6.         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  7.         System.out.println("消息接受时间: " + sdf.format(new Date()));
  8.         System.out.println("消息内容是:"+msg)
  9.     }
  10. }
复制代码





报错了不用管,因为我们灭有使用封装的发送消息的rabbittservice 发送消息,没有往Redis放消息,发送延迟消息发送失败,会消息回调重新发送,会获取key,没有key就会报错:


修改消息回调 

加一个为空的判定:
而且在接受消息的时候加一个消息确认 

 更改后就不报错了


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

莱莱

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表