【Java RPC】利用netty手写一个RPC框架 结合新特性 假造线程 ...

打印 上一主题 下一主题

主题 881|帖子 881|积分 2643

【手写RPC框架】如何利用netty手写一个RPC框架 结合新特性 假造线程

什么是RPC框架

RPC(Remote Procedure Call)远程过程调用,是一种通过网络从远程计算机步调上请求服务,而不必要了解底层网络技术的协议。RPC框架是一种远程调用的框架,它可以让你像调用当地方法一样调用远程方法。
避免了开发人员自己去封装网络请求、连接管理、序列化、反序列化等操作,进步了开发效率。
Netty是什么?为什么利用Netty

Netty是一个基于NIO的客户、服务器端编程框架,利用Netty可以快速开发网络应用,例如服务器和客户端。Netty是一个高性能、异步事件驱动的网络应用框架,它简化了网络编程,提供了一种新的方式来处置惩罚网络通信。
大白话粗略理解:因为Java的NIO的API利用起来比较复杂,Netty是对NIO的封装,利用起来更加简朴。
以是这也是为什么我们利用Netty来实现RPC框架的原因,netty也被很多框架证明白它的稳定性和性能。
Java假造线程

Java假造线程是一个轻量级的线程,它不必要操作系统的线程支持,可以在一个线程中运行多个假造线程。Java假造线程是一个用户态的线程,它不必要操作系统的线程支持,可以在一个线程中运行多个假造线程。
假造线程现实上是通过传统的线程来管理多个假造线程,在Java的平台上去调理这些假造线程,从而实现了轻量级的线程称为假造线程,想要了解更加细节的可以去看下我的另一篇文章:【假造线程】Java假造线程 VirtualThread 是什么黑科技
假造线程的优势:


  • 轻量级:假造线程是轻量级的线程,可以在一个线程中运行多个假造线程。
  • 高效:假造线程是用户态的线程,不必要操作系统的线程支持,可以在一个线程中运行多个假造线程,线程的切换不涉及内核态和用户态的切换,效率更高。
适合的场景:


  • 高并发:假造线程适合高并发的场景,可以在一个线程中运行多个假造线程,减少线程的创建和销毁,进步性能。
  • IO麋集型:假造线程适合IO麋集型的场景,可以在一个线程中运行多个假造线程,减少线程的创建和销毁,进步性能。
  • 任务短暂:假造线程适合任务短暂的场景,可以在一个线程中运行多个假造线程,减少线程的创建和销毁,进步性能。
写一个RPC框架必要哪些步调

既然我们要写一个RPC框架,那么我们必要明白一下我们必要做哪些事变。
我们是从A服务调用B服务,那么就代表我们的服务A是客户端,服务B是服务端。但是我们的系统正常来说要调用别的服务,也会被别的服务调用,
以是我们的服务A也是服务端,服务B也是客户端。以是我们的系统要同时具备客户端和服务端的功能。

  • 客户端的功能:发现服务、请求(负载均衡、发起连接、发送请求)、吸收响应、关闭连接。
  • 服务端的功能:注册服务、吸收请求(吸收连接、吸收请求)、发送响应、关闭连接。
其实根据上面可以发现,服务端和客户端所做的事变是对应的,是一个镜像的关系。以是我们就是对应放在一起讲。
注意注意注意⚠️:

  • 示例中的代码为了方便理解,我只摘取了主要逻辑,且做了简略,具体的实现可以看我放在最后的项目源码。
  • 这里我们只是简朴的实现一个RPC框架,以是我们只是实现了最基本的功能,现实的RPC框架还有很多功能,比如:熔断、限流、监控等等,这些功能可以根据实陫的需求来实现扩展。
1 发现服务、注册服务

