自由的羽毛 发表于 3 天前

解决Redis和数据库双写一致方案

一.什么是双写一致

双写一致性:当 修改了数据库 的数据也要同时 更新缓存 的数据,缓存和数据库的数据要保持一致
二.俩种场景四种方案

https://i-blog.csdnimg.cn/direct/85367639055842aea336acc723fdcec7.png
1.允许延长 Delay 一致的

第一种.延长双删 (⭐保举指数:75%)
https://i-blog.csdnimg.cn/direct/f65bab6086e44dbf9bf47cdb9ee3ce62.png
示例:
public void updateWithDelayDelete(Product product) {
    // 第一阶段删除
    redis.delete(product.getId());
   
    // 数据库更新
    db.update(product);
   
    // 异步延时删除
    scheduledExecutor.schedule(() -> {
      redis.delete(product.getId());
    }, 500, TimeUnit.MILLISECONDS);
}
如图先删除Redis中的数据,在更新MySQL数据库中的数据,最后延长再次删除Redis中的数据;
延长原因: 如今大部分业务的数据库(主从集群),修改数据库之后,假如立马删除Redis数据,主库数据尚未同步到从库,后续有 其他线程 从 从库 中查询到尚未同步过来的数据写入redis ,照旧会导致脏数据的风险,以是要延时(定时器,或延时队列等)再删除一次redis中的数据
注!!!意:由于无法绝对确认什么时候数据库举行主从同步的,以是哪怕你延时了,照旧有可能在数据库同步之前删掉Redis,然后其他线程获取脏数据导致不一致的情况的!以是延时双删,无法保证强一致性
2.MQ异步消息(⭐保举指数:85%)
https://i-blog.csdnimg.cn/direct/2e9abe13806e4d67bb4d6add9dff48fd.png
如图 更新了MySQL中的数据,必要写入redis时,可以发送一个异步消息,放到MQ中,由专门的消费者去写入redis中
注!!!意 


[*]mq丢失数据问题 解决方案 
[*]设置合理的重试策略
[*] spring:
rabbitmq:
    publisher-confirm-type: correlated# 开启生产者确认模式
    publisher-returns: true             # 开启消息路由失败回调
    listener:
      simple:
      acknowledge-mode: manual# 开启消费者手动ACK
      retry:
          enabled: true         # 开启消费者重试
          max-attempts: 3      # 最大重试次数
          initial-interval: 1000 # 初始重试间隔(ms)
          multiplier: 2          # 间隔乘数(下次间隔=上次间隔*multiplier)
          max-interval: 10000    # 最大重试间隔(ms)
[*]消息去重处理(防止重复消费)
3.Canal监听Binlog (⭐保举指数:99% 公司最常用)
https://i-blog.csdnimg.cn/direct/7af67d27a71149b89e343aefc371294d.png
canal是阿里巴巴出的一种中心件,基于MySQL的主从同步来实现的:
当有数据写入数据库,数据库举行主从同步时,会把全部ddl和dml的语句记录到一个binlog文件中;而canal的作用就是伪装成一个MySQL的从节点,去监听这个binlog日志,把MySQL中我们监听的数据的变革,异步关照给缓存服务,举行写入redis中
canal的优点是:对业务代码几乎无侵,速度很快
yml示例:
database:
mysql:
    config:
      log_bin: "mysql-bin"          # 启用 Binlog
      binlog_format: "ROW"          # 使用 ROW 格式
      server_id: "1"                # 唯一服务器ID
      expire_logs_days: "15"      # 保留15天日志
      max_binlog_size: "500M"       # 单个文件500MB
      sync_binlog: "1"            # 每次事务同步Binlog(高安全)
      binlog_row_image: "FULL"      # 记录完整的行数据
      gtid_mode: "ON"               # 启用GTID(可选,简化主从管理)
canal:
server: 127.0.0.1:11111# Canal Server地址
destination: example      # 对应Canal实例名称
username: canal
password: canal
filter: mydb\\.mytable   # 监听特定表(正则表达式) 核心代码参考:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;

@Component
public class CanalRedisSyncService {

    @Value("${canal.server}")
    private String canalServer;
    @Value("${canal.destination}")
    private String destination;
    @Value("${canal.filter}")
    private String filter;

    private final StringRedisTemplate redisTemplate;

    public CanalRedisSyncService(StringRedisTemplate redisTemplate) {
      this.redisTemplate = redisTemplate;
    }

    @PostConstruct
    public void startListening() {
      new Thread(() -> {
            CanalConnector connector = CanalConnectors.newSingleConnector(
                canalServer, destination, "", "");
            
            try {
                connector.connect();
                connector.subscribe(filter); // 订阅过滤规则
                connector.rollback(); // 重置位点

                while (true) {
                  Message message = connector.getWithoutAck(100); // 批量获取
                  long batchId = message.getId();
                  if (batchId == -1 || message.getEntries().isEmpty()) {
                        Thread.sleep(1000);
                        continue;
                  }

                  processEntries(message.getEntries());
                  connector.ack(batchId); // 确认消费
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connector.disconnect();
            }
      }).start();
    }

    private void processEntries(List<Entry> entries) {
      for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
                entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("解析Binlog错误", e);
            }

            EventType eventType = rowChange.getEventType();
            String tableName = entry.getHeader().getTableName();

            for (RowData rowData : rowChange.getRowDatasList()) {
                // 根据操作类型处理数据
                if (eventType == EventType.DELETE) {
                  handleDelete(tableName, rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
                  handleInsertOrUpdate(tableName, rowData.getAfterColumnsList());
                }
            }
      }
    }

    private void handleInsertOrUpdate(String tableName, List<Column> columns) {
      String id = null;
      String jsonData = buildJson(columns); // 构建JSON数据

      // 提取主键ID(假设第一列是ID)
      for (Column column : columns) {
            if ("id".equals(column.getName())) {
                id = column.getValue();
                break;
            }
      }

      if (id != null) {
            // Redis Key格式: table_name:id (如 user:123)
            String redisKey = tableName + ":" + id;
            redisTemplate.opsForValue().set(redisKey, jsonData);
            System.out.println("更新Redis: " + redisKey);
      }
    }

    private void handleDelete(String tableName, List<Column> columns) {
      String id = null;
      for (Column column : columns) {
            if ("id".equals(column.getName())) {
                id = column.getValue();
                break;
            }
      }

      if (id != null) {
            String redisKey = tableName + ":" + id;
            redisTemplate.delete(redisKey);
            System.out.println("删除Redis: " + redisKey);
      }
    }

    private String buildJson(List<Column> columns) {
      // 简化为JSON字符串(实际可用Jackson)
      StringBuilder json = new StringBuilder("{");
      for (Column column : columns) {
            json.append("\"").append(column.getName()).append("\":\"")
                .append(column.getValue()).append("\",");
      }
      json.deleteCharAt(json.length() - 1).append("}");
      return json.toString();
    }
} 2.一致性 Consistency 要求高的 

1. 分布式锁方案(⭐保举指数:90%)
https://i-blog.csdnimg.cn/direct/2821ebc224cb498aa4c2ec5f525f3986.png
如图给资源加上一个互斥锁:当线程1要更新MySQL数据库和删除Redis中的数据前,加互斥锁,这样其他线程无法获取Redis中的数据,只能等线程1写入MySQL,并删除缓存完成释放锁后,才能读取数据;
互斥锁保证了强一致性,但是性能很低,充斥大量的获取锁和释放锁的额外开销
示例:
// 读操作:使用读锁保证一致性
    public Integer getProductStock(Long productId) {
      String cacheKey = "product:stock:" + productId;
      RReadWriteLock lock = redissonClient.getReadWriteLock("product_lock:" + productId);
      
      try {
            // 1. 获取读锁(共享锁)
            lock.readLock().lock();
            
            // 2. 先查缓存
            Integer stock = (Integer) redisTemplate.opsForValue().get(cacheKey);
            if (stock != null) {
                return stock;
            }
            
            // 3. 缓存未命中,查数据库
            try {
                stock = jdbcTemplate.queryForObject(
                  "SELECT stock FROM product WHERE id = ?",
                  Integer.class,
                  productId
                );
            } catch (EmptyResultDataAccessException e) {
                return 0; // 处理数据不存在的情况
            }
            
            // 4. 写入缓存(设置过期时间防雪崩)
            redisTemplate.opsForValue().set(cacheKey, stock, 30, TimeUnit.MINUTES);
            return stock;
            
      } finally {
            // 5. 释放读锁
            lock.readLock().unlock();
      }
    }

    // 写操作:使用写锁保证强一致性
    public void updateProductStock(Long productId, int newStock) {
      String cacheKey = "product:stock:" + productId;
      RReadWriteLock lock = redissonClient.getReadWriteLock("product_lock:" + productId);
      
      try {
            // 1. 获取写锁(排他锁)
            lock.writeLock().lock();
            
            // 2. 更新数据库
            jdbcTemplate.update(
                "UPDATE product SET stock = ? WHERE id = ?",
                newStock,
                productId
            );
            
            // 3. 删除缓存(直接删除,下次读时重建)
            redisTemplate.delete(cacheKey);
            
      } finally {
            // 4. 释放写锁
            lock.writeLock().unlock();
      }
    }


[*]读锁(共享锁):允许多个线程同时加锁,保证并发读性能,但会阻塞写锁。
[*]写锁(排他锁):独占锁,同一时候只允许一个线程持有,阻塞全部读锁和写锁。
[*]强一致性保证,读写互斥控制严格。
[*]利用 Redisson 的分布式锁特性,支持高可用和自动续期。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 解决Redis和数据库双写一致方案