Java基于数据库、乐观锁、悲观锁、Redis、Zookeeper分布式锁的简单案例实现 ...

立山  论坛元老 | 2024-8-20 16:33:56 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1005|帖子 1005|积分 3015

1. 分布式锁的界说

分布式锁是一种在分布式系统中用来协调多个历程或线程对共享资源举行访问的机制。它确保在分布式环境下,多个节点(如不同的服务器或历程)不会同时访问同一个共享资源,从而避免数据不一致、资源竞争等问题。
2. 分布式锁的工作原理

分布式锁的工作原理与单机锁雷同,但它需要考虑多个节点之间的协调。在获取锁时,历程必须确保锁的唯一性,即在任何时刻,只有一个历程可以大概乐成获取锁,而且锁的状态可以大概在不同节点之间保持一致。通常,分布式锁的实现需要满意以下条件:
条件描述互斥性在同一时间,只有一个历程能获得锁,确保没有其他历程可以大概同时访问该资源。死锁避免应防止由于某些原因(如历程瓦解或网络问题)导致的死锁情况。这通常通过设置锁的过期时间来实现,确保锁在持有者不测失联时可以大概主动开释。容错性纵然某些节点发生故障或网络分区,锁机制依然可以大概正确运行,或在适当的时间内恢复正常。这意味着锁的状态在多个节点之间要保持一致,且系统具备肯定的自愈能力。 3. 基于数据库

基于数据库的分布式锁是利用数据库的特性来实现的一种简单而常见的分布式锁机制。通过对数据库记录的插入、更新或删除操作,确保在同一时间只有一个历程可以大概持有锁,从而实现对共享资源的互斥访问。
3.1 利用表记录实现分布式锁

原理:利用表记录实现分布式锁的焦点思想是通过数据库表的一行记录来充当锁的标识。每次哀求想要获取锁时,会尝试在表中插入一条特定的记录。如果插入乐成,则表示获得了锁;如果插入失败(如违反了唯一性束缚),则表示锁已被其他哀求持有。
步骤

  • 创建一张专门用于锁定的表,表中包含一个锁名(或资源名)字段和一个锁定状态字段。
  • 当一个历程需要获取锁时,尝试向该表中插入一条记录。如果插入乐成,则表示获取锁乐成。
  • 如果记录已存在(利用主键辩说,唯一束缚实现大概更加灵活,可以应用在表的任何字段上),表示锁已被其他历程持有,获取锁失败。
  • 任务完成后,持锁历程删除该记录以开释锁。
除了锁表,我们还需要一个商品库存表模仿秒杀。
  1. create database lock_;
  2. use lock_;
  3. CREATE TABLE product
  4. (
  5.     product_id   INT PRIMARY KEY,
  6.     product_name VARCHAR(255),
  7.     stock        INT -- 库存量
  8. );
  9. -- 初始化库存为5
  10. INSERT INTO product (product_id, product_name, stock)
  11. VALUES (1, '大白菜', 5);
  12. -- 分布式锁表
  13. CREATE TABLE distributed_lock (
  14.     lock_name VARCHAR(255) PRIMARY KEY,  -- 锁的名称
  15.     lock_owner VARCHAR(255),             -- 锁的持有者标识
  16.     lock_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP -- 锁定时间
  17. );
复制代码
3.1.1 项目结构


3.1.2 pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <modelVersion>4.0.0</modelVersion>
  5.     <parent>
  6.         <groupId>org.springframework.boot</groupId>
  7.         <artifactId>spring-boot-starter-parent</artifactId>
  8.         <version>3.3.2</version>
  9.         <relativePath/> <!-- lookup parent from repository -->
  10.     </parent>
  11.     <groupId>org.example</groupId>
  12.     <artifactId>lock</artifactId>
  13.     <version>0.0.1-SNAPSHOT</version>
  14.     <properties>
  15.         <java.version>17</java.version>
  16.     </properties>
  17.     <dependencies>
  18.         <dependency>
  19.             <groupId>org.springframework.boot</groupId>
  20.             <artifactId>spring-boot-starter-web</artifactId>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>org.mybatis.spring.boot</groupId>
  24.             <artifactId>mybatis-spring-boot-starter</artifactId>
  25.             <version>3.0.3</version>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>com.mysql</groupId>
  29.             <artifactId>mysql-connector-j</artifactId>
  30.             <version>8.3.0</version>
  31.         </dependency>
  32.         <dependency>
  33.             <groupId>org.projectlombok</groupId>
  34.             <artifactId>lombok</artifactId>
  35.             <optional>true</optional>
  36.         </dependency>
  37.         
  38.     </dependencies>
  39.     <build>
  40.         <plugins>
  41.             <plugin>
  42.                 <groupId>org.springframework.boot</groupId>
  43.                 <artifactId>spring-boot-maven-plugin</artifactId>
  44.                 <configuration>
  45.                     <excludes>
  46.                         <exclude>
  47.                             <groupId>org.projectlombok</groupId>
  48.                             <artifactId>lombok</artifactId>
  49.                         </exclude>
  50.                     </excludes>
  51.                 </configuration>
  52.             </plugin>
  53.         </plugins>
  54.     </build>
  55. </project>
复制代码
3.1.3 application.yml

  1. spring:
  2.   application:
  3.     name: lock
  4.   datasource:
  5.     driver-class-name: com.mysql.cj.jdbc.Driver
  6.     url: jdbc:mysql://localhost:3306/lock_?useSSL=false&serverTimezone=UTC
  7.     username: root
  8.     password: 123456
  9. mybatis:
  10.   configuration:
  11.     map-underscore-to-camel-case: true
  12. server:
  13.   port: 8001
复制代码
3.1.4 LockApplication.java

  1. package org.example;
  2. import org.mybatis.spring.annotation.MapperScan;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. @SpringBootApplication
  6. @MapperScan("org.example.mapper")
  7. public class LockApplication {
  8.     public static void main(String[] args) {
  9.         SpringApplication.run(LockApplication.class, args);
  10.     }
  11. }
复制代码
3.1.5 Product.java

  1. package org.example.model;
  2. import lombok.Data;
  3. @Data
  4. public class Product {
  5.     private Integer productId;
  6.     private String productName;
  7.     private Integer stock;
  8. }
复制代码
3.1.6 DistributedLock.java

  1. package org.example.model;
  2. import java.util.Date;
  3. import lombok.Data;
  4. @Data
  5. public class DistributedLock {
  6.     private String lockName;
  7.     private String lockOwner;
  8.     private Date lockTime;
  9. }
复制代码
3.1.7 ProductMapper.java

  1. package org.example.mapper;
  2. import org.apache.ibatis.annotations.Select;
  3. import org.apache.ibatis.annotations.Update;
  4. import org.springframework.stereotype.Repository;
  5. @Repository
  6. public interface ProductMapper {
  7.     @Select("SELECT stock FROM product WHERE product_id = #{productId}")
  8.     int  queryStock(Integer productId);
  9.     @Update("UPDATE product SET stock = stock - 1 WHERE product_id = #{productId}")
  10.     int updateStock(Integer productId);
  11. }
复制代码
3.1.8 DistributedLockMapper.java

  1. package org.example.mapper;
  2. import org.apache.ibatis.annotations.Delete;
  3. import org.apache.ibatis.annotations.Insert;
  4. import org.springframework.stereotype.Repository;
  5. @Repository
  6. public interface DistributedLockMapper {
  7.     @Insert("INSERT INTO distributed_lock (lock_name, lock_owner) VALUES (#{lockName}, #{lockOwner})")
  8.     int insert(String lockName, String lockOwner);
  9.     @Delete("DELETE FROM distributed_lock WHERE lock_name = #{lockName} AND lock_owner = #{lockOwner}")
  10.     int delete(String lockName, String lockOwner);
  11. }
复制代码
3.1.9 DistributedLockService.java

  1. package org.example.service;
  2. import org.example.mapper.DistributedLockMapper;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. public class DistributedLockService {
  7.     private final DistributedLockMapper distributedLockMapper;
  8.     @Autowired
  9.     public DistributedLockService(DistributedLockMapper distributedLockMapper) {
  10.         this.distributedLockMapper = distributedLockMapper;
  11.     }
  12.     // 尝试获取锁
  13.     public boolean tryLock(String lockName, String lockOwner) {
  14.         try {
  15.             return distributedLockMapper.insert(lockName, lockOwner) > 0;
  16.         } catch (Exception e) {
  17.             return false;
  18.         }
  19.     }
  20.     // 释放锁
  21.     public boolean unlock(String lockName, String lockOwner) {
  22.         try {
  23.             return distributedLockMapper.delete(lockName, lockOwner) > 0;
  24.         } catch (Exception e) {
  25.             return false;
  26.         }
  27.     }
  28. }
复制代码
3.1.10 ProductService.java

  1. package org.example.service;
  2. import org.example.mapper.ProductMapper;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. public class ProductService {
  7.     private final ProductMapper productMapper;
  8.     @Autowired
  9.     public ProductService(ProductMapper productMapper) {
  10.         this.productMapper = productMapper;
  11.     }
  12.     // 扣减库存
  13.     public boolean reduceStock(int productId) {
  14.             // 查询库存
  15.             int stock = productMapper.queryStock(productId);
  16.             if (stock > 0) {
  17.                 // 扣减库存
  18.                 productMapper.updateStock(productId);
  19.                 System.out.println("库存扣减成功,剩余库存: " + (stock - 1));
  20.                 return true;
  21.             }
  22.             System.out.println("库存不足,无法扣减。");
  23.             return false;
  24.     }
  25. }
复制代码
3.1.11 DistributedLockController.java

  1. package org.example.controller;
  2. import org.example.service.DistributedLockService;
  3. import org.example.service.ProductService;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import java.util.UUID;
  8. @RestController
  9. public class DistributedLockController {
  10.     @Autowired
  11.     private DistributedLockService distributedLockService;
  12.     @Autowired
  13.     private ProductService productService;
  14.     @GetMapping("/order")
  15.     public void pay() {
  16.         UUID uuid = UUID.randomUUID();
  17.         String lockName = "product_1";
  18.         String lockOwner = "user_" + uuid;
  19.         boolean lockAcquired = false;
  20.         int retryCount = 0;
  21.         int maxRetries = 10; // 重试次数
  22.         while (!lockAcquired && retryCount < maxRetries) {
  23.             lockAcquired = distributedLockService.tryLock(lockName, lockOwner);
  24.             if (lockAcquired) {
  25.                 System.out.println(lockOwner + " 成功获取锁,尝试扣减库存...");
  26.                 try {
  27.                     productService.reduceStock(1);
  28.                 } finally {
  29.                     if (distributedLockService.unlock(lockName, lockOwner)) {
  30.                         System.out.println(lockOwner + " 成功释放锁。");
  31.                     } else {
  32.                         System.out.println(lockOwner + " 释放锁失败。");
  33.                     }
  34.                 }
  35.             } else {
  36.                 retryCount++;
  37.                 System.out.println(lockOwner + " 获取锁失败,重试中... (" + retryCount + "/" + maxRetries + ")");
  38.             }
  39.         }
  40.         if (!lockAcquired) {
  41.             System.out.println(lockOwner + " 最终未能获取锁,放弃。");
  42.         }
  43.     }
  44. }
复制代码
3.1.12 测试验证

条件预备:修改目录3.1.3的服务端口,先启动8001和8002两个不同的端口代表两台不同的服务器,等候两台服务器启动完成后打开Apache JMeter工具,创建一个线程组100个用户,10s内启动所有用户,同时创建两个不同的HTTP端口哀求。


说明:Copy然后另一个HTTP Request改端口为8002即可。
运行效果8001:
   user_8001 乐成获取锁,尝试扣减库存…
库存扣减乐成,剩余库存: 4
user_8001 乐成开释锁。
user_8001 乐成获取锁,尝试扣减库存…
库存扣减乐成,剩余库存: 2
user_8001 乐成开释锁。
user_8001 乐成获取锁,尝试扣减库存…
库存扣减乐成,剩余库存: 0
user_8001 乐成开释锁。
  user_8001 乐成获取锁,尝试扣减库存…
库存不足,无法扣减。
user_8001 乐成开释锁。
  …
  运行效果8002:
   user_8002 乐成获取锁,尝试扣减库存…
库存扣减乐成,剩余库存: 3
user_8002 乐成开释锁。
user_8002 乐成获取锁,尝试扣减库存…
库存扣减乐成,剩余库存: 1
user_8002 乐成开释锁。
user_8002 乐成获取锁,尝试扣减库存…
库存不足,无法扣减。
user_8002 乐成开释锁。
  …
  说明:在高并发情况下,锁的争取会导致部门用户未能乐成获取锁,造成库存无法完全斲丧的情况。这种设计虽然确保了数据的一致性,但在极端并发场景下,大概会导致一些哀求被拒绝.以是引入了重试机制举行优化。
3.2 利用乐观锁 (Optimistic Lock) 实现分布式锁

**原理:**乐观锁基于“乐观”的假设,认为并发辩说的大概性较小,因此在更新数据时不直接加锁,而是通过版本号或时间戳等机制来检测数据是否被其他事务修改过。如果在提交时检测到辩说(版本号变革),则放弃本次操作,并要求重试。


  • 步骤

    • 创建一张锁表,包含锁名、锁定状态以及一个版本号或时间戳字段。
    • 获取锁时,通过更新操作将锁的状态改变,并检查版本号或时间戳,确保操作是原子性的。
    • 如果版本号不匹配,表示锁已被其他历程持有,获取锁失败。


  • 任务完成后,历程更新锁的状态并修改版本号或时间戳。
  1. CREATE TABLE product
  2. (
  3.     product_id INT PRIMARY KEY,
  4.     stock      INT,
  5.     version    INT  default 0 -- 乐观锁的版本号
  6. );
  7. -- 初始化库存为5
  8. INSERT INTO product (product_id, stock)
  9. VALUES (1, 5);
复制代码
3.2.1 项目结构


3.2.2 pom.xml

同:目录3.1.2完全一致。
3.2.3 application.yml

同:目录3.1.3完全一致。
3.2.4 LockApplication.java

同:目录3.1.4完全一致。
3.2.5 Product.java

  1. package org.example.model;
  2. import lombok.Data;
  3. @Data
  4. public class Product {
  5.     private Integer productId;
  6.     private Integer stock;
  7.     private Integer version;
  8. }
复制代码
3.2.6 ProductStock.java

  1. package org.example.model;
  2. import lombok.Data;
  3. @Data
  4. public class ProductStock {
  5.     private Integer stock;
  6.     private Integer version;
  7. }
复制代码
3.2.7 ProductMapper.java

  1. package org.example.mapper;
  2. import org.apache.ibatis.annotations.Select;
  3. import org.apache.ibatis.annotations.Update;
  4. import org.example.model.ProductStock;
  5. import org.springframework.stereotype.Repository;
  6. @Repository
  7. public interface ProductMapper {
  8.     // 获取当前库存和版本号
  9.     @Select("SELECT stock,version FROM product WHERE product_id = #{productId}")
  10.     ProductStock queryStock(Integer productId);
  11.     // 更新库存并更新版本号
  12.     @Update("UPDATE product SET stock = stock - 1 , version = version + 1 WHERE product_id = #{productId} AND version =#{version} AND stock > 0")
  13.     int updateStock(Integer productId,Integer version);
  14. }
复制代码
3.2.8 ProductService.java

  1. package org.example.service;
  2. import org.example.mapper.ProductMapper;
  3. import org.example.model.ProductStock;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. public class ProductService {
  9.     private final ProductMapper productMapper;
  10.     @Autowired
  11.     public ProductService(ProductMapper productMapper) {
  12.         this.productMapper = productMapper;
  13.     }
  14.     @Value("${server.port}")
  15.     private String port;
  16.     // 扣减库存
  17.     public boolean reduceStock(int productId) {
  18.         // 获取当前库存和版本号
  19.         ProductStock stock = productMapper.queryStock(productId);
  20.         // 检查库存是否足够
  21.         if (stock.getStock()<=0) {
  22.             System.out.println("库存不足,扣减失败");
  23.             return false;
  24.         }
  25.         // 尝试更新库存并更新版本号
  26.         if (productMapper.updateStock(productId,stock.getVersion())==0) {
  27.             // 如果更新失败,说明版本号不一致,乐观锁冲突
  28.             System.out.println(port+"-乐观锁冲突,扣减库存失败,可能其他线程已经修改了数据");
  29.             return false;
  30.         }
  31.         System.out.println(port+"-库存扣减成功,剩余库存: " + (stock.getStock()-1));
  32.         return true;
  33.     }
  34. }
复制代码
3.2.9 ProductController.java

  1. package org.example.controller;
  2. import org.example.service.ProductService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. public class ProductController {
  8.     @Autowired
  9.     private ProductService productService;
  10.     @GetMapping("/order")
  11.     public String order() {
  12.         final int MAX_RETRIES = 3; // 设置最大重试次数
  13.         for (int i = 0; i < MAX_RETRIES; i++) {
  14.             boolean success = productService.reduceStock(1);
  15.             if (success) {
  16.                 return "库存扣减成功";
  17.             } else {
  18.                 System.out.println("第 " + (i + 1) + " 次扣减库存失败,尝试重试...");
  19.             }
  20.         }
  21.         return "扣减库存失败,超出最大重试次数";
  22.     }
  23. }
复制代码
3.2.10 测试验证

条件预备:同目录3.1.12完全一致。
运行效果8001:
   8001-库存扣减乐成,剩余库存: 4
8001-库存扣减乐成,剩余库存: 2
8001-库存扣减乐成,剩余库存: 0
库存不足,扣减失败
第 1 次扣减库存失败,尝试重试…
库存不足,扣减失败
第 2 次扣减库存失败,尝试重试…
库存不足,扣减失败
第 3 次扣减库存失败,尝试重试…
库存不足,扣减失败
  …
  运行效果8002:
   8002-库存扣减乐成,剩余库存: 3
8002-库存扣减乐成,剩余库存: 1
库存不足,扣减失败
第 1 次扣减库存失败,尝试重试…
库存不足,扣减失败
第 2 次扣减库存失败,尝试重试…
库存不足,扣减失败
第 3 次扣减库存失败,尝试重试…
库存不足,扣减失败
  …
  

3.3 利用悲观锁(Pessimistic Lock)实现分布式锁

原理:
悲观锁(Pessimistic Lock)是一种基于“悲观”假设的锁机制,即假设每次对数据的操作都会发生并发辩说,因此在操作数据之前必须先对其举行加锁,防止其他事务或线程对数据举行并发操作。如许可以确保在加锁期间,只有获得锁的历程或线程可以大概访问数据,从而避免数据的不一致性。
开始事务: 当方法用@Transactional注解标记时,Spring会在方法开始时开启一个事务。
获取悲观锁:在事务内执行查询时,如果利用了悲观锁(如SELECT ... FOR UPDATE),数据库会锁定相干数据行。
事务提交:当方法执行完毕且没有发生异常时,事务提交,锁会被开释,其他等候的事务可以继续执行。
事务回滚:如果在事务期间发生了异常,事务回滚,锁也会被开释,确保数据的原子性和一致性。
  1. CREATE TABLE product
  2. (
  3.     product_id INT PRIMARY KEY,
  4.     stock      INT
  5. );
  6. -- 初始化库存为5
  7. INSERT INTO product (product_id, stock)
  8. VALUES (1, 5);
复制代码
3.3.1 项目结构

同:目录3.2.1项目完全一致。以下说明不同的类。
3.3.2 Product.java

  1. package org.example.model;
  2. import lombok.Data;
  3. @Data
  4. public class Product {
  5.     private Integer productId;
  6.     private Integer stock;
  7. }
复制代码
3.3.3 ProductMapper.java

  1. package org.example.mapper;
  2. import org.apache.ibatis.annotations.Select;
  3. import org.apache.ibatis.annotations.Update;
  4. import org.example.model.Product;
  5. import org.springframework.stereotype.Repository;
  6. @Repository
  7. public interface ProductMapper {
  8.     // 使用悲观锁获取当前库存
  9.     @Select("SELECT stock FROM product WHERE product_id = #{productId} FOR UPDATE")
  10.     Product queryStockWithLock(Integer productId);
  11.     // 更新库存
  12.     @Update("UPDATE product SET stock = stock - 1 WHERE product_id = #{productId} AND stock > 0")
  13.     int updateStock(Integer productId);
  14. }
复制代码
3.3.4 ProductService.java

  1. package org.example.service;
  2. import org.example.mapper.ProductMapper;
  3. import org.example.model.Product;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.stereotype.Service;
  7. import org.springframework.transaction.annotation.Transactional;
  8. @Service
  9. public class ProductService {
  10.     private final ProductMapper productMapper;
  11.     @Autowired
  12.     public ProductService(ProductMapper productMapper) {
  13.         this.productMapper = productMapper;
  14.     }
  15.     @Value("${server.port}")
  16.     private String port;
  17.     // 使用悲观锁扣减库存
  18.     @Transactional
  19.     public boolean reduceStock(int productId) {
  20.         System.out.println(port+"-开始尝试获取悲观锁,锁定库存记录...");
  21.         // 使用悲观锁获取当前库存
  22.         Product stock = productMapper.queryStockWithLock(productId);
  23.         // 悲观锁已生效,此时其他事务无法修改当前行
  24.         System.out.println(port+"-已成功获取悲观锁,库存记录已锁定。当前库存:" + stock.getStock());
  25.         // 检查库存是否足够
  26.         if (stock.getStock() <= 0) {
  27.             System.out.println("库存不足,扣减失败");
  28.             return false;
  29.         }
  30.         // 更新库存
  31.         int result = productMapper.updateStock(productId);
  32.         if (result == 0) {
  33.             System.out.println("库存扣减失败");
  34.             return false;
  35.         }
  36.         System.out.println("库存扣减成功,剩余库存: " + (stock.getStock() - 1));
  37.         return true;
  38.     }
  39. }
复制代码
3.3.5 测试验证

条件预备:同目录3.1.12完全一致。
运行效果8001:
   8001-开始尝试获取悲观锁,锁定库存记录…
8001-已乐成获取悲观锁,库存记录已锁定。当前库存:5
库存扣减乐成,剩余库存: 4
8001-开始尝试获取悲观锁,锁定库存记录…
8001-已乐成获取悲观锁,库存记录已锁定。当前库存:3
库存扣减乐成,剩余库存: 2
8001-开始尝试获取悲观锁,锁定库存记录…
8001-已乐成获取悲观锁,库存记录已锁定。当前库存:1
库存扣减乐成,剩余库存: 0
8001-开始尝试获取悲观锁,锁定库存记录…
  …
  运行效果8002:
   8002-开始尝试获取悲观锁,锁定库存记录…
8002-已乐成获取悲观锁,库存记录已锁定。当前库存:4
库存扣减乐成,剩余库存: 3
8002-开始尝试获取悲观锁,锁定库存记录…
8002-已乐成获取悲观锁,库存记录已锁定。当前库存:2
库存扣减乐成,剩余库存: 1
8002-开始尝试获取悲观锁,锁定库存记录…
8002-已乐成获取悲观锁,库存记录已锁定。当前库存:0
库存不足,扣减失败
第 1 次扣减库存失败,尝试重试…
  …
  4.基于 Redis

4.1 安装Redis

4.1.1 拉取 Redis 官方镜像

  1. docker pull redis:latest
复制代码
4.1.2 启动 Redis 容器并设置用户名和密码

  1. docker run -d --name redis \
  2.     -p 6379:6379 \
  3.     redis:latest \
  4.     --requirepass "123456" #设置密码
复制代码
4.1.3 验证 Redis 服务

进入redis容器
  1. docker exec -it redis /bin/bash
复制代码
通过 Redis CLI 客户端连接到 Redis 服务器:
  1. redis-cli -h 127.0.0.1 -p 6379
复制代码
在 Redis 命令行界面中手动输入密码举行验证
  1. AUTH 123456 #验证密码
复制代码
乐成连接后,可以通过运行简单的 Redis 命令来验证连接是否乐成:
  1. 127.0.0.1:6379> ping
复制代码
如果返回 PONG,表示连接乐成。
4.2 Redis 分布式锁的实现思路

Redis 分布式锁的焦点在于:

  • 获取锁:利用 SETNX 命令尝试设置锁,如果设置乐成则获取锁。
  • 设置过期时间:利用 Redis 的 EXPIRE 或 SET 命令设置锁的过期时间,避免死锁。
  • 开释锁:业务完成后开释锁,确保只有持有锁的线程可以开释它。
  1. CREATE TABLE product
  2. (
  3.     product_id INT PRIMARY KEY,
  4.     stock      INT
  5. );
  6. -- 初始化库存为5
  7. INSERT INTO product (product_id, stock)
  8. VALUES (1, 5);
复制代码
4.2.1 项目结构


4.2.2 pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <modelVersion>4.0.0</modelVersion>
  5.     <parent>
  6.         <groupId>org.springframework.boot</groupId>
  7.         <artifactId>spring-boot-starter-parent</artifactId>
  8.         <version>3.3.2</version>
  9.         <relativePath/> <!-- lookup parent from repository -->
  10.     </parent>
  11.     <groupId>org.example</groupId>
  12.     <artifactId>lock</artifactId>
  13.     <version>0.0.1-SNAPSHOT</version>
  14.     <properties>
  15.         <java.version>17</java.version>
  16.     </properties>
  17.     <dependencies>
  18.         <dependency>
  19.             <groupId>org.springframework.boot</groupId>
  20.             <artifactId>spring-boot-starter-web</artifactId>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>org.mybatis.spring.boot</groupId>
  24.             <artifactId>mybatis-spring-boot-starter</artifactId>
  25.             <version>3.0.3</version>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>com.mysql</groupId>
  29.             <artifactId>mysql-connector-j</artifactId>
  30.             <version>8.3.0</version>
  31.         </dependency>
  32.         <dependency>
  33.             <groupId>org.projectlombok</groupId>
  34.             <artifactId>lombok</artifactId>
  35.             <optional>true</optional>
  36.         </dependency>
  37.         <dependency>
  38.             <groupId>org.springframework.boot</groupId>
  39.             <artifactId>spring-boot-starter-data-redis</artifactId>
  40.         </dependency>
  41.     </dependencies>
  42.     <build>
  43.         <plugins>
  44.             <plugin>
  45.                 <groupId>org.springframework.boot</groupId>
  46.                 <artifactId>spring-boot-maven-plugin</artifactId>
  47.                 <configuration>
  48.                     <excludes>
  49.                         <exclude>
  50.                             <groupId>org.projectlombok</groupId>
  51.                             <artifactId>lombok</artifactId>
  52.                         </exclude>
  53.                     </excludes>
  54.                 </configuration>
  55.             </plugin>
  56.         </plugins>
  57.     </build>
  58. </project>
复制代码
4.2.3 application.yml

  1. spring:
  2.   application:
  3.     name: lock
  4.   datasource:
  5.     driver-class-name: com.mysql.cj.jdbc.Driver
  6.     url: jdbc:mysql://localhost:3306/lock_?useSSL=false&serverTimezone=UTC
  7.     username: root
  8.     password: 123456
  9.   data:
  10.     redis:
  11.       host: 192.168.186.77
  12.       port: 6379
  13.       password: 123456
  14. mybatis:
  15.   configuration:
  16.     map-underscore-to-camel-case: true
  17. server:
  18.   port: 8001
复制代码
4.2.4 LockApplication.java

  1. package org.example;
  2. import org.mybatis.spring.annotation.MapperScan;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. @SpringBootApplication
  6. @MapperScan("org.example.mapper")
  7. public class LockApplication {
  8.     public static void main(String[] args) {
  9.         SpringApplication.run(LockApplication.class, args);
  10.     }
  11. }
复制代码
4.2.5 RedisConfig.java

  1. package org.example.config;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.data.redis.connection.RedisConnectionFactory;
  6. import org.springframework.data.redis.core.RedisTemplate;
  7. import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
  8. import org.springframework.data.redis.serializer.StringRedisSerializer;
  9. @Configuration
  10. public class RedisConfig {
  11.     @Bean
  12.     public RedisTemplate<String, Object> redisTemplate(@Autowired RedisConnectionFactory redisConnectionFactory) {
  13.         RedisTemplate<String, Object> template = new RedisTemplate<>();
  14.         template.setConnectionFactory(redisConnectionFactory);
  15.         // 使用StringRedisSerializer来序列化和反序列化redis的key
  16.         template.setKeySerializer(new StringRedisSerializer());
  17.         // 使用GenericJackson2JsonRedisSerializer来序列化和反序列化redis的value
  18.         template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
  19.         // 同样设置HashKey和HashValue序列化方式
  20.         template.setHashKeySerializer(new StringRedisSerializer());
  21.         template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
  22.         template.afterPropertiesSet();
  23.         return template;
  24.     }
  25. }
复制代码
4.2.6 Product.java

  1. package org.example.model;
  2. import lombok.Data;
  3. @Data
  4. public class Product {
  5.     private Integer productId;
  6.     private Integer stock;
  7. }
复制代码
4.2.7 ProductMapper.java

  1. package org.example.mapper;
  2. import org.apache.ibatis.annotations.Select;
  3. import org.apache.ibatis.annotations.Update;
  4. import org.springframework.stereotype.Repository;
  5. @Repository
  6. public interface ProductMapper {
  7.     @Select("SELECT stock FROM product WHERE product_id = #{productId} FOR UPDATE")
  8.     int getStockByProductId(int productId);
  9.     @Update("UPDATE product SET stock =stock-1 WHERE product_id = #{productId}")
  10.     void updateProductStock(int productId);
  11. }
复制代码
4.2.8 ProductService.java

  1. package org.example.service;
  2. import org.example.mapper.ProductMapper;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. public class ProductService {
  7.     @Autowired
  8.     private ProductMapper productMapper;
  9.     public boolean reduceStock(int productId) {
  10.         // 获取当前库存
  11.         int stock = productMapper.getStockByProductId(productId);
  12.         if (stock > 0) {
  13.             // 扣减库存
  14.             productMapper.updateProductStock(productId);
  15.             System.out.println( "库存扣减成功,剩余库存:" + (stock-1));
  16.             return true;
  17.         } else {
  18.             return false;
  19.         }
  20.     }
  21. }
复制代码
4.2.9 RedisDistributedLockService.java

  1. package org.example.service;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.data.redis.core.RedisTemplate;
  5. import org.springframework.stereotype.Service;
  6. import java.util.UUID;
  7. import java.util.concurrent.TimeUnit;
  8. @Service
  9. public class RedisDistributedLockService {
  10.     @Autowired
  11.     private RedisTemplate<String, Object> redisTemplate;
  12.     @Autowired
  13.     private ProductService productService;
  14.     private static final String LOCK_KEY_PREFIX = "distributed_lock_";
  15.     @Value("${server.port}")
  16.     private String port;
  17.     public boolean acquireLock(String lockKey, String clientId, long expireTime) {
  18.         Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId);
  19.         if (Boolean.TRUE.equals(success)) {
  20.             redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS);
  21.             return true;
  22.         }
  23.         return false;
  24.     }
  25.     public void releaseLock(String lockKey, String clientId) {
  26.         String currentValue = (String) redisTemplate.opsForValue().get(lockKey);
  27.         if (clientId.equals(currentValue)) {
  28.             redisTemplate.delete(lockKey);
  29.         }
  30.     }
  31.     public void execute() {
  32.         String lockKey = LOCK_KEY_PREFIX+1;
  33.         String clientId = UUID.randomUUID().toString();
  34.         long expireTime = 10;
  35.         try {
  36.             boolean lockAcquired = acquireLock(lockKey, clientId, expireTime);
  37.             if (lockAcquired) {
  38.                 System.out.println(port + " - 成功获取锁,执行任务。");
  39.                 // 扣减库存
  40.                 boolean success = productService.reduceStock(1); // 假设每次扣减1个库存
  41.                 if(!success) {
  42.                     System.out.println("库存不足。");
  43.                 }
  44.             } else {
  45.                 System.out.println(port + " - 未能获取锁,任务已被其他节点处理。");
  46.             }
  47.         } finally {
  48.             releaseLock(lockKey, clientId);
  49.         }
  50.     }
  51. }
复制代码
4.2.10 RedisDistributedLockController.java

  1. package org.example.controller;
  2. import org.example.service.RedisDistributedLockService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. public class RedisDistributedLockController {
  8.     @Autowired
  9.     private RedisDistributedLockService redisDistributedLockService;
  10.     @GetMapping("/order")
  11.     public String executeTask() {
  12.         redisDistributedLockService.execute();
  13.         return "任务请求已提交";
  14.     }
  15. }
复制代码
4.2.11 测试验证

条件预备:同目录3.1.12完全一致。
运行效果8001:
   8001 - 乐成获取锁,执行任务。
库存扣减乐成,剩余库存:4
8001 - 乐成获取锁,执行任务。
库存扣减乐成,剩余库存:2
8001 - 乐成获取锁,执行任务。
库存扣减乐成,剩余库存:0
8001 - 乐成获取锁,执行任务。
库存不足。
  …
  运行效果8002:
   8002 - 乐成获取锁,执行任务。
库存扣减乐成,剩余库存:3
8002 - 乐成获取锁,执行任务。
库存扣减乐成,剩余库存:1
8002 - 乐成获取锁,执行任务。
库存不足。
  …
  4.3 Redis+悲观锁实现秒杀(适合单一节点)

4.3.1 实现原理

1. Redis


  • 目的:利用 Redis 的高性能来快速处理惩罚库存的扣减操作,减少对数据库的直接访问,从而提升系统的并发处理惩罚能力。
  • 过程:在应用启动时,将数据库中的库存数据加载到 Redis 中。秒杀时,所有的库存操作起首在 Redis 中举行,如许可以明显减少数据库的压力。
2. Redis 原子操作 (decrement)


  • 目的:确保在并发情况下,多个线程对同一个库存的扣减操作不会发生辩说,从而防止超卖。
  • 过程:每次秒杀哀求到达时,直接通过 Redis 的 decrement 操作原子性地减少库存。如果库存不足(stock < 0),则直接返回失败。
3. 数据库的悲观锁


  • 目的:进一步确保数据库中的库存数据与 Redis 中的数据一致,防止并发情况下的库存不一致问题。
  • 过程:如果 Redis 中的库存扣减乐成,则利用悲观锁(SELECT ... FOR UPDATE)在数据库中锁定库存行,举行库存更新。悲观锁确保在锁定的库存更新完成之前,其他事务无法修改该库存数据。
4.回滚机制


  • 目的:确保在任何异常情况下,Redis 和数据库的库存数据一致。
  • 过程:如果在数据库操作中发现库存不足,大概在执行过程中发生异常,会回滚 Redis 中的库存操作(即通过 increment 恢复 Redis 库存),并抛出异常或返回操作失败。
  1. CREATE TABLE product
  2. (
  3.     product_id INT PRIMARY KEY,
  4.     stock      INT
  5. );
  6. -- 初始化库存为5
  7. INSERT INTO product (product_id, stock)
  8. VALUES (1, 5);
  9. CREATE TABLE orders(    order_id   INT PRIMARY KEY AUTO_INCREMENT,    product_id INT,    time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
复制代码
4.3.2 项目结构


4.3.3 pom.xml

同:目录4.2.2完全一致。
4.3.4 application.yml

同:目录4.2.3完全一致。
4.3.5 LockApplication.java

同:目录4.2.4完全一致。
4.3.7 RedisConfig.java

同:目录4.2.5完全一致。
4.3.6 Product.java

同:目录4.2.6完全一致。
4.3.7 OrderMapper.java

  1. package org.example.mapper;
  2. import org.apache.ibatis.annotations.Insert;
  3. import org.springframework.stereotype.Repository;
  4. @Repository
  5. public interface OrderMapper {
  6.     @Insert("INSERT INTO orders (product_id) VALUES (#{productId})")
  7.     void insertOrder(int productId);
  8. }
复制代码
4.3.8 ProductMapper.java

  1. package org.example.mapper;
  2. import org.apache.ibatis.annotations.Select;
  3. import org.apache.ibatis.annotations.Update;
  4. import org.example.model.Product;
  5. import org.springframework.stereotype.Repository;
  6. import java.util.List;
  7. @Repository
  8. public interface ProductMapper {
  9.     @Select("SELECT * FROM product")
  10.     List<Product> getAllProducts();
  11.     @Select("SELECT * FROM product WHERE product_id = #{productId} FOR UPDATE")
  12.     Product getStockByProductId(int productId);
  13.     @Update("UPDATE product SET stock =stock-1 WHERE product_id = #{productId}")
  14.     void updateProductStock(int productId);
  15. }
复制代码
4.3.9 ProductService.java

  1. package org.example.service;
  2. import jakarta.annotation.PostConstruct;
  3. import org.example.mapper.OrderMapper;
  4. import org.example.mapper.ProductMapper;
  5. import org.example.model.Product;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.data.redis.core.RedisTemplate;
  8. import org.springframework.stereotype.Service;
  9. import org.springframework.transaction.annotation.Transactional;
  10. import java.util.List;
  11. @Service
  12. public class ProductService {
  13.     @Autowired
  14.     private RedisTemplate<String, Object> redisTemplate;
  15.     @Autowired
  16.     private ProductMapper productMapper;
  17.     @Autowired
  18.     private OrderMapper orderMapper;
  19.     private static final String PRODUCT_STOCK_KEY = "product_stock_";
  20.     // 初始化时从数据库加载库存到 Redis
  21.     @PostConstruct
  22.     public void loadProductStockToRedis() {
  23.         List<Product> products = productMapper.getAllProducts();
  24.         for (Product product : products) {
  25.             redisTemplate.opsForValue().set(PRODUCT_STOCK_KEY + product.getProductId(), product.getStock());
  26.             System.out.println("已加载商品ID:" + product.getProductId() + " 的库存到 Redis,库存为:" + product.getStock());
  27.         }
  28.     }
  29.     @Transactional
  30.     public boolean reduceStock(int productId) {
  31.         String stockKey = PRODUCT_STOCK_KEY + productId;
  32.         // 1. 从 Redis 中扣减库存,确保原子操作
  33.         Long stock = redisTemplate.opsForValue().decrement(stockKey);
  34.         if (stock == null || stock < 0) {
  35.             // 如果库存不足或扣减失败,回滚 Redis 库存并返回失败
  36.             redisTemplate.opsForValue().increment(stockKey);
  37.             System.out.println("秒杀失败!商品ID:" + productId + ",库存不足。");
  38.             return false;
  39.         }
  40.         // 2. 使用数据库的悲观锁检查并扣减库存
  41.         try {
  42.             Product product = productMapper.getStockByProductId(productId);
  43.             if (product.getStock() >= 1) {
  44.                 // 更新数据库库存
  45.                 productMapper.updateProductStock(productId);
  46.                 // 创建订单
  47.                 orderMapper.insertOrder(productId);
  48.                 System.out.println("秒杀成功!商品ID:" + productId + ",剩余库存:" + stock);
  49.                 return true;
  50.             } else {
  51.                 // 如果数据库库存不足,回滚 Redis 库存并返回失败
  52.                 redisTemplate.opsForValue().increment(stockKey);
  53.                 System.out.println("秒杀失败!商品ID:" + productId + ",数据库库存不足。");
  54.                 return false;
  55.             }
  56.         } catch (Exception e) {
  57.             // 发生异常时回滚 Redis 库存并抛出异常
  58.             redisTemplate.opsForValue().increment(stockKey);
  59.             throw e;
  60.         }
  61.     }
  62. }
复制代码
4.3.10 ProductController.java

  1. package org.example.controller;
  2. import org.example.service.ProductService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. public class ProductController {
  8.     @Autowired
  9.     private ProductService productService;
  10.     @GetMapping("/order")
  11.     public String order() {
  12.         boolean success = productService.reduceStock(1);
  13.         return success ? "秒杀成功" : "秒杀失败";
  14.     }
  15. }
复制代码
4.3.12 测试验证

条件预备:修改商品的stock(余额)为10,举行秒杀模仿。
运行效果:
   已加载商品ID:1 的库存到 Redis,库存为:10
秒杀乐成!商品ID:1,剩余库存:3
秒杀乐成!商品ID:1,剩余库存:8
秒杀乐成!商品ID:1,剩余库存:0
秒杀乐成!商品ID:1,剩余库存:1
秒杀乐成!商品ID:1,剩余库存:5
秒杀乐成!商品ID:1,剩余库存:6
秒杀乐成!商品ID:1,剩余库存:7
秒杀失败!商品ID:1,库存不足。
秒杀乐成!商品ID:1,剩余库存:9
秒杀失败!商品ID:1,库存不足。
秒杀乐成!商品ID:1,剩余库存:2
秒杀失败!商品ID:1,库存不足。
秒杀乐成!商品ID:1,剩余库存:4
秒杀失败!商品ID:1,库存不足。
  


说明:订单是并发下的,同时抢购,同时产物表没有出现超卖。
5. 基于 Zookeeper

5.1 安装 Zookeeper

5.1.1 拉取 Zookeeper Docker 镜像

  1. docker pull zookeeper
复制代码
5.1.2 运行 Zookeeper 容器

  1. docker run -d --name zookeeper -p 2181:2181 zookeeper
复制代码
5.1.3 验证是否启动乐成

  1. docker exec -it zookeeper /bin/bash
复制代码
5.1.4 连接到Zookeeper 客户端

  1. zkCli.sh -server localhost:2181
复制代码
5.2 分布式锁

​ Zookeeper 是一个分布式协调服务,它为分布式系统提供了一种强一致性的机制。Zookeeper 集群中的所有节点(通常为奇数个)通过一致性算法(如 ZAB 协议)来保证数据的一致性和可靠性。
暂时节点(Ephemeral Node):暂时节点是 Zookeeper 的一种特别节点,它在客户端会话有用期间存在,一旦客户端断开连接(例如瓦解或超时),暂时节点将主动删除。暂时节点的特性确保了锁在客户端失效后可以大概被主动开释,从而避免了死锁的发生。
顺序节点(Sequential Node):顺序节点是在创建节点时,Zookeeper 会在节点名称后附加一个全局递增的序号。每次哀求创建顺序节点时,Zookeeper 会天生一个具有唯一序号的节点。通过顺序节点,可以为多个客户端竞争锁的哀求排序,实现公平锁。
步骤

  • 创建锁节点:客户端尝试在 Zookeeper 的某个路径下创建一个带有唯一序号的暂时顺序节点(如 /locks/lock-000000001)。
  • 判断锁的持有者:客户端获取当前所有节点的列表,判断自己创建的节点是否是序号最小的节点。如果是,则认为自己获取到了锁;如果不是,则监听比自己序号小的节点(即前一个节点)的删除事件。
  • 等候锁开释:如果当前节点不是序号最小的节点,客户端会进入等候状态,直到它监听的前一个节点被删除。当前一个节点被删除时,客户端重新检查自己是否是序号最小的节点,如果是,则获取锁。
  • 开释锁:当客户端完成对共享资源的操作后,删除它创建的暂时顺序节点,从而开释锁。
  • 关照下一个客户端:锁的开释会触发下一个序号节点的监听事件,该客户端随即尝试获取锁并执行相应操作。
5.3 简单案例

  1. CREATE TABLE product
  2. (
  3.     product_id INT PRIMARY KEY,
  4.     stock      INT
  5. );
  6. -- 初始化库存为5
  7. INSERT INTO product (product_id, stock)
  8. VALUES (1, 5);
复制代码
5.3.1 项目结构


5.3.2 pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <modelVersion>4.0.0</modelVersion>
  5.     <parent>
  6.         <groupId>org.springframework.boot</groupId>
  7.         <artifactId>spring-boot-starter-parent</artifactId>
  8.         <version>3.3.2</version>
  9.         <relativePath/> <!-- lookup parent from repository -->
  10.     </parent>
  11.     <groupId>org.example</groupId>
  12.     <artifactId>lock</artifactId>
  13.     <version>0.0.1-SNAPSHOT</version>
  14.     <properties>
  15.         <java.version>17</java.version>
  16.     </properties>
  17.     <dependencies>
  18.         <dependency>
  19.             <groupId>org.springframework.boot</groupId>
  20.             <artifactId>spring-boot-starter-web</artifactId>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>org.mybatis.spring.boot</groupId>
  24.             <artifactId>mybatis-spring-boot-starter</artifactId>
  25.             <version>3.0.3</version>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>com.mysql</groupId>
  29.             <artifactId>mysql-connector-j</artifactId>
  30.             <version>8.3.0</version>
  31.         </dependency>
  32.         <dependency>
  33.             <groupId>org.projectlombok</groupId>
  34.             <artifactId>lombok</artifactId>
  35.             <optional>true</optional>
  36.         </dependency>
  37.         <dependency>
  38.             <groupId>org.apache.curator</groupId>
  39.             <artifactId>curator-recipes</artifactId>
  40.             <version>5.3.0</version>
  41.         </dependency>
  42.         <!-- Zookeeper client -->
  43.         <dependency>
  44.             <groupId>org.apache.zookeeper</groupId>
  45.             <artifactId>zookeeper</artifactId>
  46.             <version>3.7.0</version>
  47.         </dependency>
  48.     </dependencies>
  49.     <build>
  50.         <plugins>
  51.             <plugin>
  52.                 <groupId>org.springframework.boot</groupId>
  53.                 <artifactId>spring-boot-maven-plugin</artifactId>
  54.                 <configuration>
  55.                     <excludes>
  56.                         <exclude>
  57.                             <groupId>org.projectlombok</groupId>
  58.                             <artifactId>lombok</artifactId>
  59.                         </exclude>
  60.                     </excludes>
  61.                 </configuration>
  62.             </plugin>
  63.         </plugins>
  64.     </build>
  65. </project>
复制代码
说明:选择符合的版本zookeeper依赖。
5.3.3 application.yml

  1. spring:
  2.   application:
  3.     name: lock
  4.   datasource:
  5.     driver-class-name: com.mysql.cj.jdbc.Driver
  6.     url: jdbc:mysql://localhost:3306/lock_?useSSL=false&serverTimezone=UTC
  7.     username: root
  8.     password: 123456
  9. mybatis:
  10.   configuration:
  11.     map-underscore-to-camel-case: true
  12. zookeeper:
  13.   connect-string: 192.168.186.77:2181
  14.   session-timeout-ms: 5000
  15.   connection-timeout-ms: 3000
  16.   retry:
  17.     base-sleep-time-ms: 1000
  18.     max-retries: 3
  19. server:
  20.   port: 8001
复制代码
5.3.4 LockApplication.java

  1. package org.example;
  2. import org.mybatis.spring.annotation.MapperScan;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. @SpringBootApplication
  6. @MapperScan("org.example.mapper")
  7. public class LockApplication {
  8.     public static void main(String[] args) {
  9.         SpringApplication.run(LockApplication.class, args);
  10.     }
  11. }
复制代码
5.3.5 ZookeeperConfig.java

  1. package org.example.config;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.retry.ExponentialBackoffRetry;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class ZookeeperConfig {
  10.     @Value("${zookeeper.connect-string}")
  11.     private String connectString;
  12.     @Value("${zookeeper.session-timeout-ms}")
  13.     private int sessionTimeoutMs;
  14.     @Value("${zookeeper.connection-timeout-ms}")
  15.     private int connectionTimeoutMs;
  16.     @Value("${zookeeper.retry.base-sleep-time-ms}")
  17.     private int baseSleepTimeMs;
  18.     @Value("${zookeeper.retry.max-retries}")
  19.     private int maxRetries;
  20.     @Bean
  21.     public CuratorFramework curatorFramework() {
  22.         CuratorFramework client = CuratorFrameworkFactory.newClient(
  23.                 connectString,
  24.                 sessionTimeoutMs,
  25.                 connectionTimeoutMs,
  26.                 new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)
  27.         );
  28.         client.start();
  29.         return client;
  30.     }
  31. }
复制代码
5.3.6 Product.java

  1. package org.example.model;
  2. import lombok.Data;
  3. @Data
  4. public class Product {
  5.     private Integer productId;
  6.     private Integer stock;
  7. }
复制代码
5.3.7 ProductMapper.java

  1. package org.example.mapper;
  2. import org.apache.ibatis.annotations.Select;
  3. import org.apache.ibatis.annotations.Update;
  4. import org.springframework.stereotype.Repository;
  5. @Repository
  6. public interface ProductMapper {
  7.     @Select("SELECT stock FROM product WHERE product_id = #{productId} FOR UPDATE")
  8.     int getStockByProductId(int productId);
  9.     @Update("UPDATE product SET stock =stock-1 WHERE product_id = #{productId}")
  10.     void updateProductStock(int productId);
  11. }
复制代码
5.3.8 ProductService.java

  1. package org.example.service;
  2. import org.example.mapper.ProductMapper;
  3. import org.example.model.Product;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class ProductService {
  8.     @Autowired
  9.     private ProductMapper productMapper;
  10.     public void reduceStock(int productId) {
  11.         // 获取当前库存
  12.         int stock = productMapper.getStockByProductId(productId);
  13.         if (stock > 0) {
  14.             // 扣减库存
  15.             productMapper.updateProductStock(productId);
  16.             System.out.println("库存扣减成功,剩余库存:" + (stock-1));
  17.         } else {
  18.             System.out.println("库存不足。");
  19.         }
  20.     }
  21. }
复制代码
5.3.9 ZookeeperLockService.java

  1. package org.example.service;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. import java.util.concurrent.TimeUnit;
  7. @Service
  8. public class ZookeeperLockService {
  9.     private final CuratorFramework client;
  10.     @Autowired
  11.     public ZookeeperLockService(CuratorFramework client) {
  12.         this.client = client;
  13.     }
  14.     /**
  15.      * 获取分布式锁
  16.      *
  17.      * @param lockPath  锁路径
  18.      * @param timeout   获取锁的超时时间
  19.      * @param timeUnit  时间单位
  20.      * @return 返回锁实例,如果成功获取锁,否则返回null
  21.      */
  22.     public InterProcessMutex acquireLock(String lockPath, long timeout, TimeUnit timeUnit) {
  23.         InterProcessMutex lock = new InterProcessMutex(client, lockPath);
  24.         try {
  25.             if (lock.acquire(timeout, timeUnit)) {
  26.                 return lock; // 返回成功获取锁的实例
  27.             }
  28.         } catch (Exception e) {
  29.             e.printStackTrace();
  30.         }
  31.         return null; // 如果未能获取锁,返回null
  32.     }
  33.     /**
  34.      * 释放分布式锁
  35.      *
  36.      * @param lock 锁实例
  37.      */
  38.     public void releaseLock(InterProcessMutex lock) {
  39.         if (lock != null) {
  40.             try {
  41.                 lock.release();
  42.                 System.out.println("锁释放成功!");
  43.             } catch (Exception e) {
  44.                 e.printStackTrace();
  45.             }
  46.         }
  47.     }
  48. }
复制代码
说明:InterProcessMutex 是 Apache Curator 库中提供的一种实现分布式锁的工具类,它基于 Zookeeper 来实现锁的互斥性。InterProcessMutex 提供了一种跨历程、跨节点的锁机制,确保在分布式环境中,同一时间只有一个客户端可以大概获得锁,其他客户端需要等候该锁被开释后才气继续操作。
5.3.10 LockController.java

  1. package org.example.controller;
  2. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  3. import org.example.service.ProductService;
  4. import org.example.service.ZookeeperLockService;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.util.concurrent.TimeUnit;
  10. @RestController
  11. public class LockController {
  12.     @Autowired
  13.     private ZookeeperLockService zookeeperLockService;
  14.     @Autowired
  15.     private ProductService productService;
  16.     @Value("${server.port}")
  17.     private String port;
  18.     @GetMapping("/order")
  19.     public void executeTaskWithLock() {
  20.         int productId = 1;
  21.         String lockPath = "/zk-lock/" + productId;
  22.         InterProcessMutex lock = zookeeperLockService.acquireLock(lockPath, 5, TimeUnit.SECONDS);
  23.         if (lock != null) {
  24.             try {
  25.                 System.out.println(port+"-成功获取锁");
  26.                 productService.reduceStock(1);
  27.             } finally {
  28.                 // 释放锁
  29.                 zookeeperLockService.releaseLock(lock);
  30.                 System.out.println(port+"-成功释放锁");
  31.             }
  32.         } else {
  33.             System.out.println(port+"-未能获取锁,任务已被其他节点处理");
  34.         }
  35.     }
  36. }
复制代码
5.3.11 测试验证

条件预备:同目录3.1.12完全一致。
运行效果8001:
   8001-乐成获取锁
库存扣减乐成,剩余库存:4
锁开释乐成!
8001-乐成开释锁
8001-乐成获取锁
库存扣减乐成,剩余库存:2
锁开释乐成!
8001-乐成开释锁
8001-乐成获取锁
库存扣减乐成,剩余库存:0
锁开释乐成!
8001-乐成开释锁
  …
  运行效果8002:
   8002-乐成获取锁
库存扣减乐成,剩余库存:3
锁开释乐成!
8002-乐成开释锁
8002-乐成获取锁
库存扣减乐成,剩余库存:1
锁开释乐成!
8002-乐成开释锁
8002-乐成获取锁
库存不足。
  …
  6. 总结

​ 简单的模仿包括基于数据库的乐观锁、悲观锁、以及利用数据库的唯一性束缚的分布式锁,以及基于Redis、Zookeeper的分布式锁,仅供学习参考。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立山

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表