ToB企服应用市场:ToB评测及商务社交产业平台

标题: 详解ZooKeeper在微服务注册中心的应用 [打印本页]

作者: 雁过留声    时间: 2024-2-27 00:01
标题: 详解ZooKeeper在微服务注册中心的应用
本文分享自华为云社区《SpringCloud ZooKeeper 详解,以及与Go、Rust等非Java服务的集成》,作者: 张俭。
ZooKeeper,是一个开源的分布式协调服务,不仅支持分布式选举、任务分配,还可以用于微服务的注册中心和配置中心。本文,我们将深入探讨ZooKeeper用做微服务注册中心的场景。
ZooKeeper中的服务注册路径

SpringCloud ZooKeeper遵循特定的路径结构进行服务注册
  1. /services/${spring.application.name}/${serviceId}
复制代码
示例:
  1. /services/provider-service/d87a3891-1173-45a0-bdfa-a1b60c71ef4e
复制代码
/services和/${spring.application.name}是ZooKeeper中的永久节点,/${serviceId}是临时节点,当服务下线时,ZooKeeper会自动删除该节点。
注:当微服务的最后一个实例下线时,SpringCloud ZooKeeper框架会删除/${spring.application.name}节点。
ZooKeeper中的服务注册数据

下面是一个典型的服务注册内容示例:
  1. {
  2. "name":"provider-service",
  3. "id":"d87a3891-1173-45a0-bdfa-a1b60c71ef4e",
  4. "address":"192.168.0.105",
  5. "port":8080,
  6. "sslPort":null,
  7. "payload":{
  8. "@class":"org.springframework.cloud.zookeeper.discovery.ZookeeperInstance",
  9. "id":"provider-service",
  10. "name":"provider-service",
  11. "metadata":{
  12. "instance_status":"UP"
  13. }
  14. },
  15. "registrationTimeUTC":1695401004882,
  16. "serviceType":"DYNAMIC",
  17. "uriSpec":{
  18. "parts":[
  19. {
  20. "value":"scheme",
  21. "variable":true
  22. },
  23. {
  24. "value":"://",
  25. "variable":false
  26. },
  27. {
  28. "value":"address",
  29. "variable":true
  30. },
  31. {
  32. "value":":",
  33. "variable":false
  34. },
  35. {
  36. "value":"port",
  37. "variable":true
  38. }
  39. ]
  40. }
  41. }
复制代码
其中,address、port和uriSpec是最核心的数据。uriSpec中的parts区分了哪些内容是可变的,哪些是固定的。
SpringCloud 服务使用OpenFeign互相调用

一旦两个微服务都注册到了ZooKeeper,那么它们就可以通过OpenFeign互相调用了。简单的示例如下
服务提供者

创建SpringBoot项目

创建SpringBoot项目,并添加spring-cloud-starter-zookeeper-discovery和spring-boot-starter-web依赖。
配置application.yaml
  1. spring:
  2. application:
  3. name: provider-service
  4. cloud:
  5. zookeeper:
  6. connect-string: localhost:2181
  7. server:
  8. port: 8082
复制代码
注册到ZooKeeper

在启动类上添加@EnableDiscoveryClient注解。
创建一个简单的REST接口
  1. @RestController
  2. public class ProviderController {
  3. @GetMapping("/hello")
  4. public String hello() {
  5. return "Hello from Provider Service!";
  6. }
  7. }
复制代码
服务消费者

创建SpringBoot项目

创建SpringBoot项目,并添加spring-cloud-starter-zookeeper-discovery、spring-cloud-starter-openfeign和spring-boot-starter-web依赖。
配置application.yaml
  1. spring:
  2. application:
  3. name: consumer-service
  4. cloud:
  5. zookeeper:
  6. connect-string: localhost:2181
  7. server:
  8. port: 8081
复制代码
注册到ZooKeeper

在启动类上添加@EnableDiscoveryClient注解。
创建一个REST接口,通过OpenFeign调用服务提供者
  1. @RestController
  2. public class ConsumerController {
  3. @Autowired
  4. private ProviderClient providerClient;
  5. @GetMapping("/getHello")
  6. public String getHello() {
  7. return providerClient.hello();
  8. }
  9. }
复制代码
运行效果
  1. curl localhost:8081/getHello -i
  2. HTTP/1.1 200
  3. Content-Type: text/plain;charset=UTF-8
  4. Content-Length: 28
  5. Date: Wed, 18 Oct 2023 02:40:57 GMT
  6. Hello from Provider Service!
复制代码
非Java服务在SpringCloud ZooKeeper中注册

可能有些读者乍一看觉得有点奇怪,为什么要在SpringCloud ZooKeeper中注册非Java服务呢?没有这个应用场景。
当然,这样的场景比较少,常见于大部分项目都是用SpringCloud开发,但有少部分项目因为种种原因,不得不使用其他语言开发,比如Go、Rust等。这时候,我们就需要在SpringCloud ZooKeeper中注册非Java服务了。
对于非JVM语言开发的服务,只需确保它们提供了Rest/HTTP接口并正确地注册到ZooKeeper,就可以被SpringCloud的Feign客户端所调用。
Go服务在SpringCloud ZooKeeper

