Dubbo源码(五) - 服务目录

打印 上一主题 下一主题

主题 866|帖子 866|积分 2598

前言

本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo
今天,来聊聊Dubbo的服务目录(Directory)。下面是官方文档对服务目录的定义:
服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。
服务目录持有Invoker对象集合,Dubbo的服务调用均由Invoker发起。
当服务提供者信息发生变化时(比如某一个服务挂了),服务目录也需要动态调整。
继承体系


服务目录目前内置的实现有两个,分别为 StaticDirectory 和 RegistryDirectory。它们均继承自AbstractDirectory,而 AbstractDirectory 实现了 Directory 接口。Directory 接口提供了list(Invocation invocation) 方法,这个方法就是用来获取 invoker 集合的。
再看 RegistryDirectory 实现了 NotifyListener 接口,这个接口中只有一个方法,notify(List urls),当注册中心节点信息发生变化后,触发此方法调整服务目录中的配置信息以及 invoker 集合。
源码分析

上面我们讲了,服务调用需求用到 invoker,而服务目录持有 invoker 集合,并通过 list 方法提供 invoker。下面放上服务消费者Demo中DemoService#sayHello 方法的调用路径

AbstractDirectory 实现了 Directory 接口的 list 方法
  1. public List<Invoker<T>> list(Invocation invocation) throws RpcException {
  2.     if (destroyed) {
  3.         throw new RpcException("Directory already destroyed .url: " + getUrl());
  4.     }
  5.     // 调用 doList 方法列举 Invoker,doList 是模板方法,由子类实现
  6.     List<Invoker<T>> invokers = doList(invocation);
  7.     // 获取路由 Router 列表
  8.     List<Router> localRouters = this.routers; // local reference
  9.     if (localRouters != null && !localRouters.isEmpty()) {
  10.         for (Router router : localRouters) {
  11.             try {
  12.                 // 获取 runtime 参数,并根据参数决定是否进行路由
  13.                 if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
  14.                     // 进行服务路由
  15.                     invokers = router.route(invokers, getConsumerUrl(), invocation);
  16.                 }
  17.             } catch (Throwable t) {
  18.                 logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
  19.             }
  20.         }
  21.     }
  22.     return invokers;
  23. }
复制代码
此方法就两段逻辑:

  • 通过 doList 获取 invoker 集合
  • 通过路由选择合适的 invoker
路由非本文重点,略过。
doList 是模板方法,由子类实现。
StaticDirectory

StaticDirectory 是一个静态服务目录,其 invokers 集合通过构造方法注入,不应被改变。
  1. // StaticDirectory的doList啥都没做,直接返回持有的invokers
  2. protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
  3.     // 列举 Inovker,也就是直接返回 invokers 成员变量
  4.     return invokers;
  5. }
复制代码
StaticDirectory 的其它方法就不分析了,同样很简单。
RegistryDirectory

RegistryDirectory 是动态调整的服务目录,其持有的 invokers 有内部方法生成。
订阅节点

在上篇博文《Dubbo源码(四) - 服务引用(消费者)》中,我留了一个坑,也就是服务引用过程中,创建了注册中心之后,如何订阅节点数据。在RegistryProtocol#doRefer方法中。
其中调用了RegistryDirectory#subscribe(URL url)方法
  1. public void subscribe(URL url) {
  2.     setConsumerUrl(url);
  3.     registry.subscribe(url, this);
  4. }
复制代码
我们用的注册中心是 zookeeper,所以 registry 是 ZookeeperRegistry,而 subscribe 方法的实现在其父类FailbackRegistry中
  1. public void subscribe(URL url, NotifyListener listener) {
  2.     super.subscribe(url, listener);
  3.     removeFailedSubscribed(url, listener);
  4.     try {
  5.         // Sending a subscription request to the server side
  6.         doSubscribe(url, listener);
  7.     } catch (Exception e) {
  8.         ......
  9.         // 订阅失败处理
  10.         addFailedSubscribed(url, listener);
  11.     }
  12. }
复制代码
模板方法,调用子类的 doSubscribe 方法
  1. protected void doSubscribe(final URL url, final NotifyListener listener) {
  2.     try {
  3.         if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
  4.             ...
  5.         } else {
  6.             List<URL> urls = new ArrayList<URL>();
  7.             // 切割路径(providers、configurators、routers等)
  8.             for (String path : toCategoriesPath(url)) {
  9.                 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
  10.                 if (listeners == null) {
  11.                     zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
  12.                     listeners = zkListeners.get(url);
  13.                 }
  14.                 // 缓存操作,获取节点监听器
  15.                 ChildListener zkListener = listeners.get(listener);
  16.                 if (zkListener == null) {
  17.                     listeners.putIfAbsent(listener, new ChildListener() {
  18.                         @Override
  19.                         public void childChanged(String parentPath, List<String> currentChilds) {
  20.                             // 这里和方法末尾的 notify(url, listener, urls); 是调用的同一个方法
  21.                             // 节点变更时触发变更操作
  22.                             ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
  23.                         }
  24.                     });
  25.                     zkListener = listeners.get(listener);
  26.                 }
  27.                 zkClient.create(path, false);
  28.                 // 注册节点监听器
  29.                 List<String> children = zkClient.addChildListener(path, zkListener);
  30.                 if (children != null) {
  31.                     urls.addAll(toUrlsWithEmpty(url, path, children));
  32.                 }
  33.             }
  34.             // 触发节点变更操作
  35.             notify(url, listener, urls);
  36.         }
  37.     } catch (Throwable e) {
  38.         throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  39.     }
  40. }
复制代码
订阅方法做了3个操作:

  • 切割url,拆分订阅路径
  • 创建节点监听器
  • 触发节点变更操作
这里注意下,订阅时节点数据并没有发生变更,所以需要手动触发 notify 方法。
下面继续看节点变更操作做了什么,调用路径有点深,就不一步一步调试了,直接把路径写在注释上。
  1. // FailbackRegistry#notify(URL url, NotifyListener listener, List<URL> urls) ->
  2. // FailbackRegistry#doNotify(URL url, NotifyListener listener, List<URL> urls) ->
  3. // AbstractRegistry#notify(URL url, NotifyListener listener, List<URL> urls)
  4. protected void notify(URL url, NotifyListener listener, List<URL> urls) {
  5.     ......
  6.     Map<String, List<URL>> result = new HashMap<String, List<URL>>();
  7.     // 将urls按分类分组转成map
  8.     ......
  9.     for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
  10.         String category = entry.getKey();
  11.         List<URL> categoryList = entry.getValue();
  12.         categoryNotified.put(category, categoryList);
  13.         saveProperties(url);
  14.         listener.notify(categoryList);
  15.     }
  16. }
复制代码
此处的listener变量,就是本节的主角RegistryDirectory,下面来分析 listener.notify(categoryList)
  1. public synchronized void notify(List<URL> urls) {
  2.     // 定义三个集合,分别用于存放服务提供者 url,路由 url,配置器 url
  3.     List<URL> invokerUrls = new ArrayList<URL>();
  4.     List<URL> routerUrls = new ArrayList<URL>();
  5.     List<URL> configuratorUrls = new ArrayList<URL>();
  6.     // 根据 category 参数分别对3种url进行处理
  7.     ......
  8.     // 刷新 Invoker 列表
  9.     refreshInvoker(invokerUrls);
  10. }
复制代码
此方法分别对服务提供者 url,路由 url,配置器 url各自进行了处理,这里我省略了对路由 url 和配置器 url 的处理,感兴趣的自行去看源码。咱们聚焦在 Invoker 的处理中
  1. private void refreshInvoker(List<URL> invokerUrls) {
  2.     // invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务
  3.     if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
  4.             && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
  5.         // 设置 forbidden 为 true
  6.         this.forbidden = true; // Forbid to access
  7.         this.methodInvokerMap = null; // Set the method invoker map to null
  8.         // 销毁所有 Invoker
  9.         destroyAllInvokers(); // Close all invokers
  10.     } else {
  11.         this.forbidden = false; // Allow to access
  12.         Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
  13.         if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
  14.             // 添加缓存 url 到 invokerUrls 中
  15.             invokerUrls.addAll(this.cachedInvokerUrls);
  16.         } else {
  17.             this.cachedInvokerUrls = new HashSet<URL>();
  18.             // 缓存 invokerUrls
  19.             this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
  20.         }
  21.         if (invokerUrls.isEmpty()) {
  22.             return;
  23.         }
  24.         // 将 url 转成 Invoker
  25.         Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
  26.         // 将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射
  27.         Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
  28.         // state change
  29.         // If the calculation is wrong, it is not processed.
  30.         // 转换出错,直接打印异常,并返回
  31.         if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
  32.             logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
  33.             return;
  34.         }
  35.         // 合并多个组的 Invoker
  36.         this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
  37.         this.urlInvokerMap = newUrlInvokerMap;
  38.         try {
  39.             // 销毁无用 Invoker
  40.             destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
  41.         } catch (Exception e) {
  42.             logger.warn("destroyUnusedInvokers error. ", e);
  43.         }
  44.     }
  45. }
复制代码
此方法中的逻辑有点多,

  • 判断是否要销毁所有 invoker
  • 创建 invoker
  • 处理映射
  • 销毁无用 invoker
我们关注下 invoker 的创建,toInvokers(invokerUrls)
  1. private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
  2.     ......
  3.     // 获取服务消费端配置的协议
  4.     String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
  5.     for (URL providerUrl : urls) {
  6.         ......
  7.         // 将本地 Invoker 缓存赋值给 localUrlInvokerMap
  8.         Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
  9.         Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
  10.         if (invoker == null) { // Not in the cache, refer again
  11.             try {
  12.                 boolean enabled = true;
  13.                 if (url.hasParameter(Constants.DISABLED_KEY)) {
  14.                     // 获取 disable 配置,取反,然后赋值给 enable 变量
  15.                     enabled = !url.getParameter(Constants.DISABLED_KEY, false);
  16.                 } else {
  17.                     // 获取 enable 配置,并赋值给 enable 变量
  18.                     enabled = url.getParameter(Constants.ENABLED_KEY, true);
  19.                 }
  20.                 if (enabled) {
  21.                     // 调用 refer 获取 Invoker
  22.                     invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
  23.                 }
  24.             } catch (Throwable t) {
  25.                 logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
  26.             }
  27.             if (invoker != null) { // Put new invoker in cache
  28.                 // 缓存 Invoker 实例
  29.                 newUrlInvokerMap.put(key, invoker);
  30.             }
  31.             // 缓存命中
  32.         } else {
  33.             // 将 invoker 存储到 newUrlInvokerMap 中
  34.             newUrlInvokerMap.put(key, invoker);
  35.         }
  36.     }
  37.     keys.clear();
  38.     return newUrlInvokerMap;
  39. }
复制代码
这里的判断有点复杂,会对协议各种判断(是否支持、是否为empty)等,然后如果缓存未命中,则需要创建invoker,也就是protocol.refer(serviceType, url)这一段代码。
此时,我们上一篇文章留下的另一个坑也填上了,也就是DubboProtocol#refer的调用时机。
获取invoker集合
  1. public List<Invoker<T>> doList(Invocation invocation) {
  2.     ......
  3.     List<Invoker<T>> invokers = null;
  4.     // 获取 Invoker 本地缓存
  5.     Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
  6.     if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
  7.         // 获取方法名和参数列表
  8.         String methodName = RpcUtils.getMethodName(invocation);
  9.         Object[] args = RpcUtils.getArguments(invocation);
  10.         // 检测参数列表的第一个参数是否为 String 或 enum 类型
  11.         if (args != null && args.length > 0 && args[0] != null
  12.                 && (args[0] instanceof String || args[0].getClass().isEnum())) {
  13.             // 通过 方法名 + 第一个参数名称 查询 Invoker 列表,具体的使用场景暂时没想到
  14.             invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
  15.         }
  16.         if (invokers == null) {
  17.             // 通过方法名获取 Invoker 列表
  18.             invokers = localMethodInvokerMap.get(methodName);
  19.         }
  20.         if (invokers == null) {
  21.             // 通过星号 * 获取 Invoker 列表
  22.             invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
  23.         }
  24.         // 冗余逻辑,pull request #2861 移除了下面的 if 分支代码
  25.         if (invokers == null) {
  26.             Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
  27.             if (iterator.hasNext()) {
  28.                 invokers = iterator.next();
  29.             }
  30.         }
  31.     }
  32.     // 返回 Invoker 列表
  33.     return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
  34. }
复制代码
这里的逻辑也很简单,就是从类变量 methodInvokerMap 中获取invoker,所有我们需要去看看 methodInvokerMap 的赋值。
我们在上一小节的 refreshInvoker 方法中,讲了 invoker 的生成。refreshInvoker 方法中还有对methodInvokerMap 的处理。也就是 toMethodInvokers(newUrlInvokerMap) 方法
这里面会将 url-invoker 的映射转成 方法名-invoker 的映射。
总结

Dubbo的服务调用,需要通过服务目录拿到 invoker 才能发起。当注册中心发生变化时,服务目录同样需要动态调整,并刷新持有的 invoker 集合。服务目录是 Dubbo 集群容错的一部分,也是比较基础的部分。
PS:以上讲的不包含本地服务调用,别杠
参考资料
Dubbo开发指南

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

一给

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表