解决Redis和数据库双写一致方案

打印 上一主题 下一主题

主题 2184|帖子 2184|积分 6552

一.什么是双写一致

双写一致性:当 修改了数据库 的数据也要同时 更新缓存 的数据,缓存和数据库的数据要保持一致
二.俩种场景四种方案


1.允许延长 Delay 一致的

第一种.延长双删 (⭐保举指数:75%)

示例:
  1. public void updateWithDelayDelete(Product product) {
  2.     // 第一阶段删除
  3.     redis.delete(product.getId());
  4.    
  5.     // 数据库更新
  6.     db.update(product);
  7.    
  8.     // 异步延时删除
  9.     scheduledExecutor.schedule(() -> {
  10.         redis.delete(product.getId());
  11.     }, 500, TimeUnit.MILLISECONDS);
  12. }
复制代码
如图先删除Redis中的数据,在更新MySQL数据库中的数据,最后延长再次删除Redis中的数据;
延长原因: 如今大部分业务的数据库(主从集群),修改数据库之后,假如立马删除Redis数据,主库数据尚未同步到从库,后续有 其他线程 从 从库 中查询到尚未同步过来的数据写入redis ,照旧会导致脏数据的风险,以是要延时(定时器,或延时队列等)再删除一次redis中的数据
注!!!意:由于无法绝对确认什么时候数据库举行主从同步的,以是哪怕你延时了,照旧有可能在数据库同步之前删掉Redis,然后其他线程获取脏数据导致不一致的情况的!以是延时双删,无法保证强一致性
2.MQ异步消息(⭐保举指数:85%)

如图 更新了MySQL中的数据,必要写入redis时,可以发送一个异步消息,放到MQ中,由专门的消费者去写入redis中
注!!!意 


  • mq丢失数据问题 解决方案 
  • 设置合理的重试策略
    1. spring:
    2.   rabbitmq:
    3.     publisher-confirm-type: correlated  # 开启生产者确认模式
    4.     publisher-returns: true             # 开启消息路由失败回调
    5.     listener:
    6.       simple:
    7.         acknowledge-mode: manual  # 开启消费者手动ACK
    8.         retry:
    9.           enabled: true           # 开启消费者重试
    10.           max-attempts: 3        # 最大重试次数
    11.           initial-interval: 1000 # 初始重试间隔(ms)
    12.           multiplier: 2          # 间隔乘数(下次间隔=上次间隔*multiplier)
    13.           max-interval: 10000    # 最大重试间隔(ms)
    复制代码
  • 消息去重处理(防止重复消费)
3.Canal监听Binlog (⭐保举指数:99% 公司最常用)

canal是阿里巴巴出的一种中心件,基于MySQL的主从同步来实现的:
当有数据写入数据库,数据库举行主从同步时,会把全部ddl和dml的语句记录到一个binlog文件中;而canal的作用就是伪装成一个MySQL的从节点,去监听这个binlog日志,把MySQL中我们监听的数据的变革,异步关照给缓存服务,举行写入redis中
canal的优点是:对业务代码几乎无侵,速度很快
yml示例:
  1. database:
  2.   mysql:
  3.     config:
  4.       log_bin: "mysql-bin"          # 启用 Binlog
  5.       binlog_format: "ROW"          # 使用 ROW 格式
  6.       server_id: "1"                # 唯一服务器ID
  7.       expire_logs_days: "15"        # 保留15天日志
  8.       max_binlog_size: "500M"       # 单个文件500MB
  9.       sync_binlog: "1"              # 每次事务同步Binlog(高安全)
  10.       binlog_row_image: "FULL"      # 记录完整的行数据
  11.       gtid_mode: "ON"               # 启用GTID(可选,简化主从管理)
  12. canal:
  13.   server: 127.0.0.1:11111  # Canal Server地址
  14.   destination: example      # 对应Canal实例名称
  15.   username: canal
  16.   password: canal
  17.   filter: mydb\\.mytable   # 监听特定表(正则表达式)
复制代码
核心代码参考:
  1. import com.alibaba.otter.canal.client.CanalConnector;
  2. import com.alibaba.otter.canal.client.CanalConnectors;
  3. import com.alibaba.otter.canal.protocol.CanalEntry.*;
  4. import com.alibaba.otter.canal.protocol.Message;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.data.redis.core.StringRedisTemplate;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.PostConstruct;
  9. import java.util.List;
  10. @Component
  11. public class CanalRedisSyncService {
  12.     @Value("${canal.server}")
  13.     private String canalServer;
  14.     @Value("${canal.destination}")
  15.     private String destination;
  16.     @Value("${canal.filter}")
  17.     private String filter;
  18.     private final StringRedisTemplate redisTemplate;
  19.     public CanalRedisSyncService(StringRedisTemplate redisTemplate) {
  20.         this.redisTemplate = redisTemplate;
  21.     }
  22.     @PostConstruct
  23.     public void startListening() {
  24.         new Thread(() -> {
  25.             CanalConnector connector = CanalConnectors.newSingleConnector(
  26.                 canalServer, destination, "", "");
  27.             
  28.             try {
  29.                 connector.connect();
  30.                 connector.subscribe(filter); // 订阅过滤规则
  31.                 connector.rollback(); // 重置位点
  32.                 while (true) {
  33.                     Message message = connector.getWithoutAck(100); // 批量获取
  34.                     long batchId = message.getId();
  35.                     if (batchId == -1 || message.getEntries().isEmpty()) {
  36.                         Thread.sleep(1000);
  37.                         continue;
  38.                     }
  39.                     processEntries(message.getEntries());
  40.                     connector.ack(batchId); // 确认消费
  41.                 }
  42.             } catch (Exception e) {
  43.                 e.printStackTrace();
  44.             } finally {
  45.                 connector.disconnect();
  46.             }
  47.         }).start();
  48.     }
  49.     private void processEntries(List<Entry> entries) {
  50.         for (Entry entry : entries) {
  51.             if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
  52.                 entry.getEntryType() == EntryType.TRANSACTIONEND) {
  53.                 continue;
  54.             }
  55.             RowChange rowChange;
  56.             try {
  57.                 rowChange = RowChange.parseFrom(entry.getStoreValue());
  58.             } catch (Exception e) {
  59.                 throw new RuntimeException("解析Binlog错误", e);
  60.             }
  61.             EventType eventType = rowChange.getEventType();
  62.             String tableName = entry.getHeader().getTableName();
  63.             for (RowData rowData : rowChange.getRowDatasList()) {
  64.                 // 根据操作类型处理数据
  65.                 if (eventType == EventType.DELETE) {
  66.                     handleDelete(tableName, rowData.getBeforeColumnsList());
  67.                 } else if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
  68.                     handleInsertOrUpdate(tableName, rowData.getAfterColumnsList());
  69.                 }
  70.             }
  71.         }
  72.     }
  73.     private void handleInsertOrUpdate(String tableName, List<Column> columns) {
  74.         String id = null;
  75.         String jsonData = buildJson(columns); // 构建JSON数据
  76.         // 提取主键ID(假设第一列是ID)
  77.         for (Column column : columns) {
  78.             if ("id".equals(column.getName())) {
  79.                 id = column.getValue();
  80.                 break;
  81.             }
  82.         }
  83.         if (id != null) {
  84.             // Redis Key格式: table_name:id (如 user:123)
  85.             String redisKey = tableName + ":" + id;
  86.             redisTemplate.opsForValue().set(redisKey, jsonData);
  87.             System.out.println("更新Redis: " + redisKey);
  88.         }
  89.     }
  90.     private void handleDelete(String tableName, List<Column> columns) {
  91.         String id = null;
  92.         for (Column column : columns) {
  93.             if ("id".equals(column.getName())) {
  94.                 id = column.getValue();
  95.                 break;
  96.             }
  97.         }
  98.         if (id != null) {
  99.             String redisKey = tableName + ":" + id;
  100.             redisTemplate.delete(redisKey);
  101.             System.out.println("删除Redis: " + redisKey);
  102.         }
  103.     }
  104.     private String buildJson(List<Column> columns) {
  105.         // 简化为JSON字符串(实际可用Jackson)
  106.         StringBuilder json = new StringBuilder("{");
  107.         for (Column column : columns) {
  108.             json.append(""").append(column.getName()).append("":"")
  109.                 .append(column.getValue()).append("",");
  110.         }
  111.         json.deleteCharAt(json.length() - 1).append("}");
  112.         return json.toString();
  113.     }
  114. }
复制代码
2.一致性 Consistency 要求高的 

1. 分布式锁方案(⭐保举指数:90%)

如图给资源加上一个互斥锁:当线程1要更新MySQL数据库和删除Redis中的数据前,加互斥锁,这样其他线程无法获取Redis中的数据,只能等线程1写入MySQL,并删除缓存完成释放锁后,才能读取数据;
互斥锁保证了强一致性,但是性能很低,充斥大量的获取锁和释放锁的额外开销
示例:
  1. // 读操作:使用读锁保证一致性
  2.     public Integer getProductStock(Long productId) {
  3.         String cacheKey = "product:stock:" + productId;
  4.         RReadWriteLock lock = redissonClient.getReadWriteLock("product_lock:" + productId);
  5.         
  6.         try {
  7.             // 1. 获取读锁(共享锁)
  8.             lock.readLock().lock();
  9.             
  10.             // 2. 先查缓存
  11.             Integer stock = (Integer) redisTemplate.opsForValue().get(cacheKey);
  12.             if (stock != null) {
  13.                 return stock;
  14.             }
  15.             
  16.             // 3. 缓存未命中,查数据库
  17.             try {
  18.                 stock = jdbcTemplate.queryForObject(
  19.                     "SELECT stock FROM product WHERE id = ?",
  20.                     Integer.class,
  21.                     productId
  22.                 );
  23.             } catch (EmptyResultDataAccessException e) {
  24.                 return 0; // 处理数据不存在的情况
  25.             }
  26.             
  27.             // 4. 写入缓存(设置过期时间防雪崩)
  28.             redisTemplate.opsForValue().set(cacheKey, stock, 30, TimeUnit.MINUTES);
  29.             return stock;
  30.             
  31.         } finally {
  32.             // 5. 释放读锁
  33.             lock.readLock().unlock();
  34.         }
  35.     }
  36.     // 写操作:使用写锁保证强一致性
  37.     public void updateProductStock(Long productId, int newStock) {
  38.         String cacheKey = "product:stock:" + productId;
  39.         RReadWriteLock lock = redissonClient.getReadWriteLock("product_lock:" + productId);
  40.         
  41.         try {
  42.             // 1. 获取写锁(排他锁)
  43.             lock.writeLock().lock();
  44.             
  45.             // 2. 更新数据库
  46.             jdbcTemplate.update(
  47.                 "UPDATE product SET stock = ? WHERE id = ?",
  48.                 newStock,
  49.                 productId
  50.             );
  51.             
  52.             // 3. 删除缓存(直接删除,下次读时重建)
  53.             redisTemplate.delete(cacheKey);
  54.             
  55.         } finally {
  56.             // 4. 释放写锁
  57.             lock.writeLock().unlock();
  58.         }
  59.     }
复制代码


  • 读锁(共享锁):允许多个线程同时加锁,保证并发读性能,但会阻塞写锁。
  • 写锁(排他锁):独占锁,同一时候只允许一个线程持有,阻塞全部读锁和写锁。
  • 强一致性保证,读写互斥控制严格。
  • 利用 Redisson 的分布式锁特性,支持高可用和自动续期。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

自由的羽毛

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表