各部分分工职责
RabbitMQ负责添加、修改、删除的异步利用
Redis负责数据的缓存
RabbitMQ里面角色职责简单描述
RabbitMQ里面有几个角色要先分清以及他们的对应关系:
交换机、队列、路由键
交换机和队列是一对多
队列和路由键是多对多
然后就是消息的发送者(生产者)和消息的继承者(消耗者)
此案例中,添加修改删除要从生产者发送到消耗者,也就是说,消耗者才是具体干活的角色,消息生产者只需要把消息发送到对应的队列中,由交换机根据路由键发送到对应的队列中
Redis职责简单描述
Redis只需要把要看的数据以及新添加的数据,添加到缓冲中即可,如果缓冲中没有,就从数据库查,再添加到缓存中,所以此次数据类型用的Hash
pom.xml文件坐标引入
- <!-- redis工具 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
- <!-- JSON工具 https://mvnrepository.com/artifact/com.alibaba/fastjson -->
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.47</version>
- </dependency>
- <!--spring2.X集成redis所需common-pool2-->
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-pool2</artifactId>
- <version>2.6.0</version>
- </dependency>
- <!-- Spring Boot Starter AMQP -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
复制代码 SpringBoot的设置文件
- # RabbitMQ:配置,服务器地址,端口,用户名,密码
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- # 使用 Redis 作为缓存存储,具体配置:服务器地址,端口,密码
- spring.cache.type=redis
- spring.redis.host=127.0.0.1
- spring.redis.port=6379
- #spring.redis.password=root
- spring.redis.password=
- # 连接工厂使用的数据库索引,redis默认有16个db,索引0-15
- spring.redis.database=0
- #spring.redis.timeout=0
- # 连接池最大连接数(使用负值表示没有限制) 这个值决定了同时可以有多少个活动的连接
- spring.redis.lettuce.pool.max-active=8
- ## 连接池最大阻塞等待时间(-1表示没有限制) 当连接池中的所有连接都被占用时,新的请求会等待一段时间
- spring.redis.lettuce.pool.max-wait=-1
- ## 连接池中的最大空闲连接,连接池中最多可以有多少个空闲的连接
- spring.redis.lettuce.pool.max-idle=8
- ## 连接池中的最小空闲连接,连接池中至少要有多少个空闲的连接
- spring.redis.lettuce.pool.min-idle=0
复制代码 Redis设置类
- @EnableCaching //开启缓存
- @Configuration
- public class RedisConfig {
- @Bean
- public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
- RedisTemplate<String, Object> template = new RedisTemplate<>();
- template.setConnectionFactory(factory);
- // 使用 Jackson2JsonRedisSerializer 来序列化和反序列化 Redis 的值(Value)
- GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
- // 使用 StringRedisSerializer 来序列化和反序列化 Redis 的键(Key)
- StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
- // 设置键(key)的序列化器
- template.setKeySerializer(stringRedisSerializer);
- // 设置值(value)的序列化器
- template.setValueSerializer(jackson2JsonRedisSerializer);
- // 设置 Hash 键(key)的序列化器
- template.setHashKeySerializer(stringRedisSerializer);
- // 设置 Hash 值(value)的序列化器
- template.setHashValueSerializer(jackson2JsonRedisSerializer);
- template.afterPropertiesSet();
- return template;
- }
- @Bean
- public CacheManager cacheManager(RedisConnectionFactory factory) {
- RedisSerializer<String> redisSerializer = new StringRedisSerializer();
- Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
- //解决查询缓存转换异常的问题
- ObjectMapper om = new ObjectMapper();
- om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
- om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
- jackson2JsonRedisSerializer.setObjectMapper(om);
- // 配置序列化(解决乱码的问题),过期时间600秒
- RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
- .entryTtl(Duration.ofSeconds(600)) //设置数据过期时间600秒
- .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
- .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
- .disableCachingNullValues();
- RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
- .cacheDefaults(config)
- .build();
- return cacheManager;
- }
- }
复制代码 RabbitMQ设置类
如果没有设置logback,就把log.info()的代码全部删了,不影响运行,后面有的话也都删了
- @Configuration
- public class RabbitMQConfig {
- public static final String EXCHANGE_NAME = "bill.exchange"; // 交换机名称
- public static final String QUEUE_SAVE_UPDATE = "bill.saveupdate"; // 保存修改队列
- public static final String QUEUE_DELETE = "bill.delete"; // 删除队列
- public static final String ROUTING_SAVE_UPDATE_KEY = "bill.saveupdatekey"; // 保存修改路由键
- public static final String ROUTING_DELETE_KEY = "bill.deletekey"; // 删除路由键
- /**
- * 添加/修改定义队列
- * @return 队列对象
- */
- @Bean
- public Queue queueSaveUpdate() {
- log.info(QUEUE_SAVE_UPDATE + ":RabbitMQ队列初始化成功:" + LocalDateTime.now());
- return new Queue(QUEUE_SAVE_UPDATE, true); // durable: 是否持久化
- }
- /**
- * 删除定义队列
- * @return 队列对象
- */
- @Bean
- public Queue queueDelete() {
- log.info(QUEUE_DELETE + ":RabbitMQ队列初始化成功:" + LocalDateTime.now());
- return new Queue(QUEUE_DELETE, true); // durable: 是否持久化
- }
- /**
- * 定义交换机
- * @return 交换机对象
- */
- @Bean
- public TopicExchange exchange() {
- log.info(EXCHANGE_NAME + ":RabbitMQ交换机初始化成功:" + LocalDateTime.now());
- return new TopicExchange(EXCHANGE_NAME);
- }
- /**
- * 绑定队列和交换机
- * @return 绑定对象
- */
- @Bean
- public Binding bindingSaveUpdate() {
- log.info(ROUTING_SAVE_UPDATE_KEY + ":RabbitMQ绑定队列和交换机成功:" + LocalDateTime.now());
- return BindingBuilder.bind(queueSaveUpdate()).to(exchange()).with(ROUTING_SAVE_UPDATE_KEY);
- }
- /**
- * 绑定队列和交换机
- * @return 绑定对象
- */
- @Bean
- public Binding bindingDelete() {
- log.info(ROUTING_DELETE_KEY + ":RabbitMQ绑定队列和交换机成功:" + LocalDateTime.now());
- return BindingBuilder.bind(queueDelete()).to(exchange()).with(ROUTING_DELETE_KEY);
- }
- }
复制代码 RabbitMQ设置消息发送者(生产者)
也就是说,在需要异步调用的地方,注入BillMessageSender,然后,调对应的方法就可以了
- @Service
- public class BillMessageSender {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- /**
- * 发送用户添加修改消息
- * @param bill 参数对象
- */
- public void sendBillSaveUpdateMessage(Bill bill) {
- rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_SAVE_UPDATE_KEY, bill);
- }
- /**
- * 发送用户删除消息
- * @param ids 参数列表
- */
- public void sendBillDeleteMessage(List<Long> ids) {
- rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_DELETE_KEY, ids);
- }
- }
复制代码 异步发送消息
这是在Service层,
所以把ApiResult()这个自定义返回类换成你们的就行了
ObjectMapper这个工具重要是用来处理JSON数据的,这里我用是由于为了方便实体类和Map之间相互转换的,BillMapper是我自己用到的,这个可以换成你们自己的,不影响,剩下的就基本上没啥了,有不懂的可以批评区问,看到会回复
- @Slf4j
- @Service
- public class BillRedisService {
- @Resource
- private BillMapper billMapper;
- @Resource
- private ObjectMapper objectMapper;
- @Resource
- private BillMessageSender billMessageSender;
- @Resource
- private RedisTemplate<String, Object> redisTemplate;
- // redis 中 bill的键
- private static final String BILL_REDIS_KEY = "bill:info";
- /**
- * 保存或者修改信息
- * @param bill
- * @return
- */
- @Transactional
- public ApiResult saveUpdateByRedis(Bill bill){
- if(bill.getId() == null){
- billMessageSender.sendBillSaveUpdateMessage(bill); //把要添加的信息放入消息队列
- }else {
- log.info("[ " + bill.getId() + " ] 修改缓存中的数据");
- String key = BILL_REDIS_KEY + bill.getId(); //找到要修改值对应的redis的key
- Map map = objectMapper.convertValue(bill, Map.class);//把对象转换成map
- redisTemplate.opsForHash().putAll(key, map); //更新数据到缓存中
- billMessageSender.sendBillSaveUpdateMessage(bill); //把要修改的信息放入消息队列
- }
- return ApiResult.success();
- }
- /**
- * 从缓存取数据
- * @param id
- * @return
- */
- @Transactional
- public Bill selectPrimaryKeyByRedis(Long id){
- Bill bill;
- String key = BILL_REDIS_KEY + id;
- //有这个键就取数据,不然就查数据库
- if (redisTemplate.hasKey(key)) {
- log.info("[ " + id + " ] 从缓存中取数据");
- Map<Object, Object> map = redisTemplate.opsForHash().entries(key);
- bill = objectMapper.convertValue(map, Bill.class);
- }else{
- log.info("[ " + id + " ] 缓存中没有,向数据库中查询数据");
- bill = billMapper.selectByPrimaryKey(id);
- String putKey = BILL_REDIS_KEY + bill.getId(); //找到要修改值对应的redis的key
- Map map = objectMapper.convertValue(bill, Map.class); //把对象转换成map
- redisTemplate.opsForHash().putAll(putKey, map); //更新数据到缓存中
- }
- return bill;
- }
- /**
- * 删除方法
- * @param ids
- * @return
- */
- @Transactional
- public ApiResult delByRedis(List<Long> ids) {
- log.info("[ " + Arrays.toString(ids.toArray()) + " ] 以上数据要被删除");
- for (Long id : ids) {
- String key = BILL_REDIS_KEY + id;
- redisTemplate.delete(key); //删除缓存中的数据
- }
- //数据库信息交给消息队列删除
- billMessageSender.sendBillDeleteMessage(ids);
- return ApiResult.success();
- }
- }
复制代码 RabbitMQ设置消息接收者(消耗者)
添加上这个注解@RabbitListener(queues = RabbitMQConfig.QUEUE_SAVE_UPDATE),
并指明监听的队列queues = RabbitMQConfig.QUEUE_SAVE_UPDATE
就能获取到消息发送者发送过来的使命以及使命参数了,就可以在这里写处理逻辑了,如果没有设置logback,可以把@Slf4j,以及log.info(),这两个代码删除了
- @Slf4j
- @Component
- public class BillMessageReceiver {
- @Autowired
- private BillMapper billMapper;
- @Autowired
- private ObjectMapper objectMapper;
- @Resource
- private RedisTemplate<String, Object> redisTemplate;
- // redis 中 bill的键
- private static final String BILL_REDIS_KEY = "bill:info";
- /**
- * 处理添加和修改操作
- * @param bill 参数对象
- */
- @Transactional
- @RabbitListener(queues = RabbitMQConfig.QUEUE_SAVE_UPDATE)
- public void receiveBillSaveUpdateMessage(Bill bill) {
- log.info(RabbitMQConfig.QUEUE_SAVE_UPDATE + " 队列获取到数据:" + bill.toString());
- if (bill == null) {
- return;
- }
- if (bill.getId() == null) {
- bill.setDeleted(0);
- bill.setCreateTime(new Date());
- billMapper.insertSelective(bill);
- } else {
- bill.setUpdateTime(new Date());
- billMapper.updateByPrimaryKeySelective(bill);
- }
- String key = BILL_REDIS_KEY + bill.getId(); //添加后就有主键了,拼接成redis的key
- Map map = objectMapper.convertValue(bill, Map.class);//把对象转换成map
- redisTemplate.opsForHash().putAll(key, map); //把添加的数据放到缓存中
- }
- /**
- * 删除数据
- * @param ids 参数列表
- */
- @Transactional
- @RabbitListener(queues = RabbitMQConfig.QUEUE_DELETE)
- public void receiveBillDeleteMessage(List<Long> ids) {
- log.info(RabbitMQConfig.QUEUE_DELETE + " 队列获取到数据:" + Arrays.toString(ids.toArray()));
- Bill bill = new Bill();
- bill.setDeleted(1);
- Example example = new Example(Bill.class);
- Example.Criteria criteria = example.createCriteria();
- criteria.andIn("id",ids);
- billMapper.updateByExampleSelective(bill,example);
- }
- }
复制代码 总结
再把逻辑捋一下
添加修改删除,这些利用同一发送给RabbitMQ,由RabbitMQ的消耗者处理后续利用
查看详情,添加和更新的数据,交给Redis缓存,缓存没有,就查数据库,然后再缓存到Redis中,就第一遍查数据库,后续走的都是缓存
以上代码实现的功能就是,
全部数据查询还是走的数据库(数据量不多),但是单个查询,查询详情,先查缓存,缓存没有再查数据库,然后再添加到缓存中,下次查询就不走数据库了
添加修改删除同一发送给RabbitMQ消息队列,由消息队列异步完成后续的使命,并更新大概删除对应的缓存
这比之前单独的对数据库利用,多了2层逻辑,RabbitMQ和缓存的处理,这个例子就是简单的使用RabbitMQ和Redis,算是个小入门,如果有其他好的建议,可以批评一下,非常感谢!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |