知者何南 发表于 2025-4-9 19:49:52

zk底子—zk实现分布式功能

1.zk实现数据发布订阅


(1)发布订阅体系一样平常有推模式和拉模式


推模式:服务端主动将更新的数据发送给所有订阅的客户端。

拉模式:客户端主动发起哀求来获取最新数据(定时轮询拉取)。

(2)zk接纳了推拉相结合来实现发布订阅


首先客户端必要向服务端注册自己关注的节点(添加Watcher变乱)。一旦该节点发生变更,服务端就会向客户端发送Watcher变乱通知。客户端吸取到消息通知后,必要主动到服务端获取最新的数据。所以,zk的Watcher机制有一个缺点就是:客户端不能定制服务端回调,必要客户端收到Watcher通知后再次向服务端发起哀求获取数据,多进行一次网络交互。

如果将配置信息放到zk上进行集中管理,那么:
一.应用启动时需主动到zk服务端获取配置信息,然后在指定节点上注册一个Watcher监听。
二.只要配置信息发生变更,zk服务端就会实时通知所有订阅的应用,让应用能实时获取到订阅的配置信息节点已发生变更的消息。

注意:原生zk客户端可以通过getData()、exists()、getChildren()三个方法,向zk服务端注册Watcher监听,而且注册的Watcher监听具有一次性,所以zk客户端得到服务端的节点变更通知后必要再次注册Watcher。

(3)使用zk来实现数据发布订阅总结


步骤一:将配置信息存储到zk的节点上。
步骤二:应用启动时首先从zk节点上获取配置信息,然后再向该zk节点注册一个数据变更的Watcher监听。一旦该zk节点数据发生变更,所有订阅的客户端就能收到数据变更通知。
步骤三:应用收到zk服务端发过来的数据变更通知后重新获取最新数据。

(4)zk原生实现分布式配置(也就是实现注册发现或者数据发布订阅)


配置可以使用数据库、Redis、或者任何一种可以共享的存储位置。使用zk的目的,主要就是使用它的回调机制。任何zk的使用方不必要去轮询zk,Redis或者数据库可能就必要主动轮询去看看数据是否发生改变。使用zk最大的优势是只要对数据添加Watcher,数据发生修改时zk就会回调指定的方法。注意:new一个zk实例和向zk获取数据都是异步的。

如下的做法实在是一种Reactor响应式编程:使用CoundownLatch阻塞及通过调用一次数据来触发回调更新本地的conf。我们并没有每个场景都线性写一个方法堆砌起来,而是用相应的回调和Watcher变乱来粘连起来。实在就是把所有变乱发生前后要做的事情粘连起来,等着回调来触发。

一.先定义一个工具类可以获取zk实例

public class ZKUtils {
    private static ZooKeeper zk;
    private static String address = "192.168.150.11:2181,192.168.150.12:2181,192.168.150.13:2181,192.168.150.14:2181/test";
    private static DefaultWatcher defaultWatcher = new DefaultWatcher();
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
   
    public static ZooKeeper getZK() {
      try {
            zk = new ZooKeeper(address, 1000, defaultWatcher);
            defaultWatcher.setCountDownLatch(countDownLatch);
            //阻塞直到建立好连接拿到可用的zk
            countDownLatch.await();
      } catch (Exception e) {
            e.printStackTrace();
      }
      return zk;
    }
}
二.定义和zk创建毗连时的Watcher

public class DefaultWatcher implements Watcher {
    CountDownLatch countDownLatch ;
   
    public void setCountDownLatch(CountDownLatch countDownLatch) {
      this.countDownLatch = countDownLatch;
    }

    @Override
    public void process(WatchedEvent event) {
      System.out.println(event.toString());
      switch (event.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                countDownLatch.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
      }
    }
}
三.定义分布式配置的焦点类WatcherCallBack

这个WatcherCallBack类不但实现了Watcher,还实现了两个异步回调。

首先通过zk.exists()方法判断配置的znode是否存在并添加监听(自己) + 回调(自己),然后通过countDownLatch.await()方法进行阻塞。

在回调中如果发现存在配置的znode,则设置配置并执行countDown()方法不再进行阻塞。

在监听中如果发现数据变化,则会调用zk.getData()方法获取配置的数据,而且获取配置的数据时也会继续监听(自己) + 回调(自己)。

