马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
1. 创建 zk 的客户端类:
- 实现与 zk Server 的毗连、
- 根据路径及URL(ip+port)创建结点
- 由结点路径获取data (ip+port) 信息
- #pragma once
- #include<semaphore.h>
- #include<zookeeper/zookeeper.h>
- #include<string>
- //封装zk客户端类
- class ZkClient{
- public:
- ZkClient();
- ~ZkClient();
- //zkCli 启动连接zkServer
- void Star();
- //在zkServer 根据指定路径(/Servicename/functionname : ip+port)创建znode结点
- void Create (const char *path,const char*data ,int datalen,int state=0 );
- //根据参数指定路径参数、值
- std::string GetData(const char*path);
- private:
- //客户端句柄
- zhandle_t *m_zhandle;
- };
复制代码
1.1. 创建客户端与服务器的毗连:
void ZkClient::Star()代码解析
- //全局观察器,获取zkServer 给zkClien 的回复通知
- void global_watcher(zhandle_t *zh, int type, int state, const char *path,void *watcherCtx)
- {
- if(type==ZOO_SESSION_EVENT)//回调消息类型是会话相关的类型
- {
- if(state==ZOO_CONNECTED_STATE)
- {
- sem_t*sem=(sem_t*)zoo_get_context(zh);
- sem_post(sem);
- }
- }
- }
- // zkCli 启动连接zkServer
- void ZkClient::Star()
- {
- std::string host =MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
- std::string port =MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
- //param host comma separated host:port pairs, each corresponding to a zk
- //server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
- std::string connstr=host+":"+port;
- m_zhandle=zookeeper_init(connstr.c_str(),global_watcher,3000,nullptr,nullptr,0);
- if(m_zhandle==nullptr)
- {
- LOG_INFO("zookeeper_init error!");
- std::cout<<"zookeeper_init error!"<<std::endl;
- exit(EXIT_FAILURE);
- }
- /* This method creates a new handle and a zookeeper session that correspondsto that handle.
- Session establishment is asynchronous:
- meaning that the session should not be considered established
- until (and unless) anevent of state ZOO_CONNECTED_STATE is received.*/
- sem_t sem;
- sem_init(&sem,0,0);
- zoo_set_context(m_zhandle,&sem);
- sem_wait(&sem);
- LOG_ERROR("zookeeper_init success!");
- }
复制代码 我们需要调用 zkCli 的接口API 创建与 server 的毗连:zookeeper_init
- ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,
- int recv_timeout, const clientid_t *clientid, void *context, int flags);
- /*
- const char *host:ZooKeeper 服务器的地址 IP :Port
- param host comma separated host:port pairs
- server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
- */
- /*
- watcher_fn fn ,回调函数:返回 server 与 Cli 的连接状态信息
- */
- /*
- int recv_timeout:超时时间(默认30000 ms->30s)
- */
- /*
- const clientid_t *clientid: 一个指向 clientid_t 结构的指针包含了客户端的会话 ID 和会话密码
- 如果这是客户端的第一次连接(即没有现有的会话),则此参数应该为 NULL。
- */
- /*
- void *context: 一个用户定义的上下文指针,被传递给 watcher_fn 函数
- 这允许用户在回调函数中访问特定于应用程序的数据
- */
- /*
- int flags: 初始化连接的标志:这些标志的具体含义取决于 ZooKeeper 客户端库的版本
- 连接选项,如是否要启用只读模式等。
- */
复制代码 zookeeper_init 基于zookeeper_mt(多线程),实现异步毗连
主线程:执行当前的zookeeper_init内容
pthread_create:
子线程1:网络I/O线程 由于客户端无高并发要求,采用 poll
子线程2:watcher回调线程返回server毗连后的状态信息,判定是否乐成
主线程:
zookeeper_init该函数返回一个创建的 zkCli 句柄,
若返回值为空,则说明该语句执行失败
返回值不为空,并不能说明毗连已经创建,而表现该语句执行乐成并创建了m_zhandle 的内存空间
- zookeeper_init 在创建完 m_zhandle 的句柄后,设置信号量并初始为0(资源不可用)
- 将信号量设置为 m_zhandle 的上下文 (新增长的关联信息)
- 在 sem_wait(&sem) 调用时,由于信号量的值为 0 线程将会阻塞,直到在watcher线程中判定毗连乐成后调用了 sem_post(&sem) 来增长信号量的值被叫醒,至此毗连结束
子线程1:网络I/O线程
进行网络的数据收发,发送客户端的毗连请求、结点的创建请求,获取server返回cli的watcher的回调信息
子线程2:watcher回调线程
- type==ZOO_SESSION_EVENT//回调消息类型是会话相关的类型
- state==ZOO_CONNECTED_STATE)//判断连接状态
- //连接成功后:
- sem_t*sem=(sem_t*)zoo_get_context(zh);//由句柄指针获取句柄的上下文,关联信息sem
- sem_post(sem);//增加信号量
复制代码
1.2. 创建结点并写入数据信息:
- // 在zkServer 根据指定路径(/Servicename/functionname : ip+port)创建znode结点
- void ZkClient::Create(const char *path, const char *data, int datalen, int state)
- {
- char path_buf[128];
- int path_len=sizeof(path_buf);
- int flag;
- //判断结点是否已经存在,避免重重复创建
- flag=zoo_exists(m_zhandle,path,0,nullptr);
- if(ZNONODE==flag)//结点不存在
- {
- flag=zoo_create(m_zhandle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buf,path_len);
- if(flag==ZOK)
- {
- LOG_INFO("znode create success...path:%s:",path);
- }
- else
- {
- LOG_ERROR("znode create error,path:%s,flag:%d",path,flag);
- exit(EXIT_FAILURE);
- }
- }
-
- }
复制代码
1.3. 获取结点数据:
- // 根据参数指定路径->获取结点值
- std::string ZkClient::GetData(const char *path)
- {
- char buff[64];
- int buff_len=sizeof(buff);
- //调用接口实现由结点路径、获取该结点的data 写入buff
- int flag=zoo_get(m_zhandle,path,0,buff,&buff_len,nullptr);
- if(flag!=ZOK)//返回值为ZOK 表示成功!
- {
- std::cout<<"get znode errr ... path:"<<path<<std::endl;
- return "";
- }
- return buff;
- }
复制代码
构造析构函数:
- ZkClient::ZkClient():m_zhandle(nullptr)
- {
- }
- ZkClient::~ZkClient()
- {
- if(m_zhandle!=nullptr)
- zookeeper_close(m_zhandle);
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |