建议阅读:
2025最新java面试题,十次面试,九次会问!文章浏览阅读125次。JDK:Java Development Kit 的简称,java 开辟工具包,提供了 java 的开辟情况和运行情况。JRE:Java Runtime Environment 的简称,java 运行情况,为 java 的运行提供了所需情况具体来说 JDK 其实包含了 JRE,同时还包含了编译 java 源码的编译器 javac,还包含了很多 java 程序调试和分析的工具。简单来说如果你需要运行 java 程序,只需安装 JRE 就可以了,如果你需要编写 java 程序,需要安装 JDK。https://blog.csdn.net/x1ao_fe1/article/details/145192376?sharetype=blogdetail&sharerId=145192376&sharerefer=PC&sharesource=x1ao_fe1&spm=1011.2480.3001.8118
创建会话
客户端可以通过创建一个Zookeeper实例来毗连Zookeeper服务器,那么首先我们来看看Zookeeper的构造函数:
- //第一个构造
- public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
- throws IOException
- {
- this(connectString, sessionTimeout, watcher, false);
- }
- //第二个构造
- public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
- boolean canBeReadOnly) throws IOException {
- this(connectString, sessionTimeout, watcher, canBeReadOnly,
- createDefaultHostProvider(connectString));
- }
- //第三个构造
- public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
- boolean canBeReadOnly, HostProvider aHostProvider)
- throws IOException {
- this(connectString, sessionTimeout, watcher, canBeReadOnly,
- aHostProvider, null);
- }
- //第四个构造
- public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
- boolean canBeReadOnly, HostProvider aHostProvider,
- ZKClientConfig clientConfig) throws IOException {
- LOG.info("Initiating client connection, connectString=" + connectString
- + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
- if (clientConfig == null) {
- clientConfig = new ZKClientConfig();
- }
- this.clientConfig = clientConfig;
- watchManager = defaultWatchManager();
- watchManager.defaultWatcher = watcher;
- ConnectStringParser connectStringParser = new ConnectStringParser(
- connectString);
- hostProvider = aHostProvider;
- cnxn = createConnection(connectStringParser.getChrootPath(),
- hostProvider, sessionTimeout, this, watchManager,
- getClientCnxnSocket(), canBeReadOnly);
- cnxn.start();
- }
复制代码 可以看到Zookeeper的源码中总共提供了四个构造函数,参数从三个到六个不等,我们分别来看看这几个参数的说明:
connectString
connectString代表Zookeeper的服务器列表,每个zk服务器使用host:port字符串组成,多个zk服务器之间使用英文输入法的逗号分隔,需要注意的是我们还可以在毗连中指定根目次,例如
192.168.1.3 :2181/zk-root,则当客户端毗连被骗前zk服务器的时候,后续所有的操作都会基于当前目次作为根目次举行操作
sessionTimeout
sessionTimeout指的是会话时间,单元是毫秒,zk引入了会话的概念,在一个会话周期内,zk会通过心跳检测机制来维持会话的有用性,一旦在当前指定的时间内没有完成心跳检测,则视为当前会话已经失效
watcher
watcher是zk中提供的事件通知机制,在zk中答应传入当前范例的接口实例来作为客户端默认的事件通知处置惩罚器,如果我们当前不需要引入事件通知机制,当前参数通报null即可
canBeReadOnly
canBeReadOnly是zk中提供的一个保护机制,默认情况下zk集群如果出现大半的机器失去网络毗连(宕机),那么zk将进入保护,不再处置惩罚任何客户端请求(包罗读),但是如果我们需要继续提供读服务操作,则需要将当前的参数设置为true
aHostProvider
aHostProvider是zk提供的支持客户端的自界说行为操作,内部提供next、onConnected以及updateServerList等操作
clientConfig
clientConfig是zk提供的用来灵活配置的配置类,可以将毗连的信息配置进去,例如zk超时时间,毗连方式,毗连信息等
了解了Zookeeper的构造函数,我们来编写第一个demo,创建一个zk会话实例:
- /**
- * 会话连接demo
- */
- public class SessionDemo implements Watcher {
- private static CountDownLatch latch = new CountDownLatch(1);
- public static void main(String[] args) throws IOException, InterruptedException {
- ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181",5000,new SessionDemo());
- System.out.println(zooKeeper.getState());
- latch.await();
- }
- @Override
- public void process(WatchedEvent event) {
- System.out.println("触发了监听事件:"+event);
- //如果当前是异步连接状态
- if(Event.KeeperState.SyncConnected == event.getState()){
- latch.countDown();
- }
- }
- }
复制代码 点击运行以后,可以看到控制台的输出:
- CONNECTING
- 触发了监听事件:WatchedEvent state:SyncConnected type:None path:null
复制代码 创建节点
zookeeper提供了两种创建节点的方式,一种是同步创建,一种是异步,我们首先来看看create方法:
- //创建同步节点
- public String create(final String path, byte data[], List<ACL> acl,CreateMode createMode)throws KeeperException, InterruptedException
- //创建异步节点
- public void create(final String path, byte data[], List<ACL> acl,CreateMode createMode, StringCallback cb, Object ctx)
复制代码 可以看到异步创建节点的接口多了两个参数,一个是callback回调,一个是通报的object对象,接下来我们针对这些参数举行一下说明:
path
path是创建zk节点的时候指定的对应路径,如:/root
data[]
data[]是创建节点的时候对应存储的内容
acl
acl是创建节点的时候的策略
createMode
createMode是创建节点的时候指定的节点范例,通常指定四种范例(实际不止):
1.持久化节点--PERSISTENT
2.持久化顺序节点--PERSISTENT _ S E QUENTIAL
3.临时节点--EPHEMERAL
4.临时顺序节点-- EPHEMERAL _ S EQUENTIAL
cb
cb是注册一个异步回调函数,范例为StringCallback,一样平常重写void processResult( int rc , String path , Object ctx , String
n ame ) ;当zk创建节点完毕以后,会自动调用这个方法,可以在当前方法内处置惩罚对应的业务逻辑
ctx
ctx在创建节点的时候可以通报的对象,在StringCallback的回调函数执行的时候使用(通报),通常会通报上下文(Context)
注意:这里需要注意的是,zk默认创建的时候不会给我们举行序列化,需要我们手动序列化转换为字节数组通报节点存储的内容,并且zk默认不答应跨节点创建,即如果上层路径不存在的情况下,直接创建子节点,并且如果当前路径对应节点已经创建,再去创建并不会覆盖原来的内容,会抛出NodeExistsException异常
创建同步节点
接下来,我们来创建一个同步节点:
- /**
- *创建同步节点demo
- **/
- public class CreateDemo implements Watcher {
- private static final CountDownLatch LATCH = new CountDownLatch(1);
- public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
- ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",
- 5000,new CreateDemo());
- LATCH.await();
- //等待连接上zk以后
- String path1 = zk.create("/test01", "test01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- System.out.println("创建无序节点:"+path1);
- String path2 = zk.create("/test02", "test02".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- System.out.println("创建有序节点:"+path2);
- }
- @Override
- public void process(WatchedEvent event) {
- if(Event.KeeperState.SyncConnected == event.getState()){
- LATCH.countDown();
- }
- }
- }
复制代码 我们来运行一下,可以看到控制台输出的内容:
- 创建无序节点:/test01
- 创建有序节点:/test020000000002
复制代码 创建异步节点
创建异步节点,我们需要注意通报ctx参数,以及StringCallback的实例,并且我们需要注意的是,create方法并不会阻塞后续业务执行:
- /**
- *创建异步节点demo
- **/
- public class CreateAyncDemo implements Watcher {
- private static final CountDownLatch LATCH = new CountDownLatch(1);
- public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
- ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",
- 5000,new CreateAyncDemo());
- LATCH.await();
- //等待连接上zk以后
- zk.create("/testAsyn001", "testAsyn001".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
- new IStringCallback(),"我是上下文");
- zk.create("/testAsyn002", "testAsyn002".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
- new IStringCallback(),"我是上下文");
- Thread.sleep(Integer.MAX_VALUE);
- }
- @Override
- public void process(WatchedEvent event) {
- if(Event.KeeperState.SyncConnected == event.getState()){
- LATCH.countDown();
- }
- }
- }
- class IStringCallback implements AsyncCallback.StringCallback{
- @Override
- public void processResult(int rc, String path, Object ctx, String name) {
- System.out.println("创建节点完毕的回调函数:[rc:"+rc+",path:"+path+",ctx:"+ctx+",name:"+name+"]");
- }
- }
复制代码 我们使用Thread.sleep阻塞一下当前的业务,运行可以看到控制台乐成输出:
- 创建节点完毕的回调函数:[rc:0,path:/testAsyn001,ctx:我是上下文,name:/testAsyn001]
- 创建节点完毕的回调函数:[rc:0,path:/testAsyn002,ctx:我是上下文,name:/testAsyn0020000000009]
复制代码 删除节点
客户端通过delete方法来删除某一个节点,delete方法如下:
- //同步删除
- public void delete(final String path, int version)
- throws InterruptedException, KeeperException
- //异步删除
- public void delete(final String path, int version, VoidCallback cb,Object ctx)
复制代码 可以看到删除节点操作和创建节点一样,支持同步和异步操作,但是这里我们需要注意的是,在Zookeeper中,只运行按照顺序删除节点,即子节点存在无法直接删除父节点,必须优先删除下面的所有子节点!接下来我们编写一个案例,删除节点:
- public class DeleteDemo implements Watcher {
- private static final CountDownLatch LATCH = new CountDownLatch(1);
- public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
- ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",
- 5000,new DeleteDemo());
- LATCH.await();
- //等待连接上zk以后
- String path1 = zk.create("/test01", "test01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- System.out.println("创建无序节点:"+path1);
- String path2 = zk.create("/test02", "test02".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- System.out.println("创建有序节点:"+path2);
- //触发同步删除
- zk.delete("/test01",0);
- //触发异步删除
- zk.delete("/test02",0,new IVoidCallback(),"我是触发异步删除的数据");
- Thread.sleep(Integer.MAX_VALUE);
- }
- @Override
- public void process(WatchedEvent event) {
- if(Event.KeeperState.SyncConnected == event.getState()){
- LATCH.countDown();
- }
- }
- }
- class IVoidCallback implements AsyncCallback.VoidCallback{
- @Override
- public void processResult(int rc, String path, Object ctx) {
- System.out.println("异步删除节点触发:[path:"+path+",rc:"+rc+",ctx:"+ctx+"]");
- }
- }
复制代码 运行以后,可以看到控制台的输出如下:
- 创建无序节点:/test01
- 创建有序节点:/test020000000011
- 异步删除节点触发:[path:/test02,rc:-101,ctx:我是触发异步删除的数据]
复制代码 获取数据
在Zookeeper中有两种获取数据的接口,一种是获取当前节点的相关数据信息,一种是获取当前节点下的子节点相关的数据信息,接下来我们来看看这两种Api接口
getData
客户端通过getData方法可以获取当前节点的相关数据内容,方法如下:
- public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException
- //第二个
- public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
- //第三个
- public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx)
- //第四个
- public void getData(String path, boolean watch, DataCallback cb, Object ctx)
复制代码 可见这里也提供了同步获取数据和异步获取数据的方法,接下来我们来看看对应参数的说明:
path
path是需要获取数据的节点所在的路径
watcher
watcher是用来注册的通知接口,如果节点发生变更,就会通过当前的接口通知客户端
stat
stat是用来形貌该数据节点的状态信息,如果我们需要获取当前节点的状态信息,可以在客户端创建该变量传入方法中,有服务端更换该变量
cb
cb是异步获取数据的时候注册的回调函数接口,范例为DataCallback
ctx
ctx是在异步获取数据过程中通报给异步回调函数接口的实例,一样平常用来通报上下文
接下来,我们来编写代码获取一下节点的内容数据:
- public class GetDataDemo implements Watcher {
- private static final CountDownLatch LATCH = new CountDownLatch(1);
- public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
- ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",5 * 1000,
- new GetDataDemo());
- LATCH.await();
- String path = zk.create("/testGetData01", "/testGetData01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- System.out.println("创建节点成功:"+path);
- //同步调用查询接口
- Stat stat = new Stat();
- byte[] data = zk.getData("/testGetData01", true, stat);
- System.out.println("同步获取当前节点内容:"+new String(data)+",stat:"+stat);
- //异步调用查询接口
- zk.getData("/testGetData01", true,new IDataCallback(),"我是异步获取数据的回调函数");
- Thread.sleep(5 * 1000);
- }
- @Override
- public void process(WatchedEvent event) {
- if(Event.KeeperState.SyncConnected == event.getState()){
- LATCH.countDown();
- }
- }
- }
- class IDataCallback implements AsyncCallback.DataCallback{
- @Override
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- System.out.println("异步获取节点数据:[path"+path+",ctx:"+ctx+",rc:"+rc+",data:"+new String(data)+","+data.toString()+",stat:"+stat+"]");
- }
- }
复制代码 运行以后,控制台的输出如下:
- 创建节点成功:/testGetData01
- 同步获取当前节点内容:/testGetData01,stat:40,40,1579166587745,1579166587745,0,0,0,72059329994162178,14,0,40
- 异步获取节点数据:[path/testGetData01,ctx:我是异步获取数据的回调函数,rc:0,data:/testGetData01,[B@22987e0b,stat:40,40,1579166587745,1579166587745,0,0,0,72059329994162178,14,0,40
- ]
复制代码 getChildren
如果当前客户端想要获取当前节点下的子节点的内容数据,就需要使用getChildren方法,方法的界说如下:
- public List<String> getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException
- //第二个
- public List<String> getChildren(String path, boolean watch)throws KeeperException, InterruptedException
- //第三个
- public void getChildren(final String path, Watcher watcher,ChildrenCallback cb, Object ctx)
- //第四个
- public void getChildren(String path, boolean watch, ChildrenCallback cb,Object ctx)
- //第五个
- public List<String> getChildren(final String path, Watcher watcher,Stat stat) hrows KeeperException, InterruptedException
- //第六个
- public List<String> getChildren(String path, boolean watch, Stat stat) hrows KeeperException, InterruptedException
- //第七个
- public void getChildren(final String path, Watcher watcher,Children2Callback cb, Object ctx)
- //第八个
- public void getChildren(String path, boolean watch, Children2Callback cb,Object ctx)
复制代码 可见与getData等Api一样,都提供了多个场景下使用的api接口,并且都有同步异步两种Api的使用,而这里的参数前面都有介绍,可以参照上面的参数说明。接下来,我们来看看同步获取子节点数据与异步Api的使用:
- public class GetChildrenDemo implements Watcher {
- private static final CountDownLatch LATCH = new CountDownLatch(1);
- public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
- ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",5 * 1000,
- new GetChildrenDemo());
- LATCH.await();
- String path = zk.create("/testGetChildren01", "/testGetChildren01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- System.out.println("创建节点成功:"+path);
- //同步调用查询接口
- Stat stat = new Stat();
- List<String> data = zk.getChildren("/testGetChildren01", true);
- System.out.println("同步获取当前节点下的子节点:"+ data);
- //异步调用查询接口
- zk.getChildren("/testGetChildren01", true,new IChildrenCallback(),"我是异步获取子节点的回调函数");
- Thread.sleep(5 * 1000);
- }
- @Override
- public void process(WatchedEvent event) {
- if(Event.KeeperState.SyncConnected == event.getState()){
- LATCH.countDown();
- }
- }
- }
- class IChildrenCallback implements AsyncCallback.ChildrenCallback{
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children) {
- System.out.println("异步获取获取子节点:[rc:"+rc+",path:"+path+",ctx:"+ctx+",children:"+children);
- }
- }
复制代码 可以看到当前的节点刚刚创建,并没有子节点,以是得到的子节点列表为空:
- 创建节点成功:/testGetChildren01
- 同步获取当前节点下的子节点:[]
- 异步获取获取子节点:[rc:0,path:/testGetChildren01,ctx:我是异步获取子节点的回调函数,children:[]
复制代码 修改节点
在创建节点以后,如果客户端想要更改节点的内容,这个时候就需要使用修改的Api,在zookeeper中修改节点的内容使用setData方法,如下:
- //同步修改节点数据
- Stat setData(final String path, byte d ata[], in t version)
- //异步修改节点数据
- void setData(final String path, byte data[], in t version, StatCallback cb, Object
- ctx)
复制代码 zookeeper的修改Api仅有两个,对应同步和异步操作,并且在操作过程中渴望传入对应的版本,防止误操作,接下来我们来编码实践:
- public class SetDataDemo implements Watcher {
- private static final CountDownLatch LATCH = new CountDownLatch(1);
- public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
- ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",5 * 1000,
- new SetDataDemo());
- LATCH.await();
- Stat stat = new Stat();
- String path = zk.create("/testSetData", "/testSetData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,stat);
- System.out.println("创建节点成功:"+path);
- //同步修改数据
- Stat stat1 = zk.setData(path, "/testSetData1".getBytes(), stat.getVersion());
- byte[] data = zk.getData(path, true, stat);
- System.out.println("修改后的内容:"+new String(data)+",stat:"+stat1);
- //异步修改数据
- zk.setData(path,"/testSetData2".getBytes(),stat1.getVersion(),new IStatCallback(),"状态异步修改");
- Thread.sleep(5 * 1000);
- }
- @Override
- public void process(WatchedEvent event) {
- if(Event.KeeperState.SyncConnected == event.getState()){
- LATCH.countDown();
- }
- }
- }
- class IStatCallback implements AsyncCallback.StatCallback{
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- System.out.println("修改数据成功:[rc:"+rc+",path:"+path+",ctx:"+ctx+",stat:"+stat+"]");
- }
- }
复制代码 将我们创建的节点举行修改后,再次查询,并且再次异步修改,控制台输出如下,可见的确修改乐成了:
- 创建节点成功:/testSetData
- 修改后的内容:/testSetData1,stat:54,55,1579243770843,1579243770939,1,0,0,72058810458177539,13,0,54
- 修改数据成功:[rc:0,path:/testSetData,ctx:状态异步修改,stat:54,56,1579243770843,1579243771008,2,0,0,72058810458177539,13,0,54
- ]
复制代码 总结
前面我们通过编码实践来学习了Zookeeper的原生Api,也发现了原生Api的一些良好的特点,例如:
zookeeper中对节点的操作全部提供了同步和异步两种操作方式,并且有多种不怜悯况下的方法重载
但同时我们在操作Zookeeper的过程中也发现了一些很不灵活的点,一不警惕就有大概导致开辟过程中出现漏洞,例如:
1.原生Api的创建节点,必须按照层级创建,因此建议创建之前我们也要检查该节点是否存在
2.Zookeeper中节点的删除操作也必须按照层级举行删除,即当前节点下存在子节点,必须优先删除子节点,再去删除当前节点,建议删除之前优先检查一下是否存在子节点
除此之外,如果我们对某个节点举行监听,如果细致的话也会发现,原生的Api实现中,监听触发一次以后就不会再触发了。可见Zookeeper的原生Api实现对于企业级开辟而言,较为复杂和不人性化,那么我们能不能将这些操作举行封装,变成简易操作?答案是肯定的,基于 Zookeeper的Api举行封装的主流框架重要有两个:
一个是个人开辟的ZkClient,一个是目前Apache的开源项目-Curator,并且由于Curator版本维护更稳定,而且除了常见的zk节点操作以外,包罗一些分布式方案,如选举,分布式锁,分布式队列等常见分布式操作都有举行封装,使得操作更加简单便利,因此更推荐使用Curator来操作Zookeeper
建议阅读:
2025最新java面试题,十次面试,九次会问!文章浏览阅读125次。JDK:Java Development Kit 的简称,java 开辟工具包,提供了 java 的开辟情况和运行情况。JRE:Java Runtime Environment 的简称,java 运行情况,为 java 的运行提供了所需情况具体来说 JDK 其实包含了 JRE,同时还包含了编译 java 源码的编译器 javac,还包含了很多 java 程序调试和分析的工具。简单来说如果你需要运行 java 程序,只需安装 JRE 就可以了,如果你需要编写 java 程序,需要安装 JDK。https://blog.csdn.net/x1ao_fe1/article/details/145192376?sharetype=blogdetail&sharerId=145192376&sharerefer=PC&sharesource=x1ao_fe1&spm=1011.2480.3001.8118
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |