知者何南 发表于 3 天前

分布式锁算法——基于数据库的分布式锁全面解析

https://i-blog.csdnimg.cn/direct/ea623f9ac3cd4346add8dc2880c05bad.png#pic_center
Java实现基于数据库的分布式锁全面解析

一、核心原理与筹划考量

1. 分布式锁基本要求

   2. 数据库实现方案对比

方案范例实现方式优点缺点唯一索引法利用唯一约束插入记录实现简朴无主动过期机制乐观锁方案版本号/时间戳更新克制长事件高并发性能差悲观锁方案SELECT FOR UPDATE强一致性连接池耗尽风险存储过程方案数据库函数实现锁逻辑减少网络往返数据库移植性差 二、基础实现方案

1. 唯一索引方案实现

// 数据库表结构
CREATE TABLE distributed_lock (
    lock_key VARCHAR(64) PRIMARY KEY,
    client_id VARCHAR(36) NOT NULL,
    expire_time DATETIME NOT NULL,
    version INT DEFAULT 0
);

// Java实现类
public class UniqueIndexLock {
    private final DataSource dataSource;
   
    public boolean tryLock(String lockKey, String clientId, int expireSeconds) {
      String sql = "INSERT INTO distributed_lock (lock_key, client_id, expire_time) "
                   + "VALUES (?, ?, NOW() + INTERVAL ? SECOND) "
                   + "ON DUPLICATE KEY UPDATE "
                   + "client_id = IF(expire_time < NOW(), VALUES(client_id), client_id), "
                   + "expire_time = IF(expire_time < NOW(), VALUES(expire_time), expire_time)";
      
      try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            
            ps.setString(1, lockKey);
            ps.setString(2, clientId);
            ps.setInt(3, expireSeconds);
            
            return ps.executeUpdate() > 0;
      } catch (SQLException e) {
            if (e.getErrorCode() == 1062) { // 唯一键冲突
                return false;
            }
            throw new LockException("获取锁失败", e);
      }
    }
   
    public void unlock(String lockKey, String clientId) {
      String sql = "DELETE FROM distributed_lock WHERE lock_key = ? AND client_id = ?";
      try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            
            ps.setString(1, lockKey);
            ps.setString(2, clientId);
            int affected = ps.executeUpdate();
            
            if (affected == 0) {
                throw new LockException("释放锁失败:锁不存在或客户端不匹配");
            }
      } catch (SQLException e) {
            throw new LockException("释放锁异常", e);
      }
    }
}
2. 乐观锁版本控制方案

public class OptimisticLock {
    public boolean tryLock(String lockKey, String clientId, int expireSeconds) {
      Connection conn = null;
      try {
            conn = dataSource.getConnection();
            conn.setAutoCommit(false);
            
            // 1. 查询当前锁状态
            LockRecord record = selectForUpdate(conn, lockKey);
            
            if (record == null) {
                insertNewLock(conn, lockKey, clientId, expireSeconds);
                return true;
            }
            
            if (record.isExpired()) {
                int updated = updateLockOwner(conn, lockKey, clientId,
                  record.getVersion(), expireSeconds);
                return updated > 0;
            }
            
            return false;
      } finally {
            if (conn != null) {
                try {
                  conn.commit();
                  conn.close();
                } catch (SQLException e) {
                  // 处理异常
                }
            }
      }
    }
   
    private int updateLockOwner(Connection conn, String lockKey, String clientId,
                              int oldVersion, int expireSeconds) throws SQLException {
      String sql = "UPDATE distributed_lock SET "
                   + "client_id = ?, "
                   + "expire_time = NOW() + INTERVAL ? SECOND, "
                   + "version = version + 1 "
                   + "WHERE lock_key = ? AND version = ?";
      
      try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, clientId);
            ps.setInt(2, expireSeconds);
            ps.setString(3, lockKey);
            ps.setInt(4, oldVersion);
            return ps.executeUpdate();
      }
    }
}
三、生产级优化实现

1. 锁续期守护线程

public class LockRenewalDaemon {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private final ConcurrentMap<String, Lease> heldLocks = new ConcurrentHashMap<>();
   
    class Lease {
      final String lockKey;
      final String clientId;
      volatile long expiryTime;
      
      Lease(String lockKey, String clientId, int ttl) {
            this.lockKey = lockKey;
            this.clientId = clientId;
            this.expiryTime = System.currentTimeMillis() + ttl * 1000;
      }
    }
   
    public void startRenewal(String lockKey, String clientId, int ttl) {
      Lease lease = new Lease(lockKey, clientId, ttl);
      heldLocks.put(lockKey, lease);
      
      scheduler.scheduleAtFixedRate(() -> {
            try {
                renewLock(lease);
            } catch (Exception e) {
                // 处理续期失败
            }
      }, ttl / 3 * 1000, ttl / 3 * 1000, TimeUnit.MILLISECONDS);
    }
   
    private void renewLock(Lease lease) {
      String sql = "UPDATE distributed_lock SET "
                   + "expire_time = NOW() + INTERVAL ? SECOND "
                   + "WHERE lock_key = ? AND client_id = ?";
      
      try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            
            ps.setInt(1, lease.ttl);
            ps.setString(2, lease.lockKey);
            ps.setString(3, lease.clientId);
            
            if (ps.executeUpdate() > 0) {
                lease.expiryTime = System.currentTimeMillis() + lease.ttl * 1000;
            }
      } catch (SQLException e) {
            // 处理续期失败
      }
    }
}
2. 锁获取重试策略

public class RetryPolicy {
    private static final int MAX_RETRIES = 5;
    private static final long BASE_DELAY = 100;
    private static final double JITTER_FACTOR = 0.2;
   
    public boolean acquireWithRetry(String lockKey, String clientId) {
      int attempt = 0;
      while (attempt < MAX_RETRIES) {
            if (tryAcquire(lockKey, clientId)) {
                return true;
            }
            
            long delay = calculateBackoff(attempt);
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            attempt++;
      }
      return false;
    }
   
    private long calculateBackoff(int attempt) {
      double jitter = (Math.random() * 2 - 1) * JITTER_FACTOR * BASE_DELAY;
      return (long) (Math.pow(2, attempt) * BASE_DELAY + jitter);
    }
}
四、高可用架构筹划

1. 多数据库实例部署

   2. 跨数据库锁同步方案

public class MultiDBSyncLock {
    private final List<DataSource> dataSources;
   
    public boolean crossDBlock(String lockKey, String clientId) {
      int successCount = 0;
      for (DataSource ds : dataSources) {
            if (tryLockOnDB(ds, lockKey, clientId)) {
                successCount++;
            }
      }
      return successCount > dataSources.size() / 2;
    }
   
    private boolean tryLockOnDB(DataSource ds, String lockKey, String clientId) {
      // 实现单数据库锁获取逻辑
    }
   
    public void releaseCrossDBLock(String lockKey, String clientId) {
      dataSources.parallelStream().forEach(ds -> {
            try {
                releaseLockOnDB(ds, lockKey, clientId);
            } catch (Exception e) {
                // 记录释放失败
            }
      });
    }
}
五、性能优化策略

1. 数据库连接池设置

# HikariCP配置示例
hikari.maximumPoolSize=20
hikari.minimumIdle=5
hikari.idleTimeout=30000
hikari.connectionTimeout=2000
hikari.leakDetectionThreshold=5000
2. 锁表索引优化

ALTER TABLE distributed_lock
ADD INDEX idx_expire_time (expire_time),
ADD INDEX idx_client (client_id);
3. 批量锁操作优化

public class BatchLockOperator {
    public Map<String, Boolean> batchLock(List<String> keys, String clientId) {
      String sql = "INSERT INTO distributed_lock (lock_key, client_id, expire_time) "
                   + "VALUES %s ON DUPLICATE KEY UPDATE "
                   + "client_id = IF(expire_time < NOW(), VALUES(client_id), client_id)";
      
      String values = keys.stream()
            .map(k -> String.format("('%s', '%s', NOW() + INTERVAL 30 SECOND)", k, clientId))
            .collect(Collectors.joining(","));
      
      try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement()) {
            
            int affected = stmt.executeUpdate(String.format(sql, values));
            return parseBatchResult(keys, affected);
      }
    }
}
六、非常处置惩罚机制

1. 锁超时主动开释

public class LockTimeoutCleaner {
    private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
   
    @PostConstruct
    public void init() {
      cleaner.scheduleAtFixedRate(this::cleanExpiredLocks,
            1, 1, TimeUnit.MINUTES);
    }
   
    private void cleanExpiredLocks() {
      String sql = "DELETE FROM distributed_lock WHERE expire_time < NOW()";
      try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.executeUpdate();
      } catch (SQLException e) {
            // 记录清理失败
      }
    }
}
2. 数据库故障转移处置惩罚

public class FailoverHandler {
    private volatile DataSource activeDataSource;
    private final List<DataSource> standbyDataSources;
   
    public boolean tryLockWithFailover(String lockKey, String clientId) {
      for (DataSource ds : getAllDataSources()) {
            try {
                if (tryLockOnDB(ds, lockKey, clientId)) {
                  activeDataSource = ds;
                  return true;
                }
            } catch (SQLException e) {
                // 标记数据库不可用
                markDBNotAvailable(ds);
            }
      }
      return false;
    }
}
七、生产环境监控

1. 关键监控指标

class LockMetrics {
    // 锁操作统计
    Counter lockSuccess = new Counter();
    Counter lockFailure = new Counter();
    Histogram lockTime = new Histogram();
   
    // 数据库连接池状态
    Gauge connectionPoolUsage = new Gauge();
    Counter connectionTimeout = new Counter();
   
    // 锁持有时间分布
    Timer lockDuration = new Timer();
}
2. 慢查询监控

-- MySQL慢查询日志分析
SELECT
    lock_key,
    COUNT(*) AS lock_operations,
    AVG(query_time) AS avg_time,
    MAX(query_time) AS max_time
FROM
    mysql.slow_log
WHERE
    sql_text LIKE '%distributed_lock%'
GROUP BY
    lock_key;
八、与其他方案对比

特性数据库方案Redis方案ZooKeeper方案一致性强一致性最终一致性强一致性性能中等(千级QPS)高(万级QPS)中等(千级QPS)实现复杂度中等简朴复杂依赖基础办法需要数据库集群需要Redis集群需要ZK集群主动过期需自行实现原生支持临时节点主动删除可重入性需自行实现需自行实现原生支持 九、典范应用场景

1. 库存扣减操作

public class InventoryService {
    private final DistributedLock lock;
   
    @Transactional
    public void deductInventory(String itemId, int quantity) {
      String lockKey = "inventory_lock:" + itemId;
      if (lock.tryLock(lockKey, 30)) {
            try {
                Item item = itemDAO.selectForUpdate(itemId);
                if (item.getStock() >= quantity) {
                  itemDAO.updateStock(itemId, -quantity);
                }
            } finally {
                lock.unlock(lockKey);
            }
      } else {
            throw new BusinessException("系统繁忙,请稍后重试");
      }
    }
}
2. 定时任务调治

public class DistributedJobScheduler {
    private final DistributedLock lock;
   
    @Scheduled(fixedRate = 60000)
    public void runReportGeneration() {
      if (lock.tryLock("job_report_gen", 60)) {
            try {
                generateDailyReport();
            } finally {
                lock.unlock("job_report_gen");
            }
      }
    }
}
十、总结与最佳实践

1. 方案上风



[*]数据强一致:基于数据库事件保证锁状态一致性
[*]无额外依赖:复用现有数据库基础办法
[*]简朴可靠:实现方案直接,恰当中小规模系统
2. 适用场景



[*]已有数据库集群且不希望引入新组件
[*]对锁精度要求高的金融交易场景
[*]低频但需要强一致性的关键操作
3. 注意事项



[*]克制长事件:锁持有时间应只管收缩
[*]索引优化:确保锁表的查询效率
[*]连接池管理:防止锁竞争导致连接耗尽
[*]监控告警:及时跟踪锁争用环境
部署建议:

[*]使用主从数据库集群提高可用性
[*]定期实行锁表维护(清理过期锁、重建索引)
[*]为锁操作设置独立数据库用户和连接池
[*]结合应用层重试和数据库事件优化性能
更多资源:

http://sj.ysok.net/jydoraemon 访问码:JYAM
本文发表于【纪元A梦】,关注我,获取更多免费实用教程/资源!


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 分布式锁算法——基于数据库的分布式锁全面解析