利用rabbitmq发送消息和caffeineCache生存当地

打印 上一主题 下一主题

主题 907|帖子 907|积分 2721

目录
利用stock_job工程采集到国内大盘的最新生意业务时间的信息并插入数据库,利用rabbitmq发送消息
 1.导入依赖
2.编写yml文件,设置连接rabbitmq的信息
3.编写mq的设置类,生成交换机,消息队列,并将他们绑定
4.采集最新到最新的国内大盘数据信息,并插入到数据时时,发送消息
5.查看消息队列是否存在消息
在stock_backend工程中定义消息监听类,并设置当地缓存
1.导入mq依赖和caffeine
 2.编写yml文件
3.编写caffeine的设置类,和mq的设置类
4.编写消息监听类
5. stockService.getInnerMarket()方法
6.debug启动SpringBoot引导类
自动跳到消息监听类
清空缓存,然后调用mapper方法重新向数据库查询最新的国内大盘数据,再重新生存到当地缓存中
​编辑 查看caffeineCache成员变量的值
成功缓存key为innerMarketInfosKey
的数据


当我们在查询最新的股票大盘数据时,我们会频仍的向mysql查询数据,会给mysql造成很大的压力,所以我们可以利用caffeineCache当地缓存。
大抵思绪:
我们先利用stock_job工程采集到国内大盘的最新生意业务时间的信息时并将数据插入数据库,利用rabbitmq发送消息(消息为当前时间)
在stock_backend工程定义消息队列监听类,如果吸收到的时间和发送消息的时间相差一分钟时,就报错,不凌驾一分钟,就清除之前的当地缓存,再通过mapper向数据库查询数据,然后再重新把最新的国内大盘数据存入当地缓存中。
这样一来,如果stock_job工程没有向数据库中插入最新生意业务时间的国内大盘数据信息,在stock_backend中查询最新生意业务时间的国内大盘数据信息时,就会直接从当地缓存中获取数据信息。
利用stock_job工程采集到国内大盘的最新生意业务时间的信息并插入数据库,利用rabbitmq发送消息

 1.导入依赖

  1. <!--        导入mq依赖-->
  2.         <dependency>
  3.             <groupId>org.springframework.boot</groupId>
  4.             <artifactId>spring-boot-starter-amqp</artifactId>
  5.         </dependency>
复制代码
2.编写yml文件,设置连接rabbitmq的信息

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.230.100 # rabbitMQ的ip地址
  4.     port: 5672 # 端口
  5.     username: hhh
  6.     password: 1234
  7.     virtual-host: /
复制代码
3.编写mq的设置类,生成交换机,消息队列,并将他们绑定

  1. @Configuration
  2. public class MqConfig {
  3.     /**
  4.      * 重新定义消息序列化的方式,改为基于json格式序列化和反序列化
  5.      */
  6.     @Bean
  7.     public MessageConverter messageConverter() {
  8.         return new Jackson2JsonMessageConverter();
  9.     }
  10.     /**
  11.      * 国内大盘信息队列
  12.      */
  13.     @Bean
  14.     public Queue innerMarketQueue() {
  15.         return new Queue("innerMarketQueue", true);
  16.     }
  17.     /**
  18.      * 定义路由股票信息的交换机
  19.      */
  20.     @Bean
  21.     public TopicExchange innerMarketTopicExchange() {
  22.         return new TopicExchange("stockExchange", true, false);
  23.     }
  24.     /**
  25.      * 绑定队列到指定交换机
  26.      */
  27.     @Bean
  28.     public Binding bindingInnerMarketExchange() {
  29.         return BindingBuilder.bind(innerMarketQueue()).to(innerMarketTopicExchange())
  30.                 .with("inner.market");//设置routingKey
  31.     }
  32. }
复制代码
4.采集最新到最新的国内大盘数据信息,并插入到数据时时,发送消息

  1. @Override
  2.     public void getInnerMarketInfo() {
  3.                 //......
  4.         //解析的数据批量插入数据库
  5.         int count= stockMarketIndexInfoMapper.insertBatch(entities);
  6.         log.info("当前插入了:{}行数据",count);
  7.                 //通知后台终端刷新本地缓存,发送的日期数据是告知对方当前更新的股票数据所在时间点
  8.         rabbitTemplate.convertAndSend("stockExchange","inner.market",new Date());
  9.     }
复制代码
5.查看消息队列是否存在消息



在stock_backend工程中定义消息监听类,并设置当地缓存

1.导入mq依赖和caffeine

  1. <dependencies>        <!--        导入mq依赖-->
  2.         <dependency>
  3.             <groupId>org.springframework.boot</groupId>
  4.             <artifactId>spring-boot-starter-amqp</artifactId>
  5.         </dependency><!--        当地缓存依赖-->        <dependency>            <groupId>com.github.ben-manes.caffeine</groupId>            <artifactId>caffeine</artifactId>        </dependency>
复制代码
 2.编写yml文件

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.230.130 # rabbitMQ的ip地址
  4.     port: 5672 # 端口
  5.     username: hhh
  6.     password: 1234
  7.     virtual-host: /
复制代码
3.编写caffeine的设置类,和mq的设置类

  1. /**
  2.      * 构建缓存bean
  3.      * @return
  4.      */
  5.     @Bean
  6.     public Cache<String,Object> caffeineCache(){
  7.         Cache<String, Object> cache = Caffeine
  8.                 .newBuilder()
  9.                 .maximumSize(200)//设置缓存数量上限
  10. //                .expireAfterAccess(1, TimeUnit.SECONDS)//访问1秒后删除
  11. //                .expireAfterWrite(1,TimeUnit.SECONDS)//写入1秒后删除
  12.                 .initialCapacity(100)// 初始的缓存空间大小
  13.                 .recordStats()//开启统计
  14.                 .build();
  15.         return cache;
  16.     }
复制代码
这里不用定义交换机和消息队列 
  1. @Configuration
  2. public class MqConfig {
  3.     /**
  4.      * 重新定义消息序列化的方式,改为基于json格式序列化和反序列化
  5.      */
  6.     @Bean
  7.     public MessageConverter messageConverter() {
  8.         return new Jackson2JsonMessageConverter();
  9.     }
  10. }
复制代码
4.编写消息监听类

  1. @Component
  2. @Slf4j
  3. public class MqListener {
  4.     @Autowired
  5.     private Cache<String,Object> caffeineCache;
  6.     @Autowired
  7.     private StockService stockService;
  8.     @RabbitListener(queues = "innerMarketQueue")
  9.     public void acceptInnerMarketInfo(Date date){//消息队列里的数据类型是Date,所以接收的参数类型也是Date
  10.         long differTime = DateTime.now().getMillis() - new DateTime(date).getMillis();
  11.         if(differTime>60000L){
  12.             log.error("采集国内大盘时间点:{},同步超时:{}ms",new DateTime(date).toString("yyyy-MM-dd HH:mm:ss"),differTime);
  13.         }
  14.         //发送信息和接收信息在一分钟以内
  15.         //删除key为innerMarketInfosKey的缓存
  16.         caffeineCache.invalidate("innerMarketInfosKey");
  17.         //重新获取数据
  18.         //调用服务更新缓存
  19.         stockService.getInnerMarket();
  20.     }
  21. }
复制代码
5. stockService.getInnerMarket()方法

  1.     /**
  2.      * 获取国内大盘最新的数据
  3.      * @return
  4.      */
  5.     @Override
  6.     public R<List<InnerMarketDomain>> getInnerMarket() {
  7.         //获取key为innerMarketInfosKey的本地缓存数据,如果不存在,就去数据库中查询数据,并存入本地缓存中
  8.         //本地缓存默认一分钟消失
  9.         R<List<InnerMarketDomain>> result= (R<List<InnerMarketDomain>>) caffeineCache.get("innerMarketInfosKey", key->{
  10.             //1.获取当前时间的最新交易点(精确到分钟,秒和毫秒置为0)
  11.             Date curDate = DateTimeUtil.getLastDate4Stock(DateTime.now()).toDate();
  12.             //Date curDate = MyDateTimeUtil.getLateDate4Stock(DateTime.now()).toDate();
  13.             //mock data 等后续股票采集job工程完成,再将此代码删除
  14.             curDate=DateTime.parse("2021-12-28 09:31:00", DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toDate();
  15.             //log.info("curDate:{}",curDate);
  16.             //2.获取国内大盘的编码集合
  17.             List<String> mcodes = stockInfoConfig.getInner();
  18.             //3.调用mapper进行查询
  19.             List<InnerMarketDomain> data=stockMarketIndexInfoMapper.getInnerMarket(curDate,mcodes);
  20.             //4.返回数据
  21.            return R.ok(data);
  22.         });
  23.        return result;
  24.     }
复制代码
6.debug启动SpringBoot引导类

自动跳到消息监听类


清空缓存,然后调用mapper方法重新向数据库查询最新的国内大盘数据,再重新生存到当地缓存中

 查看caffeineCache成员变量的值



成功缓存key为innerMarketInfosKey

的数据



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

络腮胡菲菲

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表