注册服务:服务端想告诉别人我提供了哪些服务(接口的方法),我的地址是什么。
发现服务:客户端必要知道我调用的一些服务(接口的方法)有哪些地址(ip + 端口)可以调用。
服务发现和注册的方式有很多种,比如:zookeeper、nacos、consul、etcd等等。本次我们以zookeeper为例。
注册服务代码示例:
  1.     private static CuratorFramework client;
  2.    
  3.     // 这里使用Curator框架来操作zookeeper
  4.     public ZookeeperRegistryCenter() {
  5.        final var zookeeper = PROPERTIES_THREAD_LOCAL.get().getRegistry().getZookeeper();
  6.    
  7.        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  8.        final var builder = CuratorFrameworkFactory.builder()
  9.                .connectString(zookeeper.getAddress())
  10.                .namespace(zookeeper.getRootPath());
  11.        client = builder.build();
  12.     }
  13.     // 创建一个zk客户端
  14.     private static void create(String path, CreateMode mode) throws Exception {
  15.         client.create()
  16.                 .creatingParentsIfNeeded()
  17.                 .withMode(mode)
  18.                 .forPath(path);
  19.     }
复制代码
发现服务代码示例:
  1.     // 发现服务,只要监听注册中心的变化
  2.     public void watch() {
  3.    
  4.         // 观察者模式,监听注册中心的变化
  5.         registryCenter.watch((change, providerInfo) -> {
  6.             switch (change) {
  7.                 case Change.ADD -> addServiceAddress(providerInfo);
  8.                 case Change.UPDATE -> updateServiceAddress(providerInfo);
  9.                 case Change.REMOVE -> deleteServiceAddress(providerInfo);
  10.             }
  11.         });
  12.     }
  13.     private void addOrUpdateServiceAddress(String methodStr, Pair<String, Integer> address) {
  14.         // 这里使用SERVICE_ADDRESS_MAP(ConcurrentHashMap)本地缓存服务地址,key是接口名+方法名,value是服务地址
  15.         SERVICE_ADDRESS_MAP.computeIfAbsent(methodStr, _ -> new CopyOnWriteArraySet<>())
  16.                 .add(address);
  17.     }
复制代码
2 请求、吸收

请求代码示例:
  1.     // 请求
  2.     public Object send(RpcRequestMessage msg, Method method, Set<Pair<String, Integer>> addressSet) throws LRPCTimeOutException {
  3.         // 负载均衡选择服务地址
  4.         final var address = clazzToAddress(method, addressSet);
  5.         // 获取连接池
  6.         final var channelPool = getChannelPool(address);
  7.         // 在连接池中执行请求
  8.         return channelManager.executeWithChannelPool(channelPool, channelExeFunction, msg);
  9.     }
复制代码
2.1 负载均衡

负载均衡:客户端在发现了服务的地址之后,可能有多个服务的地址,这时候必要做负载均衡,选择一个服务的地址来调用。
  1.     // 选择服务地址,负载均衡
  2.     private Pair<String, Integer> clazzToAddress(Method method, Set<Pair<String, Integer>> addressSet) {
  3.         if (addressSet != null && !addressSet.isEmpty()) {
  4.             // 若指定了服务地址,则在指定的服务地址中选择
  5.             return loadBalancer.selectServiceAddress(method, addressSet);
  6.         }
  7.         addressSet = serviceManager.getServiceAddress(method);
  8.         // 若未指定服务地址,则在注册中心的服务地址中选择
  9.         return loadBalancer.selectServiceAddress(method, addressSet);
  10.     }
复制代码
2.2 发起连接、吸收连接

因为我们的rpc的调用会比较频仍,以是我们必要保持长连接,避免频仍的创建连接和断开,这里我们利用连接池来管理连接。
发起连接:客户端在知道了服务的地址之后,必要和服务端创建连接,创建连接后,再发送请求。
吸收连接:服务端必要吸收客户端的连接,吸收到连接后,再吸收请求。
  1.     private FixedChannelPool getChannelPool(Pair<String, Integer> address) {
  2.         final var host = address.left;
  3.         final var port = address.right;
  4.         return serviceManager.getChannelPool(address,
  5.                 // 创建连接池
  6.                 _ -> LrpcChannelPoolFactory.createFixedChannelPool(host, port, lrpcProperties.getClient().getAddressMaxConnection()));
  7.     }
  8.     public FixedChannelPool getChannelPool(Pair<String, Integer> address, Function<String, FixedChannelPool> mappingFunction) {
  9.         final var host = address.left;
  10.         final var port = address.right;
  11.         return ADDRESS_POOL_MAP.computeIfAbsent(host + ":" + port, mappingFunction);
  12.     }
复制代码
吸收连接其实就是bossGroup的处置惩罚逻辑,这里就不贴代码了,可以看最后我贴的项目源码。
2.3 发送请求、吸收请求

发送请求:客户端在创建连接后,在调用服务的方法时,必要发送报文体,发送当地必要保存请求ID和Promise(用于吸收调用效果,netty包装一层的future)的映射关系,用来吸收响应时,根据请求ID找到对应的请求。
吸收请求:服务端在吸收到客户端的连接后,必要吸收到客户端的请求,解析请求,调用对应的方法。
我们本次利用自定义协议,以是必要约定好报文体的格式
  1. 报文体:16字节协议约定内容 + 请求体;
  2. 16字节协议约定内容:
  3.   (1):4个字节的长度来表示协议的魔数:就是一个固定的值,用来标识这是我们自定义的协议,这里使用'L'、'R'、'P'、'C'。
  4.   (2):1个字节的版本号:标识这个协议的版本号,这里因为是第一个版本,所以使用1。
  5.   (3):1个字节的序列化算法:标识这个协议使用的序列化算法,对应了序列化算法在枚举中的数组下标,这里使用的是0,表示使用JSON序列化。
  6.   (4):4个字节的请求ID:标识这个请求的ID,用来标识这个请求的唯一性,这里使用UUID生成,可以在客户端和服务端都保存一个Map,用来保存请求ID和请求的映射关系。
  7.   (5):1个字节的消息类型:标识这个消息的类型,是请求还是响应,这里使用1表示请求消息,2表示响应消息。
  8.   (6):4个字节的请求体的长度:使用Integer类型,表示请求体的长度,在接收请求时,根据这个长度来解析请求体。
  9.   (7):1个字节的补充位;无实际意义,只是为了对齐16字节。
  10. 请求体:序列化后转成字节数组,内容有:接口名 + 方法名 + 返回参数类型 + 请求参数类型数组 + 请求参数值数组。
复制代码
按刚刚上面约定好的协议格式解析,然后将请求体的内容反序列化,得到消息类型,利用LengthFieldBasedFrameDecoder解码器,解决粘包和拆包问题,得到请求体的字节数组,然后反序列化,
得到消息后,获取到接口名、方法名、返回参数类型、请求参数类型数组、请求参数值数组,利用动态署理调用对应的方法,得到返回值。
  1.     public <T> T getProxy(Class<T> clazz, Set<Pair<String, Integer>> serviceAddress) {
  2.         // 使用代理的方式,调用方法
  3.         final var proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, (proxy, method, args) -> {
  4.             RpcRequestMessage msg = buildRpcRequestMessage(clazz, method, args);
  5.             return consumerManager.send(msg, method, serviceAddress);
  6.         });
  7.         return clazz.cast(proxyInstance);
  8.     }
  9.    
  10.     public Object executeWithChannelPool(ChannelPool channelPool,
  11.                                                   BiFunction<Channel, RpcRequestMessage, Promise<Object>> function,
  12.                                                   RpcRequestMessage msg) throws LRPCTimeOutException {
  13.         // 1. 从连接池中获取连接,等待超市时间,未获取连接则抛出异常
  14.         final Future<Channel> future = channelPool.acquire();
  15.         Channel channel = future.get();
  16.         final var promise = function.apply(channel, msg);
  17.         try {
  18.             return getResult(promise, msg.getMessageId());
  19.         } finally {
  20.             // 这里的释放需要放在拿到结果之后,否则会导臃连接被释放
  21.             channelPool.release(channel);
  22.         }
  23.     }
  24.    
  25.     private static BiFunction<Channel, RpcRequestMessage, Promise<Object>> channelExeFunction() {
  26.         // 发送请求,且处理写失败
  27.         return (channel, msg) -> {
  28.             final var promise = new DefaultPromise<>(channel.eventLoop());
  29.             RpcRespHandler.addPromise(msg.getMessageId(), promise);
  30.             // 发送请求,且处理写失败
  31.             final var channelFuture = channel.writeAndFlush(msg);
  32.             channelFuture.addListener(processAftermath(promise, msg));
  33.             return promise;
  34.         };
  35.     }
复制代码
吸收处置惩罚请求
  1.     @Override
  2.     protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) {
  3.         log.info("接收到消息 {}", JSON.toJSON(msg));
  4.         final var interfaceName = msg.getInterfaceName();
  5.         final var methodName = msg.getMethodName();
  6.         
  7.         // 根据接口名获取服务的本地实例
  8.         final var service = serviceManager.getService(interfaceName);
  9.         final var response = new RpcResponseMessage();
  10.         response.setMessageId(msg.getMessageId());
  11.         try {
  12.             // 使用反射调用方法
  13.             final Class<?> aClass = service.getClass();
  14.             final var method = aClass.getMethod(methodName, msg.getParameterTypes());
  15.             final var result = method.invoke(service, msg.getParameterValues());
  16.             response.setReturnValue(result);
  17.         } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
  18.             log.error("e : ", e);
  19.             response.setExceptionValue(new Error(e.getCause().getMessage()));
  20.         }
  21.         
  22.         // 以下属于发送响应的逻辑
  23.         ctx.writeAndFlush(response).addListener(future -> {
  24.             if (future.isSuccess()) {
  25.                 log.info("消息响应成功 {}", JSON.toJSON(msg));
  26.                 return;
  27.             }
  28.             log.error("发送消息时有错误发生: ", future.cause());
  29.         });
  30.     }
复制代码
4 发送响应、吸收响应

得到第6步的返回值后,必要将返回值封装成响应报文体,发送给客户端。
这里发送响应的方式其实是和发送请求的方式是一样的,只是消息类型不一样,这里是响应消息。
客户端吸收到响应后,根据请求ID找到对应的请求,将响应的内容返回给调用方。
发送响应
  1.         // 在刚刚接收请求处理的channelRead0函数中,处理发送响应的逻辑
  2.         ctx.writeAndFlush(response).addListener(future -> {
  3.             if (future.isSuccess()) {
  4.                 log.info("消息响应成功 {}", JSON.toJSON(msg));
  5.                 return;
  6.             }
  7.             log.error("发送消息时有错误发生: ", future.cause());
  8.         });
复制代码
吸收响应
  1.     private static Object getResult(Promise<Object> promise, Integer messageId) throws LRPCTimeOutException {
  2.         try {
  3.             // 超时等待
  4.             if (promise.await(5, TimeUnit.SECONDS)) {
  5.                 if (promise.isSuccess()) {
  6.                     return promise.getNow();
  7.                 } else {
  8.                     throw new RuntimeException(promise.cause());
  9.                 }
  10.             } else {
  11.                 throw new LRPCTimeOutException("请求超时");
  12.             }
  13.         } catch (InterruptedException e) {
  14.             throw new RuntimeException("操作被中断", e);
  15.         } finally {
  16.             // 确保 promise 被移除
  17.             RpcRespHandler.removePromise(messageId);
  18.         }
  19.     }
复制代码
5 关闭连接

关闭连接:客户端和服务端在完成请求和响应后,会把连接放回连接池,等待下一次的调用,等连接池关闭时,会关闭连接,服务端感应到连接关闭,会关闭连接。
怎么将假造线程和Netty结合起来

分析

前面我们说过,假造线程适合高并发、IO麋集型的场景,可以在一个线程中运行多个假造线程,减少线程的创建和销毁,进步性能。
看一下netty的服务端网络通信的架构简图:

在netty中,一个NioEventLoop中有一个Selector,一个Selector可以注册多个Channel,一个Channel对应一个连接,一个线程可以处置惩罚多个连接,这就是netty的高性能的原因。
在每次循环中,Selector就会阻塞监听Channel的事件,当有事件发生时,就会处置惩罚这个事件。
以是在这过程中,线程的数量,影响着Selector的数量,影响着Channel的数量,但是在传统的线程中,线程的数量是有限的,以是这就限制了Selector的数量,影响着Channel的数量,影响着性能,
以是我们可以利用假造线程来解决这个问题,假造线程可以在一个线程中运行多个假造线程,且假造线程会在其中一个假造线程阻塞时,会切换到其他假造线程,且没有系统级别的上下文切换,以是可以带来更高的性能。
以是我们这里主要是改变workerGroup的线程模型,利用假造线程来代替workerGroup里的传统的线程。
实现

根据netty的NioEventGroup的源码,线程来自三个地方:

  • 构造函数的入参的线程工厂;
  • 构造参数的入参的executor;
  • 父类io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory()方法返回的线程工厂;
    这里我们以重写父类的newDefaultThreadFactory()方法为例,来实现假造线程。
  1.     private NioEventLoopGroup getWorker() {
  2.         final var workerMax = lrpcProperties.getServer().getWorkerMax();
  3.         // 创建workerGroup
  4.         return new NioEventLoopGroup(workerMax) {
  5.             // 直接在创建的时候重写newDefaultThreadFactory()方法
  6.             @Override
  7.             protected ThreadFactory newDefaultThreadFactory() {
  8.                 return new VirtualThreadFactory(NioEventLoopGroup.class, Thread.MAX_PRIORITY);
  9.             }
  10.         };
  11.     }
  12. // 这里是重写的ThreadFactory
  13. public class VirtualThreadFactory extends DefaultThreadFactory {
  14.    public VirtualThreadFactory(Class<?> poolType, int priority) {
  15.       super(poolType, priority);
  16.    }
  17.    @Override
  18.    protected Thread newThread(Runnable r, String name) {
  19.       // 这里使用FastThreadLocalThread,是因为FastThreadLocalThread是netty提供的一个线程,里面的方法有些功能,所以我们这里直接继承它,然后重写start()方法
  20.       return new FastThreadLocalThread(threadGroup, r, name){
  21.          // 这里的Thread.ofVirtual().unstarted(this)是创建一个虚拟线程
  22.          @Override
  23.          public void start() {
  24.             final var unstarted = Thread.ofVirtual().unstarted(this);
  25.             unstarted.setName(this.getName());
  26.             unstarted.start();
  27.          }
  28.       };
  29.    }
  30. }
复制代码
总结

本次我们实现了一个简朴的RPC框架,利用了netty作为底层通信框架,利用了zookeeper作为服务发现和注册中央,利用了假造线程代替服务端的workerGroup的线程模型,扩展了可管控的Selector的数量,且在线程的切换上,没有系统级别的上下文切换,进步了性能。

这里只是一个简朴的实现,现实的RPC框架还有很多功能,比如:熔断、限流、监控等等,这些功能可以根据实陫的需求来实现,而且在现实的实现过程中,还会遇到很多问题,比如:序列化和反序列化扩展、线程安全问题等等,都值得我们去深入研究。
这里分享一下我的实现的代码,麻烦老哥们帮忙点个star
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

盛世宏图

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表