public class WatcherCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
    ZooKeeper zk;
    MyConf conf;
    CountDownLatch countDownLatch = new CountDownLatch(1);

    public MyConf getConf() {
      return conf;
    }
   
    public void setConf(MyConf conf) {
      this.conf = conf;
    }
   
    public ZooKeeper getZk() {
      return zk;
    }
   
    public void setZk(ZooKeeper zk) {
      this.zk = zk;
    }
   
    //判断配置是否存在并监听配置的znode
    public void aWait(){
      zk.exists("/AppConf", this, this ,"ABC");
      try {
            countDownLatch.await();
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
    }
   
    //回调自己,这是执行完zk.exists()方法或者zk.getData()方法的回调
    //在回调中如果发现存在配置的znode,则设置配置并执行countDown()方法不再进行阻塞。
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      if (data != null) {
            String s = new String(data);
            conf.setConf(s);
            countDownLatch.countDown();
      }
    }
   
    //监听自己,这是执行zk.exists()方法或者zk.getData()方法时添加的Watcher监听
    //在监听中如果发现数据变化,则会继续调用zk.getData()方法获取配置的数据,并且获取配置的数据时也会继续监听(自己) + 回调(自己)
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
      if (stat != null) {//stat不为空, 代表节点已经存在
            zk.getData("/AppConf", this, this, "sdfs");
      }
    }
   
    @Override
    public void process(WatchedEvent event) {
      switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                //调用一次数据, 这会触发回调更新本地的conf
                zk.getData("/AppConf", this, this, "sdfs");
                break;
            case NodeDeleted:
                //容忍性, 节点被删除, 把本地conf清空, 并且恢复阻塞
                conf.setConf("");
                countDownLatch = new CountDownLatch(1);
                break;
            case NodeDataChanged:
                //数据发生变更, 需要重新获取调用一次数据, 这会触发回调更新本地的conf
                zk.getData("/AppConf", this, this, "sdfs");
                break;
            case NodeChildrenChanged:
                break;
      }
    }
}
分布式配置的焦点配置类:

//MyConf是配置中心的配置
public class MyConf {
    privateString conf ;
    public String getConf() {
      return conf;
    }
   
    public void setConf(String conf) {
      this.conf = conf;
    }
}
四.通过WatcherCallBack的方法判断配置是否存在并实验获取数据

public class TestConfig {
    ZooKeeper zk;
   
    @Before
    public void conn () {
      zk = ZKUtils.getZK();
    }
   
    @After
    public void close () {
      try {
            zk.close();
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
    }
   
    @Test
    public void getConf() {
      WatchCallBack watchCallBack = new WatchCallBack();
      //传入zk和配置conf
      watchCallBack.setZk(zk);
      MyConf myConf = new MyConf();
      watchCallBack.setConf(myConf);

      //节点不存在和节点存在, 都尝试去取数据, 取到了才往下走
      watchCallBack.aWait();
      while(true) {
            if (myConf.getConf().equals("")) {
                System.out.println("conf diu le ......");
                watchCallBack.aWait();
            } else {
                System.out.println(myConf.getConf());
            }
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      }
    }
}
2.zk实现负载平衡


(1)负载平衡算法


常用的负载平衡算法有:轮询法、随机法、原地点哈希法、加权轮询法、加权随机法、最小毗连数法。

一.轮询法

轮询法是最为简朴的负载平衡算法。当吸取到客户端哀求后,负载平衡服务器会按次序逐个分配给后端服务。好比集群中有3台服务器,分别是server1、server2、server3,轮询法会按照sever1、server2、server3次序依次分发哀求给每个服务器。当第一次轮询结束后,会重新开始下一轮的循环。

二.随机法

随机法是指负载平衡服务器在吸取到来自客户端哀求后,根据随机算法选中后台集群中的一台服务器来处置惩罚这次哀求。由于当集群中的机器变得越来越多时,每台机器被抽中的概率基本相等,因此随机法的现实效果越来越趋近轮询法。

三.原地点哈希法

原地点哈希法是根据客户端的IP地点进行哈希盘算,对盘算结果进行取模,然后根据最终结果选择服务器地点列表中的一台机器来处置惩罚哀求。这种算法每次都会分配同一台服务器来处置惩罚同一IP的客户端哀求。

四.加权轮询法

由于一个分布式体系中的机器可能部署在差别的网络环境中,每台机器的配置性能各不相同,因此其处置惩罚和响应哀求的能力也各不相同。

如果接纳上面几种负载平衡算法,都不太合适。这会造成能力强的服务器在处置惩罚完业务后过早进入空闲状态,而性能差或网络环境不好的服务器不停忙于处置惩罚哀求造成任务积存。

为了解决这个标题,可以接纳加权轮询法。加权轮询法的方式与轮询法的方式很相似,唯一的差别在于选择机器时,不只是单纯按照次序的方式去选择,还要根据机器的配置和性能高低有所偏重,让配置性能好的机器优先分配。

五.加权随机法

加权随机法和上面提到的随机法一样,在接纳随机法选举服务器时,会思量体系性能作为权值条件。

六.最小毗连数法

最小毗连数法是指:根据后台处置惩罚客户端的哀求数,盘算应该把新哀求分配给哪一台服务器。一样平常以为哀求数最少的机器,会作为最优先分配的对象。

(2)使用zk来实现负载平衡


实现负载平衡服务器的关键是:探测和发现业务服务器的运行状态 + 分配哀求给最合适的业务服务器。

一.状态收集之实现zk的业务服务器列表

首先使用zk的临时子节点来标记业务服务器的状态。在业务服务器上线时:通过向zk服务器创建临时子节点来实现服务注册,表示业务服务器已上线。在业务服务器下线时:通过删除临时节点或者与zk服务器断开毗连来进行服务剔除。最后通过统计临时节点的数量,来了解业务服务器的运行情况。

在代码层面的实现中,首先定义一个BlanceSever接口类。该类用于业务服务器启动或关闭后:第一.向zk服务器地点列表注册或注销服务,第二.根据吸取到的哀求动态更新负载平衡情况。

public class BlanceSever {
    //向zk服务器地址列表进行注册服务
    public void register()
    //向zk服务器地址列表进行注销服务
    public void unregister()
    //根据接收到的请求动态更新负载均衡情况
    public void addBlanceCount()
    public void takeBlanceCount()
}
之后创建BlanceSever接口的实现类BlanceSeverImpl,在BlanceSeverImpl类中首先定义:业务服务器运行的Session超时时间、会话毗连超时时间、zk客户端地点、服务器地点列表节点SERVER_PATH等基本参数。并通过构造函数,在类被引用时进行初始化zk客户端对象实例。

public class BlanceSeverImpl implements BlanceSever {
    private static final Integer SESSION_TIME_OUT;
    private static final Integer CONNECTION_TIME_OUT;
    private final ZkClient zkclient;
    private static final SERVER_PATH = "/Severs";
   
    public BlanceSeverImpl() {
      init...
    }
}
接下来定义,业务服务器启动时,向zk注册服务的register方法。

在如下代码中,会通过在SERVER_PATH路径下创建临时子节点的方式来注册服务。首先获取业务服务器的IP地点,然后使用IP地点作为临时节点的path来创建临时节点。

public register() throws Exception {
    //首先获取业务服务器的IP地址
    InetAddress address = InetAddress.getLocalHost();
    String serverIp = address.getHostAddress();
    //然后利用IP地址作为临时节点的path来创建临时节点
    zkclient.createEphemeral(SERVER_PATH + serverIp);
}
接下来定义,业务服务器关机或不对外提供服务时的unregister()方法。通过调用unregister()方法,注销该台业务服务器在zk服务器列表中的信息。注销后的机器不会被负载平衡服务器分发处置惩罚会话。在如下代码中,会通过删除SERVER_PATH路径下临时节点的方式来注销业务服务器。

public unregister() throws Exception {
    zkclient.delete(SERVER_PATH + serverIp);
}
二.哀求分配之如何选择业务服务器

以最小毗连数法为例,来确定如何平衡地分配哀求给业务服务器,整个实现步骤如下:

步骤一:首先负载平衡服务器在吸取到客户端的哀求后,通过getData()方法获取已成功注册的业务服务器列表,也就是"/Servers"节点下的各个临时节点,这些临时节点都存储了当前服务器的毗连数。

步骤二:然后选取毗连数最少的业务服务器作为处置惩罚当前哀求的业务服务器,并通过setData()方法将该业务服务器对应的节点值(毗连数)加1。

步骤三:当该业务服务器处置惩罚完哀求后,调用setData()方法将该节点值(毗连数)减1。

下面定义,当业务服务器吸取到哀求后,增长毗连数的addBlance()方法。在如下代码中,首先通过readData()方法获取服务器最新的毗连数,然后将该毗连数加1。接着通过writeData()方法将最新的毗连数写入到该业务服务器对应的临时节点。

public void addBlance() throws Exception {
    InetAddress address = InetAddress.getLocalHost();
    String serverIp = address.getHostAddress();
    Integer con_count = zkClient.readData(SERVER_PATH + serverIp);
    ++con_count;
    zkClient.writeData(SERVER_PATH + serverIp, con_count);
}
3.zk实现分布式命名服务


命名服务是分布式体系最基本的公共服务之一。在分布式体系中,被命名的实体可以是集群中的机器、提供的服务地点等。比方,Java中的JNDI便是一种典型的命名服务。

(1)ID编码的特性


分布式ID天生器就是通过分布式的方式,主动天生ID编码的程序或服务。天生的ID编码一样平常具有唯一性、递增性、安全性、扩展性这几个特性。

(2)通过UUID方式天生分布式ID


UUID能非常轻便地包管分布式环境中ID的唯一性。它由32位字符和4个短线字符组成,好比e70f1357-f260-46ff-a32d-53a086c57ade。

由于UUID在本地应用中天生,所以天生速率比力快,不依赖其他服务和网络。但缺点是:长度过长、寄义不明、不满足递增性。

(3)通过TDDL天生分布式ID


MySQL的自增主键是一种有序的ID天生方式,还有一种性能更好的数据库序列天生方式:TDDL中的ID天生方式。TDDL是Taobao Distributed Data Layer的缩写,是一种数据库中央件,主要应用于数据库分库分表的应用场景中。

TDDL天生ID编码的大致过程如下:首先数据库中有一张Sequence序列化表,记录当前已被占用的ID最大值。然后每个必要ID编码的客户端在哀求TDDL的ID编码天生器后,TDDL都会返回给该客户端一段ID编码,并更新Sequence表中的信息。

客户端吸取到一段ID编码后,会将该段编码存储在内存中。在本机必要使用ID编码时,会首先使用内存中的ID编码。如果内存中的ID编码已经完全被占用,则再重新向编码服务器获取。

TDDL通过分批获取ID编码的方式,减少了客户端访问服务器的频率,制止了网络颠簸所造成的影响,并减轻了服务器的内存压力。不外TDDL高度依赖数据库,不能作为独立的分布式ID天生器对外提供服务。

(4)通过zk天生分布式ID


每个必要ID编码的业务服务器可以看作是zk的客户端,ID编码天生器可以看作是zk的服务端,可以使用zk数据模型中的次序节点作为ID编码。

客户端通过create()方法来创建一个次序子节点。服务端成功创建节点后会响应客户端哀求,把创建好的节点发送给客户端。客户端以次序节点名称为底子进行ID编码,天生ID后就可以进行业务利用。

(5)SnowFlake算法


SnowFlake算法是Twitter开源的一种用来天生分布式ID的算法,通过SnowFlake算法天生的编码会是一个64位的二进制数。

第一个bit不用,接下来的41个bit用来存储毫秒时间戳,再接下来的10个bit用来存储机器ID,剩余的12个bit用来存储流水号和0。


https://i-blog.csdnimg.cn/img_convert/e1baf637d4be68e7e37c5587bd9f9e9e.png

SnowFlake算法主要的实现手段就是对二进制数位的利用,SnowFlake算法理论上每秒可以天生400多万个ID编码,SnowFlake是业界广泛接纳的分布式ID天生算法。

4.zk实现分布式调和(Master-Worker协同)


(1)Master-Worker架构


Master-Work是一个广泛使用的分布式架构,体系中有一个Master负责监控Worker的状态,并为Worker分配任务。

阐明一:在任何时候,体系中最多只能有一个Master。不可以出现两个Master,多个Master共存会导致脑裂。

阐明二:体系中除了Active状态的Master还有一个Bakcup的Master。如果Active失败,Backup可以很快进入Active状态。

阐明三:Master实时监控Worker的状态,能够及时收到Worker成员变化的通知。Master在收到Worker成员变化通知时,会重新进行任务分配。


https://i-blog.csdnimg.cn/img_convert/2e73ceff9aca1c9b03e3c426a526799d.png

(2)Master-Worker架构示例—HBase


HBase接纳的就是Master-Worker的架构。HMBase是体系中的Master,HRegionServer是体系中的Worker。HMBase会监控HBase Cluster中Worker的成员变化,HMBase会把region分配给各个HRegionServer。体系中有一个HMaster处于Active状态,其他HMaster处于备用状态。


https://i-blog.csdnimg.cn/img_convert/035c364cc5f6a2324672d4b167864f0d.png

(3)Master-Worker架构示例—Kafka


一个Kafka集群由多个Broker组成,这些Borker是体系中的Worker,Kafka会从这些Worker选举出一个Controller。这个Controlle是体系中的Master,负责把Topic Partition分配给各个Broker。


https://i-blog.csdnimg.cn/img_convert/011d77517c63ce7de6ebf8f6660586f9.png

(4)Master-Worker架构示例—HDFS


HDFS接纳的也是一个Master-Worker的架构。NameNode是体系中的Master,DataNode是体系中的Worker。NameNode用来保存整个分布式文件体系的MetaData,并把数据块分配给Cluster中的DataNode进行保存。


https://i-blog.csdnimg.cn/img_convert/4a217fe211fbba08addf66e882befd52.png

(5)如何使用zk实现Master-Worker


步骤1:使用一个临时节点"/master"表示Master。Master在行使Master的职能之前,首先要创建这个znode。如果创建成功,进入Active状态,开始行使Master职能。如果创建失败,则进入Backup状态,并使用Watcher机制监听"/master"。假设体系中有一个Active Master和一个Backup Master。如果Active Master故障了,那么它创建的"/master"就会被zk主动删除。这时Backup Master会收到通知,再次创建"/master"成为新Active Master。

步骤2:使用一个持久节点"/workers"下的临时子节点来表示Worker,Worker通过在"/workers"节点下创建临时节点来加入集群。

步骤3:处于Active状态的Master会通过Watcher机制,监控"/workers"下的子节点列表来实时获取Worker成员的变化。

5.zk实现分布式通讯


在大部分的分布式体系中,机器间的通讯有三种类型:心跳检测、工作进度汇报、体系调度。

(1)心跳检测


机器间的心跳检测是指:在分布式环境中,差别机器间必要检测相互是否还在正常运行。其中,心跳检测有如下三种方法。

方法一:通常会通过机器间是否可以相互PING通来判断对方是否正常运行。

方法二:在机器间创建长毗连,通过TCP毗连固有的心跳检测机制来实现上层机器的心跳检测。

方法三:基于zk的临时子节点来实现心跳检测,让差别的机器都在zk的一个指定节点下创建临时子节点,差别机器间可以根据这个临时子节点来判断对应的客户端是否存活。基于zk的临时节点来实现的心跳检测,可以大大减少体系的耦合。因为检测体系和被检测体系之间不必要直接关联,只必要通过zk临时节点间接关联。

(2)工作进度汇报


在一个任务分发体系中,任务被分发到差别的机器上执行后,必要实时地将自己的任务执行进度汇报给分发体系。

通过zk的临时子节点来实现工作进度汇报:可以在zk上选择一个节点,每个任务机器都在该节点下创建临时子节点。然后通过判断临时子节点是否存在来确定任务机器是否存活,各个任务机器会实时地将自己的任务执行进度写到其对应的临时节点上,以便中央体系能够实时获取到任务的执行进度。

(3)体系调度


一个分布式体系由控制台和一些客户端体系组成,控制台的职责是将一些指令信息发送给所有客户端。

使用zk实现体系调度时:先让控制台的一些利用指令对应到zk的某些节点数据,然后让客户端体系注册对这些节点数据的监听。当控制台进行一些利用时,便会触发修改这些节点的数据,而zk会将这些节点数据的变更以变乱通知的情势发送给监听的客户端。

如许就能省去大量底层网络通讯和协议设计上的重复工作了,也大大降低了体系间的耦合,方便实现异构体系的灵活通讯。

6.zk实现Master选举


Master选举的需求是:在集群的所有机器中选举出一台机器作为Master。

(1)通过创建临时节点实现


集群的所有机器都往zk上创建一个临时节点如"/master"。在这个过程中只会有一个机器能成功创建该节点,则该机器便成为Master。同时其他没有成功创建节点的机器会在"/master"节点上注册Watcher监听,一旦当前Master机器挂了,那么其他机器就会重新往zk上创建临时节点。

(2)通过临时次序子节点来实现


使用临时次序子节点来表示集群中的机器发起的选举哀求,然后让创建最小后缀数字节点的机器成为Master。

7.zk实现分布式锁


可以使用zk的临时节点来解决死锁标题,可以使用zk的Watcher监听机制实现锁释放后重新竞争锁,可以使用zk数据节点的版本来实现乐观锁。

(1)死锁的解决方案


在单机环境下,多线程之间会产存亡锁标题。同样,在分布式体系环境下,也会产生分布式死锁的标题。常用的解决死锁标题的方法有超时方法和死锁检测。

一.超时方法

在解决死锁标题时,超时方法可能是最简朴的处置惩罚方式了。超时方式是在创建分布式线程时,对每个线程都设置一个超时时间。当该线程的超时时间到期后,无论该线程是否执行完毕,都要关闭该线程并释放该线程所占用的体系资源,之后其他线程就可以访问该线程释放的资源,如许就不会造成死锁标题。

但这种设置超时时间的方法最大的缺点是很难设置一个合适的超时时间。如果时间设置过短,可能造成线程未执行完相干的处置惩罚逻辑,就因为超时时间到期就被迫关闭,最终导致程序执行出错。

二.死锁检测

死锁检测是处置惩罚死锁标题的另一种方法,它解决了超时方法的缺陷。死锁检测方法会主动检测发现线程死锁,在控制死锁标题上更加灵活正确。

可以把死锁检测理解为一个运行在各服务器体系上的线程或方法,该方法专门用来发现应用服务上的线程是否发存亡锁。如果发存亡锁,那么就会触发相应的预设处置惩罚方案。

(2)zk如何实现排他锁


一.获取锁

获取排他锁时,所有的客户端都会试图通过调用create()方法,在"/exclusive_lock"节点下创建临时子节点"/exclusive_lock/lock"。zk会包管所有的客户端中只有一个客户端能创建临时节点成功,从而得到锁。没有创建临时节点成功的客户端也就没能得到锁,必要到"/exclusive_lock"节点上,注册一个子节点变更的Watcher监听,以便可以实时监听lock节点的变更情况。

二.释放锁

如果获取锁的客户端宕机,那么zk上的这个临时节点(lock节点)就会被移除。如果获取锁的客户端执行完,也会主动删除自己创建的临时节点(lock节点)。

(3)zk如何实现共享锁(读写锁)


一.获取锁

获取共享锁时,所有客户端会到"/shared_lock"下创建一个临时次序节点。如果是读哀求,那么就创建"/shared_lock/read001"的临时次序节点。如果是写哀求,那么就创建"/shared_lock/write002"的临时次序节点。

二.判断读写次序

步骤一:客户端在创建完节点后,会获取"/shared_lock"节点下的所有子节点,并对"/shared_lock"节点注册子节点变更的Watcher监听。

步骤二:然后确定自己的节点序号在所有子节点中的次序(包括读节点和写节点)。

步骤三:对于读哀求:如果没有比自己序号小的写哀求子节点,或所有比自己小的子节点都是读哀求,那么表明可以成功获取共享锁。如果有比自己序号小的子节点是写哀求,那么就必要进入等待。对于写哀求:如果自己不是序号最小的子节点,那么就必要进入等待。

步骤四:如果客户端在等待过程中吸取到Watcher通知,则重复步骤一。

三.释放锁

如果获取锁的客户端宕机,那么zk上的对应的临时次序节点就会被移除。如果获取锁的客户端执行完,也会主动删除自己创建的临时次序节点。

(4)羊群效应


一.排他锁的羊群效应

如果有大量的客户端在等待锁的释放,那么就会出现大量的Watcher通知。然后这些客户端又会发起创建哀求,但最后只有一个客户端能创建成功。这个Watcher变乱通知实在对绝大部分客户端都不起作用,极端情况可能会出现zk短时间向别的客户端发送大量的变乱通知,这就是羊群效应。出现羊群效应的根源在于:没有找准客户端真正的关注点。

二.共享锁的羊群效应

如果有大量的客户端在等待锁的释放,那么不但会出现大量的Watcher通知,还会出现大量的获取"/shared_lock"的子节点列表的哀求,但最后大部分客户端都会判断出自己并非是序号最小的节点。所以客户端会吸取过多和自己无关的通知和发起过多查询节点列表的哀求,这就是羊群效应。出现羊群效应的根源在于:没有找准客户端真正的关注点。

(5)改进后的排他锁


使用临时次序节点来表示获取锁的哀求,让创建出后缀数字最小的节点的客户端成功拿到锁。

步骤一:首先客户端调用create()方法在"/exclusive_lock"下创建一个临时次序节点。

步骤二:然后客户端调用getChildren()方法返回"/exclusive_lock"下的所有子节点,接着对这些子节点进行排序。

步骤三:排序后,看看是否有后缀比自己小的节点。如果没有,则当前客户端便成功获取到排他锁。如果有,则调用exist()方法对排在自己前面的谁人节点注册Watcher监听。

步骤四:当客户端收到Watcher通知前面的节点不存在,则重复步骤二。

(6)改进后的共享锁


步骤一:客户端调用create()方法在"/shared_lock"节点下创建临时次序节点。如果是读哀求,那么就创建"/shared_lock/read001"的临时次序节点。如果是写哀求,那么就创建"/shared_lock/write002"的临时次序节点。

步骤二:然后调用getChildren()方法返回"/shared_lock"下的所有子节点,接着对这些子节点进行排序。

步骤三:对于读哀求:如果排序后发现有比自己序号小的写哀求子节点,则必要等待,且必要向比自己序号小的最后一个写哀求子节点注册Watcher监听。对于写哀求:如果排序后发现自己不是序号最小的子节点,则必要等待,而且必要向比自己序号小的最后一个哀求子节点注册Watcher监听。注意:这里注册Watcher监听也是调用exist()方法。此外,不满足上述条件则表示成功获取共享锁。

步骤四:如果客户端在等待过程中吸取到Watcher通知,则重复步骤二。

(7)zk原生实现分布式锁的示例


一.分布式锁的实现步骤

步骤一:每个线程都通过"临时次序节点 + zk.create()方法 + 添加回调"去创建节点。

步骤二:线程执行完创建临时次序节点后,先通过CountDownLatch.await()方法进行阻塞。然后在创建成功的回调中,通过zk.getChildren()方法获取根目录并继续回调。

步骤三:某线程在获取根目录成功后的回调中,会对目录排序。排序后如果发现其创建的节点排第一,那么就执行countDown()方法不再阻塞,表示获取锁成功。排序后如果发现其创建的节点不是第一,则通过zk.exists()方法监听前一节点。

步骤四:获取到锁的线程会通过zk.delete()方法来删除其对应的节点实现释放锁。在期待获取锁的线程掉线时其对应的节点也会被删除。而一旦节点被删除,那些监听根目录的线程就会重新zk.getChildren()方法,获取成功后其回调又会进行排序以及通过zk.exists()方法监听前一节点。

二WatchCallBack对分布式锁的详细实现

public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    ZooKeeper zk ;
    String threadName;
    CountDownLatch countDownLatch = new CountDownLatch(1);
    String pathName;
   
    public String getPathName() {
      return pathName;
    }
   
    public void setPathName(String pathName) {
      this.pathName = pathName;
    }
   
    public String getThreadName() {
      return threadName;
    }
   
    public void setThreadName(String threadName) {
      this.threadName = threadName;
    }
   
    public ZooKeeper getZk() {
      return zk;
    }
   
    public void setZk(ZooKeeper zk) {
      this.zk = zk;
    }
   
    public void tryLock() {
      try {
            System.out.println(threadName + " create....");
            //创建一个临时的有序的节点
            zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");
            countDownLatch.await();
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
    }
   
    //当前线程释放锁, 删除节点
    public void unLock() {
      try {
            zk.delete(pathName, -1);
            System.out.println(threadName + " over work....");
      } catch (InterruptedException e) {
            e.printStackTrace();
      } catch (KeeperException e) {
            e.printStackTrace();
      }
    }
   
    //上面zk.create()方法的回调
    //创建临时顺序节点后的回调, 10个线程都能同时创建节点
    //创建完后获取根目录下的子节点, 也就是这10个线程创建的节点列表, 这个不用watch了, 但获取成功后要执行回调
    //这个回调就是每个线程用来执行节点排序, 看谁是第一就认为谁获得了锁
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      if (name != null ) {
            System.out.println(threadName+ "create node : " +name );
            setPathName(name);
            //一定能看到自己前边的, 所以这里的watch要是false
            zk.getChildren("/", false, this ,"sdf");
      }
    }
   
    //核心方法: 各个线程获取根目录下的节点时, 上面zk.getChildren("/", false, this ,"sdf")的回调
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
      //一定能看到自己前边的节点
      System.out.println(threadName + "look locks...");
      for (String child : children) {
            System.out.println(child);
      }
      //根目录下的节点排序
      Collections.sort(children);
      //获取当前线程创建的节点在根目录中排第几
      int i = children.indexOf(pathName.substring(1));
      //是不是第一个, 如果是则说明抢锁成功; 如果不是, 则watch当前线程创建节点的前一个节点是否被删除(删除);
      if (i == 0) {
            System.out.println(threadName + " i am first...");
            try {
                //这里的作用就是不让第一个线程获得锁释放锁跑得太快, 导致后面的线程还没建立完监听第一个节点就被删了
                zk.setData("/", threadName.getBytes(), -1);
                countDownLatch.countDown();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      } else {
            //9个没有获取到锁的线程都去调用zk.exists, 去监控各自自己前面的节点, 而没有去监听父节点
            //如果各自前面的节点发生删除事件的时候才回调自己, 并关注被删除的事件(所以会执行process回调)
            zk.exists("/" + children.get(i-1), this, this, "sdf");
      }
    }
   
    //上面zk.exists()的监听
    //监听的节点发生变化的Watcher事件监听
    @Override
    public void process(WatchedEvent event) {
      //如果第一个获得锁的线程释放锁了, 那么其实只有第二个线程会收到回调事件
      //如果不是第一个哥们某一个挂了, 也能造成他后边的收到这个通知, 从而让他后边那个去watch挂掉这个哥们前边的, 保持顺序
      switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zk.getChildren("/", false, this ,"sdf");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
      }
    }
   
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
      //TODO
    }
}
三.分布式锁的测试类

public class TestLock {
    ZooKeeper zk;
   
    @Before
    public void conn () {
      zk= ZKUtils.getZK();
    }
   
    @After
    public void close () {
      try {
            zk.close();
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
    }
   
    @Test
    public void lock() {
      //10个线程都去抢锁
      for (int i = 0; i < 10; i++) {
            new Thread() {
                @Override
                public void run() {
                  WatchCallBack watchCallBack = new WatchCallBack();
                  watchCallBack.setZk(zk);
                  String threadName = Thread.currentThread().getName();
                  watchCallBack.setThreadName(threadName);
                  //每一个线程去抢锁
                  watchCallBack.tryLock();
                  //抢到锁之后才能干活
                  System.out.println(threadName + " working...");
                  try {
                        Thread.sleep(1000);
                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
                  //干完活释放锁
                  watchCallBack.unLock();
                }
            }.start();
      }
      while(true) {
      }
    }
}
8.zk实现分布式队列和分布式屏障


(1)分布式队列的实现


步骤一:所有客户端都到"/queue"节点下创建一个临时次序节点。

步骤二:通过调用getChildren()方法来获取"/queue"节点下的所有子节点。

步骤三:客户端确定自己的节点序号在所有子节点中的次序。

步骤四:如果自己不是序号最小的子节点,那么就必要进入等待,同时调用exists()方法向比自己序号小的最后一个节点注册Watcher监听。

步骤五:如果客户端收到Watcher变乱通知,重复步骤二。

(2)分布式屏障的实现


"/barrier"节点是一个已存在的默认节点,"/barrier"节点的值是数字n,表示Barrier值,好比10。

步骤一:首先,所有客户端都必要到"/barrier"节点下创建一个临时节点。

步骤二:然后,客户端通过getData()方法获取"/barrier"节点的数据内容,好比10。

步骤三:接着,客户端通过getChildren()方法获取"/barrier"节点下的所有子节点,同时注册对子节点列表变更的Watcher监听。

步骤四:如果客户端发现"/barrier"节点的子节点个数不足10个,那么就必要进入等待。

步骤五:如果客户端吸取到了Watcher变乱通知,那么就重复步骤三。


   文章转载自:东阳马生架构
原文链接:zk底子—4.zk实现分布式功能 - 东阳马生架构 - 博客园
体验地点:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: zk底子—zk实现分布式功能