ToB企服应用市场:ToB评测及商务社交产业平台

标题: SpringBoot+Redis+RabbitMQ完成增删改查 [打印本页]

作者: 商道如狼道    时间: 2024-11-13 18:41
标题: SpringBoot+Redis+RabbitMQ完成增删改查
各部分分工职责

RabbitMQ负责添加、修改、删除的异步利用
Redis负责数据的缓存
RabbitMQ里面角色职责简单描述

RabbitMQ里面有几个角色要先分清以及他们的对应关系:
交换机、队列、路由键
交换机和队列是一对多
队列和路由键是多对多
然后就是消息的发送者(生产者)和消息的继承者(消耗者)
此案例中,添加修改删除要从生产者发送到消耗者,也就是说,消耗者才是具体干活的角色,消息生产者只需要把消息发送到对应的队列中,由交换机根据路由键发送到对应的队列中
Redis职责简单描述

Redis只需要把要看的数据以及新添加的数据,添加到缓冲中即可,如果缓冲中没有,就从数据库查,再添加到缓存中,所以此次数据类型用的Hash
pom.xml文件坐标引入

  1.         <!-- redis工具 -->
  2.         <dependency>
  3.             <groupId>org.springframework.boot</groupId>
  4.             <artifactId>spring-boot-starter-data-redis</artifactId>
  5.         </dependency>
  6.         <!-- JSON工具 https://mvnrepository.com/artifact/com.alibaba/fastjson -->
  7.         <dependency>
  8.             <groupId>com.alibaba</groupId>
  9.             <artifactId>fastjson</artifactId>
  10.             <version>1.2.47</version>
  11.         </dependency>
  12.         <!--spring2.X集成redis所需common-pool2-->
  13.         <dependency>
  14.             <groupId>org.apache.commons</groupId>
  15.             <artifactId>commons-pool2</artifactId>
  16.             <version>2.6.0</version>
  17.         </dependency>
  18.         <!-- Spring Boot Starter AMQP -->
  19.         <dependency>
  20.             <groupId>org.springframework.boot</groupId>
  21.             <artifactId>spring-boot-starter-amqp</artifactId>
  22.         </dependency>
复制代码
SpringBoot的设置文件

  1. # RabbitMQ:配置,服务器地址,端口,用户名,密码
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest
  6. # 使用 Redis 作为缓存存储,具体配置:服务器地址,端口,密码
  7. spring.cache.type=redis
  8. spring.redis.host=127.0.0.1
  9. spring.redis.port=6379
  10. #spring.redis.password=root
  11. spring.redis.password=
  12. # 连接工厂使用的数据库索引,redis默认有16个db,索引0-15
  13. spring.redis.database=0
  14. #spring.redis.timeout=0
  15. # 连接池最大连接数(使用负值表示没有限制) 这个值决定了同时可以有多少个活动的连接
  16. spring.redis.lettuce.pool.max-active=8
  17. ## 连接池最大阻塞等待时间(-1表示没有限制) 当连接池中的所有连接都被占用时,新的请求会等待一段时间
  18. spring.redis.lettuce.pool.max-wait=-1
  19. ## 连接池中的最大空闲连接,连接池中最多可以有多少个空闲的连接
  20. spring.redis.lettuce.pool.max-idle=8
  21. ## 连接池中的最小空闲连接,连接池中至少要有多少个空闲的连接
  22. spring.redis.lettuce.pool.min-idle=0
复制代码
Redis设置类

  1. @EnableCaching      //开启缓存
  2. @Configuration
  3. public class RedisConfig {
  4.     @Bean
  5.     public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
  6.         RedisTemplate<String, Object> template = new RedisTemplate<>();
  7.         template.setConnectionFactory(factory);
  8.         // 使用 Jackson2JsonRedisSerializer 来序列化和反序列化 Redis 的值(Value)
  9.         GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
  10.         // 使用 StringRedisSerializer 来序列化和反序列化 Redis 的键(Key)
  11.         StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
  12.         // 设置键(key)的序列化器
  13.         template.setKeySerializer(stringRedisSerializer);
  14.         // 设置值(value)的序列化器
  15.         template.setValueSerializer(jackson2JsonRedisSerializer);
  16.         // 设置 Hash 键(key)的序列化器
  17.         template.setHashKeySerializer(stringRedisSerializer);
  18.         // 设置 Hash 值(value)的序列化器
  19.         template.setHashValueSerializer(jackson2JsonRedisSerializer);
  20.         template.afterPropertiesSet();
  21.         return template;
  22.     }
  23.     @Bean
  24.     public CacheManager cacheManager(RedisConnectionFactory factory) {
  25.         RedisSerializer<String> redisSerializer = new StringRedisSerializer();
  26.         Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
  27.         //解决查询缓存转换异常的问题
  28.         ObjectMapper om = new ObjectMapper();
  29.         om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
  30.         om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
  31.         jackson2JsonRedisSerializer.setObjectMapper(om);
  32.         // 配置序列化(解决乱码的问题),过期时间600秒
  33.         RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
  34.                 .entryTtl(Duration.ofSeconds(600))  //设置数据过期时间600秒
  35.                 .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
  36.                 .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
  37.                 .disableCachingNullValues();
  38.         RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
  39.                 .cacheDefaults(config)
  40.                 .build();
  41.         return cacheManager;
  42.     }
  43. }
复制代码
RabbitMQ设置类

如果没有设置logback,就把log.info()的代码全部删了,不影响运行,后面有的话也都删了
  1. @Configuration
  2. public class RabbitMQConfig {
  3.     public static final String EXCHANGE_NAME = "bill.exchange"; // 交换机名称
  4.     public static final String QUEUE_SAVE_UPDATE = "bill.saveupdate"; // 保存修改队列
  5.     public static final String QUEUE_DELETE = "bill.delete"; // 删除队列
  6.     public static final String ROUTING_SAVE_UPDATE_KEY = "bill.saveupdatekey"; // 保存修改路由键
  7.     public static final String ROUTING_DELETE_KEY = "bill.deletekey"; // 删除路由键
  8.     /**
  9.      * 添加/修改定义队列
  10.      * @return 队列对象
  11.      */
  12.     @Bean
  13.     public Queue queueSaveUpdate() {
  14.         log.info(QUEUE_SAVE_UPDATE + ":RabbitMQ队列初始化成功:" + LocalDateTime.now());
  15.         return new Queue(QUEUE_SAVE_UPDATE, true); // durable: 是否持久化
  16.     }
  17.     /**
  18.      * 删除定义队列
  19.      * @return 队列对象
  20.      */
  21.     @Bean
  22.     public Queue queueDelete() {
  23.         log.info(QUEUE_DELETE + ":RabbitMQ队列初始化成功:" + LocalDateTime.now());
  24.         return new Queue(QUEUE_DELETE, true); // durable: 是否持久化
  25.     }
  26.     /**
  27.      * 定义交换机
  28.      * @return 交换机对象
  29.      */
  30.     @Bean
  31.     public TopicExchange exchange() {
  32.         log.info(EXCHANGE_NAME + ":RabbitMQ交换机初始化成功:" + LocalDateTime.now());
  33.         return new TopicExchange(EXCHANGE_NAME);
  34.     }
  35.     /**
  36.      * 绑定队列和交换机
  37.      * @return 绑定对象
  38.      */
  39.     @Bean
  40.     public Binding bindingSaveUpdate() {
  41.         log.info(ROUTING_SAVE_UPDATE_KEY + ":RabbitMQ绑定队列和交换机成功:" + LocalDateTime.now());
  42.         return BindingBuilder.bind(queueSaveUpdate()).to(exchange()).with(ROUTING_SAVE_UPDATE_KEY);
  43.     }
  44.     /**
  45.      * 绑定队列和交换机
  46.      * @return 绑定对象
  47.      */
  48.     @Bean
  49.     public Binding bindingDelete() {
  50.         log.info(ROUTING_DELETE_KEY + ":RabbitMQ绑定队列和交换机成功:" + LocalDateTime.now());
  51.         return BindingBuilder.bind(queueDelete()).to(exchange()).with(ROUTING_DELETE_KEY);
  52.     }
  53. }
复制代码
RabbitMQ设置消息发送者(生产者)

也就是说,在需要异步调用的地方,注入BillMessageSender,然后,调对应的方法就可以了
  1. @Service
  2. public class BillMessageSender {
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     /**
  6.      * 发送用户添加修改消息
  7.      * @param bill 参数对象
  8.      */
  9.     public void sendBillSaveUpdateMessage(Bill bill) {
  10.         rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_SAVE_UPDATE_KEY, bill);
  11.     }
  12.     /**
  13.      * 发送用户删除消息
  14.      * @param ids 参数列表
  15.      */
  16.     public void sendBillDeleteMessage(List<Long> ids) {
  17.         rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_DELETE_KEY, ids);
  18.     }
  19. }
复制代码
异步发送消息

这是在Service层,
所以把ApiResult()这个自定义返回类换成你们的就行了
ObjectMapper这个工具重要是用来处理JSON数据的,这里我用是由于为了方便实体类和Map之间相互转换的,BillMapper是我自己用到的,这个可以换成你们自己的,不影响,剩下的就基本上没啥了,有不懂的可以批评区问,看到会回复
  1. @Slf4j
  2. @Service
  3. public class BillRedisService {
  4.     @Resource
  5.     private BillMapper billMapper;
  6.     @Resource
  7.     private ObjectMapper objectMapper;
  8.     @Resource
  9.     private BillMessageSender billMessageSender;
  10.     @Resource
  11.     private RedisTemplate<String, Object> redisTemplate;
  12.     // redis 中 bill的键
  13.     private static final String BILL_REDIS_KEY = "bill:info";
  14.     /**
  15.      * 保存或者修改信息
  16.      * @param bill
  17.      * @return
  18.      */
  19.     @Transactional
  20.     public ApiResult saveUpdateByRedis(Bill bill){
  21.         if(bill.getId() == null){
  22.             billMessageSender.sendBillSaveUpdateMessage(bill);   //把要添加的信息放入消息队列
  23.         }else {
  24.             log.info("[ " + bill.getId() + " ] 修改缓存中的数据");
  25.             String key = BILL_REDIS_KEY + bill.getId();          //找到要修改值对应的redis的key
  26.             Map map = objectMapper.convertValue(bill, Map.class);//把对象转换成map
  27.             redisTemplate.opsForHash().putAll(key, map);         //更新数据到缓存中
  28.             billMessageSender.sendBillSaveUpdateMessage(bill);   //把要修改的信息放入消息队列
  29.         }
  30.         return ApiResult.success();
  31.     }
  32.     /**
  33.      * 从缓存取数据
  34.      * @param id
  35.      * @return
  36.      */
  37.     @Transactional
  38.     public Bill selectPrimaryKeyByRedis(Long id){
  39.         Bill bill;
  40.         String key = BILL_REDIS_KEY + id;
  41.         //有这个键就取数据,不然就查数据库
  42.         if (redisTemplate.hasKey(key)) {
  43.             log.info("[ " + id + " ] 从缓存中取数据");
  44.             Map<Object, Object> map = redisTemplate.opsForHash().entries(key);
  45.             bill = objectMapper.convertValue(map, Bill.class);
  46.         }else{
  47.             log.info("[ " + id + " ] 缓存中没有,向数据库中查询数据");
  48.             bill = billMapper.selectByPrimaryKey(id);
  49.             String putKey = BILL_REDIS_KEY + bill.getId();          //找到要修改值对应的redis的key
  50.             Map map = objectMapper.convertValue(bill, Map.class);   //把对象转换成map
  51.             redisTemplate.opsForHash().putAll(putKey, map);         //更新数据到缓存中
  52.         }
  53.         return bill;
  54.     }
  55.     /**
  56.      * 删除方法
  57.      * @param ids
  58.      * @return
  59.      */
  60.     @Transactional
  61.     public ApiResult delByRedis(List<Long> ids) {
  62.         log.info("[ " + Arrays.toString(ids.toArray()) + " ] 以上数据要被删除");
  63.         for (Long id : ids) {
  64.             String key = BILL_REDIS_KEY + id;
  65.             redisTemplate.delete(key);  //删除缓存中的数据
  66.         }
  67.         //数据库信息交给消息队列删除
  68.         billMessageSender.sendBillDeleteMessage(ids);
  69.         return ApiResult.success();
  70.     }
  71. }