example代码组织:
  1. ├── consumer
  2. │ └── consumer.go
  3. ├── go.mod
  4. ├── go.sum
  5. └── provider
  6. └── provider.go
复制代码
Go服务提供者在SpringCloud ZooKeeper

注:该代码的质量为demo级别,实际生产环境需要更加严谨的代码,如重连机制、超时机制、更优秀的服务ID生成算法等。
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "net/http"
  6. "time"
  7. "encoding/json"
  8. "github.com/gin-gonic/gin"
  9. "github.com/samuel/go-zookeeper/zk"
  10. )
  11. const (
  12. zkServers = "localhost:2181" // Zookeeper服务器地址
  13. )
  14. func main() {
  15. // 初始化gin框架
  16. r := gin.Default()
  17. // 添加一个简单的hello接口
  18. r.GET("/hello", func(c *gin.Context) {
  19. c.String(http.StatusOK, "Hello from Go service!")
  20. })
  21. // 注册服务到zookeeper
  22. registerToZookeeper()
  23. // 启动gin服务器
  24. r.Run(":8080")
  25. }
  26. func registerToZookeeper() {
  27. conn, _, err := zk.Connect([]string{zkServers}, time.Second*5)
  28. if err != nil {
  29. panic(err)
  30. }
  31. // 检查并创建父级路径
  32. ensurePathExists(conn, "/services")
  33. ensurePathExists(conn, "/services/provider-service")
  34. // 构建注册的数据
  35. data, _ := json.Marshal(map[string]interface{}{
  36. "name": "provider-service",
  37. "address": "127.0.0.1",
  38. "port": 8080,
  39. "sslPort": nil,
  40. "payload": map[string]interface{}{"@class": "org.springframework.cloud.zookeeper.discovery.ZookeeperInstance", "id": "provider-service", "name": "provider-service", "metadata": map[string]string{"instance_status": "UP"}},
  41. "serviceType": "DYNAMIC",
  42. "uriSpec": map[string]interface{}{
  43. "parts": []map[string]interface{}{
  44. {"value": "scheme", "variable": true},
  45. {"value": "://", "variable": false},
  46. {"value": "address", "variable": true},
  47. {"value": ":", "variable": false},
  48. {"value": "port", "variable": true},
  49. },
  50. },
  51. })
  52. // 在zookeeper中注册服务
  53. path := "/services/provider-service/" + generateServiceId()
  54. _, err = conn.Create(path, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
  55. if err != nil {
  56. log.Fatalf("register service error: %s", err)
  57. } else {
  58. log.Println(path)
  59. }
  60. }
  61. func ensurePathExists(conn *zk.Conn, path string) {
  62. exists, _, err := conn.Exists(path)
  63. if err != nil {
  64. log.Fatalf("check path error: %s", err)
  65. }
  66. if !exists {
  67. _, err := conn.Create(path, []byte{}, 0, zk.WorldACL(zk.PermAll))
  68. if err != nil {
  69. log.Fatalf("create path error: %s", err)
  70. }
  71. }
  72. }
  73. func generateServiceId() string {
  74. // 这里简化为使用当前时间生成ID,实际生产环境可能需要更复杂的算法
  75. return fmt.Sprintf("%d", time.Now().UnixNano())
  76. }
复制代码
调用效果
  1. curl localhost:8081/getHello -i
  2. HTTP/1.1 200
  3. Content-Type: text/plain;charset=UTF-8
  4. Content-Length: 28
  5. Date: Wed, 18 Oct 2023 02:43:52 GMT
  6. Hello from Go Service!
复制代码
Go服务消费者在SpringCloud ZooKeeper
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "log"
  7. "net/http"
  8. "time"
  9. "github.com/samuel/go-zookeeper/zk"
  10. )
  11. const (
  12. zkServers = "localhost:2181" // Zookeeper服务器地址
  13. )
  14. var conn *zk.Conn
  15. func main() {
  16. // 初始化ZooKeeper连接
  17. initializeZookeeper()
  18. // 获取服务信息
  19. serviceInfo := getServiceInfo("/services/provider-service")
  20. fmt.Println("Fetched service info:", serviceInfo)
  21. port := int(serviceInfo["port"].(float64))
  22. resp, err := http.Get(fmt.Sprintf("http://%s:%d/hello", serviceInfo["address"], port))
  23. if err != nil {
  24. panic(err)
  25. }
  26. body, err := io.ReadAll(resp.Body)
  27. if err != nil {
  28. panic(err)
  29. }
  30. fmt.Println(string(body))
  31. }
  32. func initializeZookeeper() {
  33. var err error
  34. conn, _, err = zk.Connect([]string{zkServers}, time.Second*5)
  35. if err != nil {
  36. log.Fatalf("Failed to connect to ZooKeeper: %s", err)
  37. }
  38. }
  39. func getServiceInfo(path string) map[string]interface{} {
  40. children, _, err := conn.Children(path)
  41. if err != nil {
  42. log.Fatalf("Failed to get children of %s: %s", path, err)
  43. }
  44. if len(children) == 0 {
  45. log.Fatalf("No services found under %s", path)
  46. }
  47. // 这里只获取第一个服务节点的信息作为示例,实际上可以根据负载均衡策略选择一个服务节点
  48. data, _, err := conn.Get(fmt.Sprintf("%s/%s", path, children[0]))
  49. if err != nil {
  50. log.Fatalf("Failed to get data of %s: %s", children[0], err)
  51. }
  52. var serviceInfo map[string]interface{}
  53. if err := json.Unmarshal(data, &serviceInfo); err != nil {
  54. log.Fatalf("Failed to unmarshal data: %s", err)
  55. }
  56. return serviceInfo
  57. }
复制代码
Rust服务在SpringCloud ZooKeeper

example代码组织:
  1. ├── Cargo.lock
  2. ├── Cargo.toml
  3. └── src
  4. └── bin
  5. ├── consumer.rs
  6. └── provider.rs
复制代码
Rust服务提供者在SpringCloud ZooKeeper
  1. use std::collections::HashMap;
  2. use std::time::Duration;
  3. use serde_json::Value;
  4. use warp::Filter;
  5. use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher, ZooKeeper};
  6. static ZK_SERVERS: &str = "localhost:2181";
  7. static mut ZK_CONN: Option<ZooKeeper> = None;
  8. struct LoggingWatcher;
  9. impl Watcher for LoggingWatcher {
  10. fn handle(&self, e: WatchedEvent) {
  11. println!("WatchedEvent: {:?}", e);
  12. }
  13. }
  14. #[tokio::main]
  15. async fn main() {
  16. let hello = warp::path!("hello").map(|| warp::reply::html("Hello from Rust service!"));
  17. register_to_zookeeper().await;
  18. warp::serve(hello).run(([127, 0, 0, 1], 8083)).await;
  19. }
  20. async fn register_to_zookeeper() {
  21. unsafe {
  22. ZK_CONN = Some(ZooKeeper::connect(ZK_SERVERS, Duration::from_secs(5), LoggingWatcher).unwrap());
  23. let zk = ZK_CONN.as_ref().unwrap();
  24. let path = "/services/provider-service";
  25. if zk.exists(path, false).unwrap().is_none() {
  26. zk.create(path, vec![], Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap();
  27. }
  28. let service_data = get_service_data();
  29. let service_path = format!("{}/{}", path, generate_service_id());
  30. zk.create(&service_path, service_data, Acl::open_unsafe().clone(), CreateMode::Ephemeral).unwrap();
  31. }
  32. }
  33. fn get_service_data() -> Vec<u8> {
  34. let mut data: HashMap<&str, Value> = HashMap::new();
  35. data.insert("name", serde_json::Value::String("provider-service".to_string()));
  36. data.insert("address", serde_json::Value::String("127.0.0.1".to_string()));
  37. data.insert("port", serde_json::Value::Number(8083.into()));
  38. serde_json::to_vec(&data).unwrap()
  39. }
  40. fn generate_service_id() -> String {
  41. format!("{}", chrono::Utc::now().timestamp_nanos())
  42. }
复制代码
Rust服务消费者在SpringCloud ZooKeeper
  1. use std::collections::HashMap;
  2. use std::time::Duration;
  3. use zookeeper::{WatchedEvent, Watcher, ZooKeeper};
  4. use reqwest;
  5. use serde_json::Value;
  6. static ZK_SERVERS: &str = "localhost:2181";
  7. struct LoggingWatcher;
  8. impl Watcher for LoggingWatcher {
  9. fn handle(&self, e: WatchedEvent) {
  10. println!("WatchedEvent: {:?}", e);
  11. }
  12. }
  13. #[tokio::main]
  14. async fn main() {
  15. let provider_data = fetch_provider_data_from_zookeeper().await;
  16. let response = request_provider(&provider_data).await;
  17. println!("Response from provider: {}", response);
  18. }
  19. async fn fetch_provider_data_from_zookeeper() -> HashMap<String, Value> {
  20. let zk = ZooKeeper::connect(ZK_SERVERS, Duration::from_secs(5), LoggingWatcher).unwrap();
  21. let children = zk.get_children("/services/provider-service", false).unwrap();
  22. if children.is_empty() {
  23. panic!("No provider services found!");
  24. }
  25. // For simplicity, we just take the first child (i.e., service instance).
  26. // In a real-world scenario, load balancing strategies would determine which service instance to use.
  27. let data = zk.get_data(&format!("/services/provider-service/{}", children[0]), false).unwrap();
  28. serde_json::from_slice(&data.0).unwrap()
  29. }
  30. async fn request_provider(provider_data: &HashMap<String, Value>) -> String {
  31. let address = provider_data.get("address").unwrap().as_str().unwrap();
  32. let port = provider_data.get("port").unwrap().as_i64().unwrap();
  33. let url = format!("http://{}:{}/hello", address, port);
  34. let response = reqwest::get(&url).await.unwrap();
  35. response.text().await.unwrap()
  36. }
复制代码
 
点击关注,第一时间了解华为云新鲜技术~
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4