Nacos 2.x在服务端与客户端直接增加了GRPC通信方式,本文通过2.0.2版本源码,简单分析GRPC通信方式:
- 服务器启动
- 客户端连接
- 客户端心跳
- 服务器监控检查
服务器
proto文件
api/src/main/proto/nacos_grpc_service.proto文件:- syntax = "proto3";
- import "google/protobuf/any.proto";
- import "google/protobuf/timestamp.proto";
- option java_multiple_files = true;
- option java_package = "com.alibaba.nacos.api.grpc.auto";
- message Metadata {
- string type = 3; // 请求/响应的真实类型
- string clientIp = 8;
- map<string, string> headers = 7;
- }
- // GRPC通信层请求/响应体
- message Payload {
- Metadata metadata = 2;
- // 业务层的请求/响应体,需要使用type做反序列化
- google.protobuf.Any body = 3;
- }
- service RequestStream {
- // build a streamRequest
- rpc requestStream (Payload) returns (stream Payload) {
- }
- }
- service Request {
- // Sends a commonRequest
- rpc request (Payload) returns (Payload) {
- }
- }
- service BiRequestStream {
- // Sends a commonRequest
- rpc requestBiStream (stream Payload) returns (stream Payload) {
- }
- }
复制代码 文件定义了通信层的service和message结构,业务层请求响应的序列化和反序列化是Nacos在RequestAcceptor/Connection中使用工具类实现的,业务层请求处理是在RequestAcceptor中进行的转发。
服务器启动
Server类继承关系
- BaseRpcServer
- |-- BaseGrpcServer
- |-- GrpcSdkServer
- |-- GrpcClusterServer
复制代码 此处介绍一下GrpcSdkServer实现。
GrpcSdkServer类
- @Service
- public class GrpcSdkServer extends BaseGrpcServer {
- // 所以SDK服务器的监听端口是9848
- private static final int PORT_OFFSET = 1000;
- @Override
- public int rpcPortOffset() {
- return PORT_OFFSET;
- }
- @Override
- public ThreadPoolExecutor getRpcExecutor() {
- return GlobalExecutor.sdkRpcExecutor;
- }
- }
复制代码 大部分的启动逻辑在BaseGrpcServer中。
BaseGrpcServer类
GRPC服务器的启动逻辑大部分都在这个类的startServer方法。
- 将处理请求的RequestAcceptor注册到HandlerRegistry
- GrpcRequestAcceptor用于处理普通业务请求
- GrpcBiStreamRequestAcceptor用于处理连接建立请求,获取Channel创建GrpcConnection并注册到ConnectionManager中,后续向客户端发送消息都是使用GrpcConnection做的
- 创建GRPC的Server对象
- 设置port和executor
- 设置HandlerRegistry
- 添加ServerTransportFilter在连接建立和断开时做一些业务操作
- 启动Server
GrpcRequestAcceptor类
这个类对GRPC做了扩展,重写了request方法:
- 解析Payload获取请求体的数据类型
- 从RequestHandlerRegistry获取适配的RequestHandler处理器
- 将请求体反序列化成请求体类型对象
- 调用handleRequest方法处理请求返回响应
处理请求代码:- Request request = (Request) parseObj;
- try {
- // 获取Connection
- Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
- RequestMeta requestMeta = new RequestMeta();
- requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
- requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
- requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
- requestMeta.setLabels(connection.getMetaInfo().getLabels());
- // 刷新活跃时间,后续的健康检查会使用到这个时间戳
- connectionManager.refreshActiveTime(requestMeta.getConnectionId());
- // 使用RequestHandler处理请求
- Response response = requestHandler.handleRequest(request, requestMeta);
- Payload payloadResponse = GrpcUtils.convert(response);
- traceIfNecessary(payloadResponse, false);
- responseObserver.onNext(payloadResponse);
- responseObserver.onCompleted();
- } catch (Throwable e) {
- Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
- (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
- e.getMessage()));
- traceIfNecessary(payloadResponse, false);
- responseObserver.onNext(payloadResponse);
- responseObserver.onCompleted();
- }
复制代码 RequestHandler处理器
RequestHandler抽象类是Nacos在业务层处理GRPC请求的抽象类:- public abstract class RequestHandler<T extends Request, S extends Response> {
- @Autowired
- private RequestFilters requestFilters;
- /**
- * Handler request.
- */
- public Response handleRequest(T request, RequestMeta meta) throws NacosException {
- for (AbstractRequestFilter filter : requestFilters.filters) {
- try {
- Response filterResult = filter.filter(request, meta, this.getClass());
- if (filterResult != null && !filterResult.isSuccess()) {
- return filterResult;
- }
- } catch (Throwable throwable) {
- Loggers.REMOTE.error("filter error", throwable);
- }
- }
- return handle(request, meta);
- }
- /**
- * Handler request.
- */
- public abstract S handle(T request, RequestMeta meta) throws NacosException;
- }
复制代码 实现类:
Nacos使用RequestHandlerRegistry管理所有的RequestHandler,是一个Map结构:- // key是Request类型的简单名
- // value是RequestHandler实现类对象
- Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();
复制代码 RequestHandlerRegistry会扫描Spring容器里面所有的RequestHandler对象,解析RequestHandler实现类处理的Request类型的简单名,将其注册到registryHandlers中。
GrpcRequestAcceptor类获取适配的RequestHandler处理器使用的就是RequestHandlerRegistry类的getByRequestType方法:- public RequestHandler getByRequestType(String requestType) {
- return registryHandlers.get(requestType);
- }
复制代码 建立连接
在Server初始化的时候,Nacos注册了ServerInterceptor和ServerTransportFilter组件,这些组件会在连接建立时将conn_id、remote_ip、remote_port、local_port、ctx_channel等绑定到Context上。
创建GrpcConnection
客户端在连接建立之后会发送一个ConnectionSetupRequest请求,服务器使用GrpcBiStreamRequestAcceptor处理该请求:
- 获取到conn_id、remote_ip、remote_port、local_port等
- 解析请求获取clienIp
- 封装GrpcConnection对象,包括:conn_id、remote_ip、remote_port、local_port、clientIp、客户端版本等基础信息,以及StreamObserver和Channel
- 将GrpcConnection注册到ConnectionManager上
创建Client
ConnectionManager的注册操作会触发ConnectionBasedClientManager的clientConnected方法来创建Client对象:- public void clientConnected(Connection connect) {
- // grpc类型
- String type = connect.getMetaInfo().getConnectType();
- // 此处获取到的是ConnectionBasedClientFactory对象
- ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
- // 此处创建的是ConnectionBasedClient对象
- clientConnected(clientFactory.newClient(connect.getMetaInfo().getConnectionId()));
- }
- public boolean clientConnected(Client client) {
- if (!clients.containsKey(client.getClientId())) {
- // 注册到client集
- // 使用Map维护clientId->client对象关系
- clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
- }
- return true;
- }
复制代码 健康检查
ConnectionManager连接管理器
这个类管理客户端连接,提供注册连接、移除连接等功能:- // 管理IP -> 连接数,用于实现ConnectionLimitRule
- private Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<String, AtomicInteger>(16);
- // 管理connectionId -> Connection
- Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
复制代码 Connection抽象类实现了Requester接口,能够向客户端发送请求、管理连接状态。
GrpcConnection实现了Connection抽象类。
在连接建立后,客户端会发送一个ConnectionSetupRequest请求,服务端收到该请求后,会解析出connectionId、客户端IP、客户端端口、客户端版本、Channel等封装成GrpcConnection对象,然后注册到ConnectionManager中。
健康检查周期任务
ConnectionManager在启动阶段会启动一个周期任务来检查IP连接数和连接的活跃状态,每3秒执行一次:
- 遍历连接集,使用connectionLimitRule查找需要重置的连接,向这些客户端发reset请求重置连接
- 获取连接的最后活跃时间(客户端每次请求都会更新这个时间),如果超过20秒不活跃,则向客户端发送一个探测请求,如果请求失败则断开连接
断开连接
业务处理流程
GRPC连接层检测到连接断开之后,会触发GrpcServer的transportTerminated事件:- public void transportTerminated(Attributes transportAttrs) {
- String connectionId = null;
- try {
- connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
- } catch (Exception e) {
- // Ignore
- }
- if (StringUtils.isNotBlank(connectionId)) {
- // 使用ConnectionManager移除连接
- connectionManager.unregister(connectionId);
- }
- }
复制代码 ConnectionManager移除连接:- public synchronized void unregister(String connectionId) {
- // 从Connection集移除连接
- Connection remove = this.connections.remove(connectionId);
- if (remove != null) {
- String clientIp = remove.getMetaInfo().clientIp;
- AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
- // IP连接数--
- if (atomicInteger != null) {
- int count = atomicInteger.decrementAndGet();
- if (count <= 0) {
- connectionForClientIp.remove(clientIp);
- }
- }
- remove.close();
- // 通知ClientManager层移除client对象
- clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
- }
- }
复制代码 之后需要为Client进行初始化:
- 设置ServerListFactory,用于选择服务器地址
- 注册ServerRequestHandler处理器,用于处理服务端发送的请求,比如服务订阅的回调、配置文件变化通知
- 注册ConnectionEventListener监听器
- public boolean clientDisconnected(String clientId) {
- ConnectionBasedClient client = clients.remove(clientId);
- if (null == client) {
- return true;
- }
- client.release();
- // 推送一个ClientDisconnectEvent事件
- NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
- return true;
- }
复制代码 - 启动Client
- 启动ConnectionEvent处理线程
- 启动健康检查(心跳)线程
- 创建GrpcConnection
创建GrpcConnection
- 创建GRPC的RequestFutureStub和BiRequestStreamStub
- 发一个ServerCheckRequest请求验证服务端的可用性
- 创建GrpcConnection对象,封装serverInfo和executor、connectionId、channel等
- 为BiRequestStreamStub绑定请求处理逻辑:使用ServerRequestHandler处理器处理服务端发送过来的请求
- 发送ConnectionSetupRequest请求,让服务端创建并注册GrpcConnection
- public interface ServerListFactory {
- // 选择一个可用的服务器地址 ip:port格式
- String genNextServer();
- // 返回当前使用的服务器地址 ip:port格式
- String getCurrentServer();
- // 返回服务器集合
- List<String> getServerList();
- }
复制代码 发送请求
Requester接口
这个接口定义了发送请求的方法:- RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
复制代码 GrpcConnection实现
GrpcConnection类实现了Requester接口的三个request方法,使用的是GRPC的Stub发送请求,以request方法为例:- public static RpcClient createClient(String clientName,
- ConnectionType connectionType,
- Map<String, String> labels) {
- return CLIENT_MAP.compute(clientName, (clientNameInner, client) -> {
- if (client == null) {
- if (ConnectionType.GRPC.equals(connectionType)) {
- // 创建的是GrpcSdkClient对象
- client = new GrpcSdkClient(clientNameInner);
- }
- if (client == null) {
- throw new UnsupportedOperationException(
- "unsupported connection type :" + connectionType.getType());
- }
- client.labels(labels);
- }
- return client;
- });
- }
复制代码 对于另外两个方法:
- requestFuture方法:在grpcFutureServiceStub.request(grpcRequest)发送请求之后,创建一个RequestFuture返回
- asyncRequest方法:在grpcFutureServiceStub.request(grpcRequest)发送请求之后,为requestFuture添加监听回调
心跳healthCheck
前文介绍过,在启动RpcClient阶段,会启动健康检查任务,该任务每5秒执行一次,对当前客户端封装的connection做健康检查:- rpcClient.serverListFactory(serverListFactory);
- rpcClient.start();
- rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
- rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
复制代码 healthCheck方法:- if (grpcExecutor == null) {
- int threadNumber = ThreadUtils.getSuitableThreadCount(8);
- grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(10000),
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d")
- .build());
- grpcExecutor.allowCoreThreadTimeOut(true);
- }
- // 8848+1000
- int port = serverInfo.getServerPort() + rpcPortOffset();
- RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
- // 发一个ServerCheckRequest请求验证服务端的可用性
- Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
- if (response == null || !(response instanceof ServerCheckResponse)) {
- shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
- return null;
- }
- BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
- .newStub(newChannelStubTemp.getChannel());
- // 创建GrpcConnection对象,封装serverInfo和executor、connectionId、channel等
- GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
- grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
- // create stream request and bind connection event to this connection
- StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
- // stream observer to send response to server
- grpcConn.setPayloadStreamObserver(payloadStreamObserver);
- grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
- grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
- // send a setup request
- ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
- conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
- conSetupRequest.setLabels(super.getLabels());
- conSetupRequest.setAbilities(super.clientAbilities);
- conSetupRequest.setTenant(super.getTenant());
- grpcConn.sendRequest(conSetupRequest);
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |