一.什么是双写一致
双写一致性:当 修改了数据库 的数据也要同时 更新缓存 的数据,缓存和数据库的数据要保持一致
二.俩种场景四种方案
1.允许延长 Delay 一致的
第一种.延长双删 (⭐保举指数:75%)
示例:
- public void updateWithDelayDelete(Product product) {
- // 第一阶段删除
- redis.delete(product.getId());
-
- // 数据库更新
- db.update(product);
-
- // 异步延时删除
- scheduledExecutor.schedule(() -> {
- redis.delete(product.getId());
- }, 500, TimeUnit.MILLISECONDS);
- }
复制代码 如图先删除Redis中的数据,在更新MySQL数据库中的数据,最后延长再次删除Redis中的数据;
延长原因: 如今大部分业务的数据库(主从集群),修改数据库之后,假如立马删除Redis数据,主库数据尚未同步到从库,后续有 其他线程 从 从库 中查询到尚未同步过来的数据写入redis ,照旧会导致脏数据的风险,以是要延时(定时器,或延时队列等)再删除一次redis中的数据
注!!!意:由于无法绝对确认什么时候数据库举行主从同步的,以是哪怕你延时了,照旧有可能在数据库同步之前删掉Redis,然后其他线程获取脏数据导致不一致的情况的!以是延时双删,无法保证强一致性
2.MQ异步消息(⭐保举指数:85%)
如图 更新了MySQL中的数据,必要写入redis时,可以发送一个异步消息,放到MQ中,由专门的消费者去写入redis中
注!!!意
- mq丢失数据问题 解决方案
- 设置合理的重试策略
- spring:
- rabbitmq:
- publisher-confirm-type: correlated # 开启生产者确认模式
- publisher-returns: true # 开启消息路由失败回调
- listener:
- simple:
- acknowledge-mode: manual # 开启消费者手动ACK
- retry:
- enabled: true # 开启消费者重试
- max-attempts: 3 # 最大重试次数
- initial-interval: 1000 # 初始重试间隔(ms)
- multiplier: 2 # 间隔乘数(下次间隔=上次间隔*multiplier)
- max-interval: 10000 # 最大重试间隔(ms)
复制代码- 消息去重处理(防止重复消费)
3.Canal监听Binlog (⭐保举指数:99% 公司最常用)
canal是阿里巴巴出的一种中心件,基于MySQL的主从同步来实现的:
当有数据写入数据库,数据库举行主从同步时,会把全部ddl和dml的语句记录到一个binlog文件中;而canal的作用就是伪装成一个MySQL的从节点,去监听这个binlog日志,把MySQL中我们监听的数据的变革,异步关照给缓存服务,举行写入redis中
canal的优点是:对业务代码几乎无侵,速度很快
yml示例:
- database:
- mysql:
- config:
- log_bin: "mysql-bin" # 启用 Binlog
- binlog_format: "ROW" # 使用 ROW 格式
- server_id: "1" # 唯一服务器ID
- expire_logs_days: "15" # 保留15天日志
- max_binlog_size: "500M" # 单个文件500MB
- sync_binlog: "1" # 每次事务同步Binlog(高安全)
- binlog_row_image: "FULL" # 记录完整的行数据
- gtid_mode: "ON" # 启用GTID(可选,简化主从管理)
- canal:
- server: 127.0.0.1:11111 # Canal Server地址
- destination: example # 对应Canal实例名称
- username: canal
- password: canal
- filter: mydb\\.mytable # 监听特定表(正则表达式)
复制代码 核心代码参考:
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.protocol.CanalEntry.*;
- import com.alibaba.otter.canal.protocol.Message;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import java.util.List;
- @Component
- public class CanalRedisSyncService {
- @Value("${canal.server}")
- private String canalServer;
- @Value("${canal.destination}")
- private String destination;
- @Value("${canal.filter}")
- private String filter;
- private final StringRedisTemplate redisTemplate;
- public CanalRedisSyncService(StringRedisTemplate redisTemplate) {
- this.redisTemplate = redisTemplate;
- }
- @PostConstruct
- public void startListening() {
- new Thread(() -> {
- CanalConnector connector = CanalConnectors.newSingleConnector(
- canalServer, destination, "", "");
-
- try {
- connector.connect();
- connector.subscribe(filter); // 订阅过滤规则
- connector.rollback(); // 重置位点
- while (true) {
- Message message = connector.getWithoutAck(100); // 批量获取
- long batchId = message.getId();
- if (batchId == -1 || message.getEntries().isEmpty()) {
- Thread.sleep(1000);
- continue;
- }
- processEntries(message.getEntries());
- connector.ack(batchId); // 确认消费
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- connector.disconnect();
- }
- }).start();
- }
- private void processEntries(List<Entry> entries) {
- for (Entry entry : entries) {
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
- entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
- RowChange rowChange;
- try {
- rowChange = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("解析Binlog错误", e);
- }
- EventType eventType = rowChange.getEventType();
- String tableName = entry.getHeader().getTableName();
- for (RowData rowData : rowChange.getRowDatasList()) {
- // 根据操作类型处理数据
- if (eventType == EventType.DELETE) {
- handleDelete(tableName, rowData.getBeforeColumnsList());
- } else if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
- handleInsertOrUpdate(tableName, rowData.getAfterColumnsList());
- }
- }
- }
- }
- private void handleInsertOrUpdate(String tableName, List<Column> columns) {
- String id = null;
- String jsonData = buildJson(columns); // 构建JSON数据
- // 提取主键ID(假设第一列是ID)
- for (Column column : columns) {
- if ("id".equals(column.getName())) {
- id = column.getValue();
- break;
- }
- }
- if (id != null) {
- // Redis Key格式: table_name:id (如 user:123)
- String redisKey = tableName + ":" + id;
- redisTemplate.opsForValue().set(redisKey, jsonData);
- System.out.println("更新Redis: " + redisKey);
- }
- }
- private void handleDelete(String tableName, List<Column> columns) {
- String id = null;
- for (Column column : columns) {
- if ("id".equals(column.getName())) {
- id = column.getValue();
- break;
- }
- }
- if (id != null) {
- String redisKey = tableName + ":" + id;
- redisTemplate.delete(redisKey);
- System.out.println("删除Redis: " + redisKey);
- }
- }
- private String buildJson(List<Column> columns) {
- // 简化为JSON字符串(实际可用Jackson)
- StringBuilder json = new StringBuilder("{");
- for (Column column : columns) {
- json.append(""").append(column.getName()).append("":"")
- .append(column.getValue()).append("",");
- }
- json.deleteCharAt(json.length() - 1).append("}");
- return json.toString();
- }
- }
复制代码 2.一致性 Consistency 要求高的
1. 分布式锁方案(⭐保举指数:90%)

如图给资源加上一个互斥锁:当线程1要更新MySQL数据库和删除Redis中的数据前,加互斥锁,这样其他线程无法获取Redis中的数据,只能等线程1写入MySQL,并删除缓存完成释放锁后,才能读取数据;
互斥锁保证了强一致性,但是性能很低,充斥大量的获取锁和释放锁的额外开销
示例:
- // 读操作:使用读锁保证一致性
- public Integer getProductStock(Long productId) {
- String cacheKey = "product:stock:" + productId;
- RReadWriteLock lock = redissonClient.getReadWriteLock("product_lock:" + productId);
-
- try {
- // 1. 获取读锁(共享锁)
- lock.readLock().lock();
-
- // 2. 先查缓存
- Integer stock = (Integer) redisTemplate.opsForValue().get(cacheKey);
- if (stock != null) {
- return stock;
- }
-
- // 3. 缓存未命中,查数据库
- try {
- stock = jdbcTemplate.queryForObject(
- "SELECT stock FROM product WHERE id = ?",
- Integer.class,
- productId
- );
- } catch (EmptyResultDataAccessException e) {
- return 0; // 处理数据不存在的情况
- }
-
- // 4. 写入缓存(设置过期时间防雪崩)
- redisTemplate.opsForValue().set(cacheKey, stock, 30, TimeUnit.MINUTES);
- return stock;
-
- } finally {
- // 5. 释放读锁
- lock.readLock().unlock();
- }
- }
- // 写操作:使用写锁保证强一致性
- public void updateProductStock(Long productId, int newStock) {
- String cacheKey = "product:stock:" + productId;
- RReadWriteLock lock = redissonClient.getReadWriteLock("product_lock:" + productId);
-
- try {
- // 1. 获取写锁(排他锁)
- lock.writeLock().lock();
-
- // 2. 更新数据库
- jdbcTemplate.update(
- "UPDATE product SET stock = ? WHERE id = ?",
- newStock,
- productId
- );
-
- // 3. 删除缓存(直接删除,下次读时重建)
- redisTemplate.delete(cacheKey);
-
- } finally {
- // 4. 释放写锁
- lock.writeLock().unlock();
- }
- }
复制代码
- 读锁(共享锁):允许多个线程同时加锁,保证并发读性能,但会阻塞写锁。
- 写锁(排他锁):独占锁,同一时候只允许一个线程持有,阻塞全部读锁和写锁。
- 强一致性保证,读写互斥控制严格。
- 利用 Redisson 的分布式锁特性,支持高可用和自动续期。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |