马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
分布式锁的实现,就需要找到一个可以让所有的 JVM 访问到的公共组件,好比数据库,Redis等。
1 Redis 实现分布式锁
使用 Redis 实现分布式锁,实现原理:
- 根据差异的业务可以定名为差异的锁,好比某条商品库存的主键可视为键名,随机值作为键值,设置在 Redis 中,别的 JVM 访问到了商品库存信息时,要到 Redis 中查看是否有如许对应键值对,如果有则视为该信息已被上锁
- 随机值作为键值,它的用于开释锁时的校验。由于有一种环境发生,锁存入 Redis 时需要设置过期时间,这是由于业务中有可能发生异常导致没法正常开释锁,而加上过期时间就可以避免别的进程永远无法得到锁,但这时就又引入另一个题目就是,当业务是正常运作的,但运行时间过长导致键值对失效,因此另一个进程业务可以得到锁即重新设置了一条新的键值对,那么此时运行时间过长的那个业务如果不通过校验逻辑就会开释了锁,导致混乱了
- 如许的实现方法主要是由于 Redis 是一个单线程,同时它设置键值对时有一个 NX 特性,它表示当 key 不存在时设置成功,key存在时设置不成功,它是一个原子性操作从而达到上锁操作的原子性
- @GetMapping("/index")
- public String index() {
- String key = "idOfOrderTable";
- String value = UUID.randomUUID().toString();
- log.info("等待获取锁。。。");
- redisOperator.setIfAbsent(key, value, 15);
- // 检查是否当前进程成功上锁
- String data = redisOperator.get(key);
- while (!value.equals(data)) {
- redisOperator.setIfAbsent(key, value, 15);
- data = redisOperator.get(key);
- }
- log.info("------- 成功获取锁 -------");
- try {
- log.info("执行业务逻辑。。。");
- Thread.sleep(10000);
- log.info("------- 释放锁 -------");
- data = redisOperator.get(key);
- if (data != null && data.equals(value)) {
- redisOperator.del(key);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "result";
- }
复制代码 1.1 Redisson
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson-spring-boot-starter</artifactId>
- <version>3.11.2</version>
- </dependency>
复制代码 Redisson 是一个高级的分布式协调Redis客服端,例子代码:
- @Autowired
- private RedissonClient redissonClient;
- @GetMapping("/index5")
- public String index5() {
- log.info("------- 进入方法 -------");
- RLock rLock = redissonClient.getLock("idOfOrderTable");
- try {
- rLock.lock(30, TimeUnit.SECONDS); // 传入锁的过期时间
- log.info("------- 成功获取锁 -------");
- log.info("执行业务逻辑。。。");
- Thread.sleep(10000);
- log.info("执行完业务逻辑。。。");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- log.info("------- 释放锁 -------");
- rLock.unlock();
- }
- return "result";
- }
复制代码 2 Zookeeper 实现分布式锁
使用 Zookeeper 实现分布式锁,要理解 Zookeeper 存储数据的布局,它的数据布局是一个树形文件布局,它拥有两种范例的节点(每个节点都可以存储数据):
- 长期节点
- 瞬时节点:瞬时节点不可有子节点,会话竣事后瞬时节点自动消散
Zookeeper 能实现分布式锁主要利用 Zookeeper 的瞬时有序节点的特性,多线程高并发创建有序瞬时节点的方式,根据先后次序生成名称包罗序号的节点,每个节点可以添加观察器监听事件,监听本序号的前一个序号节点的状态。
生成最早节点的线程视为已经得到锁,其余线程在生成节点之后进入休眠状态等待后续激活,当执行完业务逻辑后,删除对应的节点(视为开释锁),此时序号紧接的节点触发了监听事件,该监听事件的主要作用就是重新激活线程的休眠状态(视为得到了锁)。
观察器只能监控一次,再监控需要重新设置
引入的包需要和服务器的 Zookeeper 一致:
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.6</version>
- </dependency>
复制代码 工具类代码:
- /**
- * Zookeeper 线程锁的工具类,实现了资源自动关闭接口和 Zookeeper 的观察器接口
- */
- @Slf4j
- public class ZkLock implements AutoCloseable, Watcher {
- private ZooKeeper zooKeeper; // zookeeper 客户端
- private String connectStr = "192.168.212.128:2181"; // zookeeper 地址
- private String zNode; // 子节点路径
- /**
- * 构造函数
- * @throws IOException
- */
- public ZkLock() throws IOException {
- // 初始化 zookeeper 客户端,因为本类实现了 watcher 接口,所以传入 watcher 时,可以直接传入 this
- this.zooKeeper = new ZooKeeper(connectStr, 10000, this);
- }
- /**
- * 获取锁
- * @param bussinessCode 业务代码
- * @return
- */
- public Boolean getLock(String bussinessCode) {
- // 创建业务节点
- String fatherPath = "/" + bussinessCode; // 父节点路径
- // 创建持久化业务根节点(就像归类存储文件一样,比如说商品信息,可以先建一个商品根节点作为类别区分,详情信息就全是其子节点)
- try {
- // 根节点是否存在
- Stat stat = zooKeeper.exists(fatherPath, false); // 第二个参数为是否添加观察器
- // 如果根节点不存在,则创建一个
- if (stat == null){
- // 参数分别为:路径,数据内容,权限(此时设置无需账号即可访问),节点类型(持久节点)
- zooKeeper.create(
- fatherPath, // 节点路径
- fatherPath.getBytes(), // 数据内容,可有可无
- ZooDefs.Ids.OPEN_ACL_UNSAFE, // 权限,设置无需账号即可访问
- CreateMode.PERSISTENT // 节点类型,持久节点
- );
- }
- // 创建瞬时有序节点
- String sonPath = fatherPath + "/" + bussinessCode + "_"; // 瞬时有序节点路径,当节点创建成功后,zookeeper 会在节点名称后添加序号
- zNode = zooKeeper.create(
- sonPath,
- sonPath.getBytes(),
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL // 瞬时有序节点
- ); // 返回节点路径,如 /order/order_0001
- // 判断本节点是否为序号最小的节点
- List<String> childrenNodes = zooKeeper.getChildren(fatherPath, false);
- Collections.sort(childrenNodes);
- String firstNode = childrenNodes.get(0); // 排序后,最小序号的节点名称,注意不是完整路径,如 order_0001
- // 如果是序号最小的节点,直接返回 true,调用方获取 true 就意味着获得了锁
- if (zNode.endsWith(firstNode)) {
- return true;
- }
- // 如果不是序号最小的节点,则添加观察器后进入休眠状态,等待观察器被触发,重新激活,此时调用方会进入阻塞状态,等待获取锁
- else {
- String lastNode = firstNode;
- // 添加观察器
- for (String nodeItem : childrenNodes)
- {
- if (zNode.endsWith(nodeItem))
- {
- // 添加前一个节点观察器,观察其是否还存在
- zooKeeper.exists(sonPath + lastNode, true);
- break;
- }
- lastNode = nodeItem;
- }
- }
- // 将该线程挂起(将该线程加入到本实例对象的等待队列中)
- synchronized (this) {
- wait();
- }
- // 线程挂起被激活后,继续执行操作,返回 true,表示调用方获取锁
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return false;
- }
- /**
- * 实现资源自动关闭接口,这样调用方就可以直接使用 try (ZkLock zklock = new ZkLock()) {...} 的形式上锁,这样执行完代码块后,会自动调用 close 方法
- * @throws Exception
- */
- @Override
- public void close() throws Exception {
- // 删除本节点,从而触发观察器逻辑
- // 第二个参数为版本号,用于校验的,这里直接使用 -1,表示匹配所有版本号
- zooKeeper.delete(zNode, -1);
- zooKeeper.close();
- log.info("------- 释放锁 -------");
- }
- /**
- * 实现 zookeeper 观察器接口
- * @param watchedEvent
- */
- @Override
- public void process(WatchedEvent watchedEvent) {
- // 当检测到节点被删除,激活线程(释放锁,并将本实例对象的等待列表中随意一个线程提取到入口列表中去)
- // 正因为是随机的,所以使用本工具类上锁时,每次需要新创建一个 ZkLock 对象,而不能使用一个公共变量存储 ZkLock 对象这样的形式去调用
- if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
- synchronized (this) {
- this.notify();
- }
- }
- }
- }
复制代码 调用方:
- @GetMapping("/index")
- public String index() {
- log.info("------- 进入方法 -------");
- try (ZkLock zkLock = new ZkLock()) {
- Boolean getLock = zkLock.getLock("order");
- if (getLock) {
- log.info("------- 成功获取锁 -------");
- log.info("执行业务逻辑。。。");
- Thread.sleep(10000);
- log.info("执行完业务逻辑。。。");
- } else {
- log.info("------- 获取锁失败 -------");
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return "result";
- }
复制代码 2.1 curator
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.2.0</version>
- </dependency>
复制代码 curator 是 apache 封装 Zookeeper 的一个高级包,例子代码如下:
- @GetMapping("/index3")
- public String index3() {
- log.info("------- 进入方法 -------");
- // 获取连接
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); // 重试策略
- CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.212.128:2181", retryPolicy); // 客户端连接
- client.start();
- InterProcessMutex lock = new InterProcessMutex(client, "/order");
- // 获取锁,传入超时时间
- try {
- if (lock.acquire(30, TimeUnit.SECONDS)) {
- log.info("------- 成功获取锁 -------");
- log.info("执行业务逻辑。。。");
- Thread.sleep(10000);
- log.info("执行完业务逻辑。。。");
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- finally {
- try {
- lock.release();
- log.info("------- 释放锁 -------");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- client.close();
- return "result";
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |