Nacos源码 (5) Grpc服务端和客户端

打印 上一主题 下一主题

主题 566|帖子 566|积分 1698

Nacos 2.x在服务端与客户端直接增加了GRPC通信方式,本文通过2.0.2版本源码,简单分析GRPC通信方式:

  • 服务器启动
  • 客户端连接
  • 客户端心跳
  • 服务器监控检查
服务器

proto文件

api/src/main/proto/nacos_grpc_service.proto文件:
  1. syntax = "proto3";
  2. import "google/protobuf/any.proto";
  3. import "google/protobuf/timestamp.proto";
  4. option java_multiple_files = true;
  5. option java_package = "com.alibaba.nacos.api.grpc.auto";
  6. message Metadata {
  7.   string type = 3; // 请求/响应的真实类型
  8.   string clientIp = 8;
  9.   map<string, string> headers = 7;
  10. }
  11. // GRPC通信层请求/响应体
  12. message Payload {
  13.   Metadata metadata = 2;
  14.   // 业务层的请求/响应体,需要使用type做反序列化
  15.   google.protobuf.Any body = 3;
  16. }
  17. service RequestStream {
  18.   // build a streamRequest
  19.   rpc requestStream (Payload) returns (stream Payload) {
  20.   }
  21. }
  22. service Request {
  23.   // Sends a commonRequest
  24.   rpc request (Payload) returns (Payload) {
  25.   }
  26. }
  27. service BiRequestStream {
  28.   // Sends a commonRequest
  29.   rpc requestBiStream (stream Payload) returns (stream Payload) {
  30.   }
  31. }
复制代码
文件定义了通信层的service和message结构,业务层请求响应的序列化和反序列化是Nacos在RequestAcceptor/Connection中使用工具类实现的,业务层请求处理是在RequestAcceptor中进行的转发。
服务器启动

Server类继承关系
  1. BaseRpcServer
  2.   |-- BaseGrpcServer
  3.      |-- GrpcSdkServer
  4.      |-- GrpcClusterServer
复制代码
此处介绍一下GrpcSdkServer实现。
GrpcSdkServer类
  1. @Service
  2. public class GrpcSdkServer extends BaseGrpcServer {
  3.     // 所以SDK服务器的监听端口是9848
  4.     private static final int PORT_OFFSET = 1000;
  5.     @Override
  6.     public int rpcPortOffset() {
  7.         return PORT_OFFSET;
  8.     }
  9.     @Override
  10.     public ThreadPoolExecutor getRpcExecutor() {
  11.         return GlobalExecutor.sdkRpcExecutor;
  12.     }
  13. }
复制代码
大部分的启动逻辑在BaseGrpcServer中。
BaseGrpcServer类

GRPC服务器的启动逻辑大部分都在这个类的startServer方法。

  • 将处理请求的RequestAcceptor注册到HandlerRegistry

    • GrpcRequestAcceptor用于处理普通业务请求
    • GrpcBiStreamRequestAcceptor用于处理连接建立请求,获取Channel创建GrpcConnection并注册到ConnectionManager中,后续向客户端发送消息都是使用GrpcConnection做的

  • 创建GRPC的Server对象

    • 设置port和executor
    • 设置HandlerRegistry
    • 添加ServerTransportFilter在连接建立和断开时做一些业务操作

  • 启动Server
GrpcRequestAcceptor类

这个类对GRPC做了扩展,重写了request方法:

  • 解析Payload获取请求体的数据类型
  • 从RequestHandlerRegistry获取适配的RequestHandler处理器
  • 将请求体反序列化成请求体类型对象
  • 调用handleRequest方法处理请求返回响应
处理请求代码:
  1. Request request = (Request) parseObj;
  2. try {
  3.     // 获取Connection
  4.     Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
  5.     RequestMeta requestMeta = new RequestMeta();
  6.     requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
  7.     requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
  8.     requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
  9.     requestMeta.setLabels(connection.getMetaInfo().getLabels());
  10.     // 刷新活跃时间,后续的健康检查会使用到这个时间戳
  11.     connectionManager.refreshActiveTime(requestMeta.getConnectionId());
  12.     // 使用RequestHandler处理请求
  13.     Response response = requestHandler.handleRequest(request, requestMeta);
  14.     Payload payloadResponse = GrpcUtils.convert(response);
  15.     traceIfNecessary(payloadResponse, false);
  16.     responseObserver.onNext(payloadResponse);
  17.     responseObserver.onCompleted();
  18. } catch (Throwable e) {
  19.     Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
  20.             (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
  21.             e.getMessage()));
  22.     traceIfNecessary(payloadResponse, false);
  23.     responseObserver.onNext(payloadResponse);
  24.     responseObserver.onCompleted();
  25. }
复制代码
RequestHandler处理器

RequestHandler抽象类是Nacos在业务层处理GRPC请求的抽象类:
  1. public abstract class RequestHandler<T extends Request, S extends Response> {
  2.     @Autowired
  3.     private RequestFilters requestFilters;
  4.     /**
  5.      * Handler request.
  6.      */
  7.     public Response handleRequest(T request, RequestMeta meta) throws NacosException {
  8.         for (AbstractRequestFilter filter : requestFilters.filters) {
  9.             try {
  10.                 Response filterResult = filter.filter(request, meta, this.getClass());
  11.                 if (filterResult != null && !filterResult.isSuccess()) {
  12.                     return filterResult;
  13.                 }
  14.             } catch (Throwable throwable) {
  15.                 Loggers.REMOTE.error("filter error", throwable);
  16.             }
  17.         }
  18.         return handle(request, meta);
  19.     }
  20.     /**
  21.      * Handler request.
  22.      */
  23.     public abstract S handle(T request, RequestMeta meta) throws NacosException;
  24. }
复制代码
实现类:
Nacos使用RequestHandlerRegistry管理所有的RequestHandler,是一个Map结构:
  1. // key是Request类型的简单名
  2. // value是RequestHandler实现类对象
  3. Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();
复制代码
RequestHandlerRegistry会扫描Spring容器里面所有的RequestHandler对象,解析RequestHandler实现类处理的Request类型的简单名,将其注册到registryHandlers中。
GrpcRequestAcceptor类获取适配的RequestHandler处理器使用的就是RequestHandlerRegistry类的getByRequestType方法:
  1. public RequestHandler getByRequestType(String requestType) {
  2.     return registryHandlers.get(requestType);
  3. }
复制代码
建立连接

在Server初始化的时候,Nacos注册了ServerInterceptor和ServerTransportFilter组件,这些组件会在连接建立时将conn_id、remote_ip、remote_port、local_port、ctx_channel等绑定到Context上。
创建GrpcConnection

客户端在连接建立之后会发送一个ConnectionSetupRequest请求,服务器使用GrpcBiStreamRequestAcceptor处理该请求:

  • 获取到conn_id、remote_ip、remote_port、local_port等
  • 解析请求获取clienIp
  • 封装GrpcConnection对象,包括:conn_id、remote_ip、remote_port、local_port、clientIp、客户端版本等基础信息,以及StreamObserver和Channel
  • 将GrpcConnection注册到ConnectionManager上
创建Client

ConnectionManager的注册操作会触发ConnectionBasedClientManager的clientConnected方法来创建Client对象:
  1. public void clientConnected(Connection connect) {
  2.     // grpc类型
  3.     String type = connect.getMetaInfo().getConnectType();
  4.     // 此处获取到的是ConnectionBasedClientFactory对象
  5.     ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
  6.     // 此处创建的是ConnectionBasedClient对象
  7.     clientConnected(clientFactory.newClient(connect.getMetaInfo().getConnectionId()));
  8. }
  9. public boolean clientConnected(Client client) {
  10.     if (!clients.containsKey(client.getClientId())) {
  11.         // 注册到client集
  12.         // 使用Map维护clientId->client对象关系
  13.         clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
  14.     }
  15.     return true;
  16. }
复制代码
健康检查

ConnectionManager连接管理器

这个类管理客户端连接,提供注册连接、移除连接等功能:
  1. // 管理IP -> 连接数,用于实现ConnectionLimitRule
  2. private Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<String, AtomicInteger>(16);
  3. // 管理connectionId -> Connection
  4. Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
复制代码
Connection抽象类实现了Requester接口,能够向客户端发送请求、管理连接状态。
GrpcConnection实现了Connection抽象类。
在连接建立后,客户端会发送一个ConnectionSetupRequest请求,服务端收到该请求后,会解析出connectionId、客户端IP、客户端端口、客户端版本、Channel等封装成GrpcConnection对象,然后注册到ConnectionManager中。
健康检查周期任务

ConnectionManager在启动阶段会启动一个周期任务来检查IP连接数和连接的活跃状态,每3秒执行一次:

  • 遍历连接集,使用connectionLimitRule查找需要重置的连接,向这些客户端发reset请求重置连接
  • 获取连接的最后活跃时间(客户端每次请求都会更新这个时间),如果超过20秒不活跃,则向客户端发送一个探测请求,如果请求失败则断开连接
断开连接

业务处理流程

GRPC连接层检测到连接断开之后,会触发GrpcServer的transportTerminated事件:
  1. public void transportTerminated(Attributes transportAttrs) {
  2.     String connectionId = null;
  3.     try {
  4.         connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
  5.     } catch (Exception e) {
  6.         // Ignore
  7.     }
  8.     if (StringUtils.isNotBlank(connectionId)) {
  9.         // 使用ConnectionManager移除连接
  10.         connectionManager.unregister(connectionId);
  11.     }
  12. }
复制代码
ConnectionManager移除连接:
  1. public synchronized void unregister(String connectionId) {
  2.     // 从Connection集移除连接
  3.     Connection remove = this.connections.remove(connectionId);
  4.     if (remove != null) {
  5.         String clientIp = remove.getMetaInfo().clientIp;
  6.         AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
  7.         // IP连接数--
  8.         if (atomicInteger != null) {
  9.             int count = atomicInteger.decrementAndGet();
  10.             if (count <= 0) {
  11.                 connectionForClientIp.remove(clientIp);
  12.             }
  13.         }
  14.         remove.close();
  15.         // 通知ClientManager层移除client对象
  16.         clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
  17.     }
  18. }
复制代码
之后需要为Client进行初始化:

  • 设置ServerListFactory,用于选择服务器地址
  • 注册ServerRequestHandler处理器,用于处理服务端发送的请求,比如服务订阅的回调、配置文件变化通知
  • 注册ConnectionEventListener监听器
    1. public boolean clientDisconnected(String clientId) {
    2.     ConnectionBasedClient client = clients.remove(clientId);
    3.     if (null == client) {
    4.         return true;
    5.     }
    6.     client.release();
    7.     // 推送一个ClientDisconnectEvent事件
    8.     NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    9.     return true;
    10. }
    复制代码
  • 启动Client

    • 启动ConnectionEvent处理线程
    • 启动健康检查(心跳)线程
    • 创建GrpcConnection

创建GrpcConnection


  • 创建GRPC的RequestFutureStub和BiRequestStreamStub
  • 发一个ServerCheckRequest请求验证服务端的可用性
  • 创建GrpcConnection对象,封装serverInfo和executor、connectionId、channel等
  • 为BiRequestStreamStub绑定请求处理逻辑:使用ServerRequestHandler处理器处理服务端发送过来的请求
  • 发送ConnectionSetupRequest请求,让服务端创建并注册GrpcConnection
  1. public interface ServerListFactory {
  2.     // 选择一个可用的服务器地址 ip:port格式
  3.     String genNextServer();
  4.     // 返回当前使用的服务器地址 ip:port格式
  5.     String getCurrentServer();
  6.     // 返回服务器集合
  7.     List<String> getServerList();
  8. }
复制代码
发送请求

Requester接口

这个接口定义了发送请求的方法:
  1. RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
复制代码
GrpcConnection实现

GrpcConnection类实现了Requester接口的三个request方法,使用的是GRPC的Stub发送请求,以request方法为例:
  1. public static RpcClient createClient(String clientName,
  2.                                      ConnectionType connectionType,
  3.                                      Map<String, String> labels) {
  4.     return CLIENT_MAP.compute(clientName, (clientNameInner, client) -> {
  5.         if (client == null) {
  6.             if (ConnectionType.GRPC.equals(connectionType)) {
  7.                 // 创建的是GrpcSdkClient对象
  8.                 client = new GrpcSdkClient(clientNameInner);
  9.             }
  10.             if (client == null) {
  11.                 throw new UnsupportedOperationException(
  12.                     "unsupported connection type :" + connectionType.getType());
  13.             }
  14.             client.labels(labels);
  15.         }
  16.         return client;
  17.     });
  18. }
复制代码
对于另外两个方法:

  • requestFuture方法:在grpcFutureServiceStub.request(grpcRequest)发送请求之后,创建一个RequestFuture返回
  • asyncRequest方法:在grpcFutureServiceStub.request(grpcRequest)发送请求之后,为requestFuture添加监听回调
心跳healthCheck

前文介绍过,在启动RpcClient阶段,会启动健康检查任务,该任务每5秒执行一次,对当前客户端封装的connection做健康检查:
  1. rpcClient.serverListFactory(serverListFactory);
  2. rpcClient.start();
  3. rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
  4. rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
复制代码
healthCheck方法:
  1. if (grpcExecutor == null) {
  2.     int threadNumber = ThreadUtils.getSuitableThreadCount(8);
  3.     grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
  4.             new LinkedBlockingQueue<>(10000),
  5.             new ThreadFactoryBuilder().setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d")
  6.                     .build());
  7.     grpcExecutor.allowCoreThreadTimeOut(true);
  8. }
  9. // 8848+1000
  10. int port = serverInfo.getServerPort() + rpcPortOffset();
  11. RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
  12. // 发一个ServerCheckRequest请求验证服务端的可用性
  13. Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
  14. if (response == null || !(response instanceof ServerCheckResponse)) {
  15.     shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
  16.     return null;
  17. }
  18. BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
  19.         .newStub(newChannelStubTemp.getChannel());
  20. // 创建GrpcConnection对象,封装serverInfo和executor、connectionId、channel等
  21. GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
  22. grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
  23. // create stream request and bind connection event to this connection
  24. StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
  25. // stream observer to send response to server
  26. grpcConn.setPayloadStreamObserver(payloadStreamObserver);
  27. grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
  28. grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
  29. // send a setup request
  30. ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
  31. conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
  32. conSetupRequest.setLabels(super.getLabels());
  33. conSetupRequest.setAbilities(super.clientAbilities);
  34. conSetupRequest.setTenant(super.getTenant());
  35. grpcConn.sendRequest(conSetupRequest);
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

乌市泽哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表