复制代码
RabbitMQ设置消息接收者(消耗者)

添加上这个注解@RabbitListener(queues = RabbitMQConfig.QUEUE_SAVE_UPDATE),
并指明监听的队列queues = RabbitMQConfig.QUEUE_SAVE_UPDATE
就能获取到消息发送者发送过来的使命以及使命参数了,就可以在这里写处理逻辑了,如果没有设置logback,可以把@Slf4j,以及log.info(),这两个代码删除了
  1. @Slf4j
  2. @Component
  3. public class BillMessageReceiver {
  4.     @Autowired
  5.     private BillMapper billMapper;
  6.     @Autowired
  7.     private ObjectMapper objectMapper;
  8.     @Resource
  9.     private RedisTemplate<String, Object> redisTemplate;
  10.     // redis 中 bill的键
  11.     private static final String BILL_REDIS_KEY = "bill:info";
  12.     /**
  13.      * 处理添加和修改操作
  14.      * @param bill 参数对象
  15.      */
  16.     @Transactional
  17.     @RabbitListener(queues = RabbitMQConfig.QUEUE_SAVE_UPDATE)
  18.     public void receiveBillSaveUpdateMessage(Bill bill) {
  19.         log.info(RabbitMQConfig.QUEUE_SAVE_UPDATE + " 队列获取到数据:" + bill.toString());
  20.         if (bill == null) {
  21.             return;
  22.         }
  23.         if (bill.getId() == null) {
  24.             bill.setDeleted(0);
  25.             bill.setCreateTime(new Date());
  26.             billMapper.insertSelective(bill);
  27.         } else {
  28.             bill.setUpdateTime(new Date());
  29.             billMapper.updateByPrimaryKeySelective(bill);
  30.         }
  31.         String key = BILL_REDIS_KEY + bill.getId();          //添加后就有主键了,拼接成redis的key
  32.         Map map = objectMapper.convertValue(bill, Map.class);//把对象转换成map
  33.         redisTemplate.opsForHash().putAll(key, map);         //把添加的数据放到缓存中
  34.     }
  35.     /**
  36.      * 删除数据
  37.      * @param ids 参数列表
  38.      */
  39.     @Transactional
  40.     @RabbitListener(queues = RabbitMQConfig.QUEUE_DELETE)
  41.     public void receiveBillDeleteMessage(List<Long> ids) {
  42.         log.info(RabbitMQConfig.QUEUE_DELETE + " 队列获取到数据:" + Arrays.toString(ids.toArray()));
  43.         Bill bill = new Bill();
  44.         bill.setDeleted(1);
  45.         Example example = new Example(Bill.class);
  46.         Example.Criteria criteria = example.createCriteria();
  47.         criteria.andIn("id",ids);
  48.         billMapper.updateByExampleSelective(bill,example);
  49.     }
  50. }
复制代码
总结

再把逻辑捋一下
添加修改删除,这些利用同一发送给RabbitMQ,由RabbitMQ的消耗者处理后续利用
查看详情,添加和更新的数据,交给Redis缓存,缓存没有,就查数据库,然后再缓存到Redis中,就第一遍查数据库,后续走的都是缓存
以上代码实现的功能就是,
全部数据查询还是走的数据库(数据量不多),但是单个查询,查询详情,先查缓存,缓存没有再查数据库,然后再添加到缓存中,下次查询就不走数据库了
添加修改删除同一发送给RabbitMQ消息队列,由消息队列异步完成后续的使命,并更新大概删除对应的缓存
这比之前单独的对数据库利用,多了2层逻辑,RabbitMQ和缓存的处理,这个例子就是简单的使用RabbitMQ和Redis,算是个小入门,如果有其他好的建议,可以批评一下,非常感谢!


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4