【手写RPC框架】如何利用netty手写一个RPC框架 结合新特性 假造线程
什么是RPC框架
RPC(Remote Procedure Call)远程过程调用,是一种通过网络从远程计算机步调上请求服务,而不必要了解底层网络技术的协议。RPC框架是一种远程调用的框架,它可以让你像调用当地方法一样调用远程方法。
避免了开发人员自己去封装网络请求、连接管理、序列化、反序列化等操作,进步了开发效率。
Netty是什么?为什么利用Netty
Netty是一个基于NIO的客户、服务器端编程框架,利用Netty可以快速开发网络应用,例如服务器和客户端。Netty是一个高性能、异步事件驱动的网络应用框架,它简化了网络编程,提供了一种新的方式来处置惩罚网络通信。
大白话粗略理解:因为Java的NIO的API利用起来比较复杂,Netty是对NIO的封装,利用起来更加简朴。
以是这也是为什么我们利用Netty来实现RPC框架的原因,netty也被很多框架证明白它的稳定性和性能。
Java假造线程
Java假造线程是一个轻量级的线程,它不必要操作系统的线程支持,可以在一个线程中运行多个假造线程。Java假造线程是一个用户态的线程,它不必要操作系统的线程支持,可以在一个线程中运行多个假造线程。
假造线程现实上是通过传统的线程来管理多个假造线程,在Java的平台上去调理这些假造线程,从而实现了轻量级的线程称为假造线程,想要了解更加细节的可以去看下我的另一篇文章:【假造线程】Java假造线程 VirtualThread 是什么黑科技
假造线程的优势:
- 轻量级:假造线程是轻量级的线程,可以在一个线程中运行多个假造线程。
- 高效:假造线程是用户态的线程,不必要操作系统的线程支持,可以在一个线程中运行多个假造线程,线程的切换不涉及内核态和用户态的切换,效率更高。
适合的场景:
- 高并发:假造线程适合高并发的场景,可以在一个线程中运行多个假造线程,减少线程的创建和销毁,进步性能。
- IO麋集型:假造线程适合IO麋集型的场景,可以在一个线程中运行多个假造线程,减少线程的创建和销毁,进步性能。
- 任务短暂:假造线程适合任务短暂的场景,可以在一个线程中运行多个假造线程,减少线程的创建和销毁,进步性能。
写一个RPC框架必要哪些步调
既然我们要写一个RPC框架,那么我们必要明白一下我们必要做哪些事变。
我们是从A服务调用B服务,那么就代表我们的服务A是客户端,服务B是服务端。但是我们的系统正常来说要调用别的服务,也会被别的服务调用,
以是我们的服务A也是服务端,服务B也是客户端。以是我们的系统要同时具备客户端和服务端的功能。
- 客户端的功能:发现服务、请求(负载均衡、发起连接、发送请求)、吸收响应、关闭连接。
- 服务端的功能:注册服务、吸收请求(吸收连接、吸收请求)、发送响应、关闭连接。
其实根据上面可以发现,服务端和客户端所做的事变是对应的,是一个镜像的关系。以是我们就是对应放在一起讲。
注意注意注意⚠️:
- 示例中的代码为了方便理解,我只摘取了主要逻辑,且做了简略,具体的实现可以看我放在最后的项目源码。
- 这里我们只是简朴的实现一个RPC框架,以是我们只是实现了最基本的功能,现实的RPC框架还有很多功能,比如:熔断、限流、监控等等,这些功能可以根据实陫的需求来实现扩展。
1 发现服务、注册服务
注册服务:服务端想告诉别人我提供了哪些服务(接口的方法),我的地址是什么。
发现服务:客户端必要知道我调用的一些服务(接口的方法)有哪些地址(ip + 端口)可以调用。
服务发现和注册的方式有很多种,比如:zookeeper、nacos、consul、etcd等等。本次我们以zookeeper为例。
注册服务代码示例:- private static CuratorFramework client;
-
- // 这里使用Curator框架来操作zookeeper
- public ZookeeperRegistryCenter() {
- final var zookeeper = PROPERTIES_THREAD_LOCAL.get().getRegistry().getZookeeper();
-
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
- final var builder = CuratorFrameworkFactory.builder()
- .connectString(zookeeper.getAddress())
- .namespace(zookeeper.getRootPath());
- client = builder.build();
- }
- // 创建一个zk客户端
- private static void create(String path, CreateMode mode) throws Exception {
- client.create()
- .creatingParentsIfNeeded()
- .withMode(mode)
- .forPath(path);
- }
复制代码 发现服务代码示例:- // 发现服务,只要监听注册中心的变化
- public void watch() {
-
- // 观察者模式,监听注册中心的变化
- registryCenter.watch((change, providerInfo) -> {
- switch (change) {
- case Change.ADD -> addServiceAddress(providerInfo);
- case Change.UPDATE -> updateServiceAddress(providerInfo);
- case Change.REMOVE -> deleteServiceAddress(providerInfo);
- }
- });
- }
- private void addOrUpdateServiceAddress(String methodStr, Pair<String, Integer> address) {
- // 这里使用SERVICE_ADDRESS_MAP(ConcurrentHashMap)本地缓存服务地址,key是接口名+方法名,value是服务地址
- SERVICE_ADDRESS_MAP.computeIfAbsent(methodStr, _ -> new CopyOnWriteArraySet<>())
- .add(address);
- }
复制代码 2 请求、吸收
请求代码示例:- // 请求
- public Object send(RpcRequestMessage msg, Method method, Set<Pair<String, Integer>> addressSet) throws LRPCTimeOutException {
- // 负载均衡选择服务地址
- final var address = clazzToAddress(method, addressSet);
- // 获取连接池
- final var channelPool = getChannelPool(address);
- // 在连接池中执行请求
- return channelManager.executeWithChannelPool(channelPool, channelExeFunction, msg);
- }
复制代码 2.1 负载均衡
负载均衡:客户端在发现了服务的地址之后,可能有多个服务的地址,这时候必要做负载均衡,选择一个服务的地址来调用。- // 选择服务地址,负载均衡
- private Pair<String, Integer> clazzToAddress(Method method, Set<Pair<String, Integer>> addressSet) {
- if (addressSet != null && !addressSet.isEmpty()) {
- // 若指定了服务地址,则在指定的服务地址中选择
- return loadBalancer.selectServiceAddress(method, addressSet);
- }
- addressSet = serviceManager.getServiceAddress(method);
- // 若未指定服务地址,则在注册中心的服务地址中选择
- return loadBalancer.selectServiceAddress(method, addressSet);
- }
复制代码 2.2 发起连接、吸收连接
因为我们的rpc的调用会比较频仍,以是我们必要保持长连接,避免频仍的创建连接和断开,这里我们利用连接池来管理连接。
发起连接:客户端在知道了服务的地址之后,必要和服务端创建连接,创建连接后,再发送请求。
吸收连接:服务端必要吸收客户端的连接,吸收到连接后,再吸收请求。- private FixedChannelPool getChannelPool(Pair<String, Integer> address) {
- final var host = address.left;
- final var port = address.right;
- return serviceManager.getChannelPool(address,
- // 创建连接池
- _ -> LrpcChannelPoolFactory.createFixedChannelPool(host, port, lrpcProperties.getClient().getAddressMaxConnection()));
- }
- public FixedChannelPool getChannelPool(Pair<String, Integer> address, Function<String, FixedChannelPool> mappingFunction) {
- final var host = address.left;
- final var port = address.right;
- return ADDRESS_POOL_MAP.computeIfAbsent(host + ":" + port, mappingFunction);
- }
复制代码 吸收连接其实就是bossGroup的处置惩罚逻辑,这里就不贴代码了,可以看最后我贴的项目源码。
2.3 发送请求、吸收请求
发送请求:客户端在创建连接后,在调用服务的方法时,必要发送报文体,发送当地必要保存请求ID和Promise(用于吸收调用效果,netty包装一层的future)的映射关系,用来吸收响应时,根据请求ID找到对应的请求。
吸收请求:服务端在吸收到客户端的连接后,必要吸收到客户端的请求,解析请求,调用对应的方法。
我们本次利用自定义协议,以是必要约定好报文体的格式- 报文体:16字节协议约定内容 + 请求体;
- 16字节协议约定内容:
- (1):4个字节的长度来表示协议的魔数:就是一个固定的值,用来标识这是我们自定义的协议,这里使用'L'、'R'、'P'、'C'。
- (2):1个字节的版本号:标识这个协议的版本号,这里因为是第一个版本,所以使用1。
- (3):1个字节的序列化算法:标识这个协议使用的序列化算法,对应了序列化算法在枚举中的数组下标,这里使用的是0,表示使用JSON序列化。
- (4):4个字节的请求ID:标识这个请求的ID,用来标识这个请求的唯一性,这里使用UUID生成,可以在客户端和服务端都保存一个Map,用来保存请求ID和请求的映射关系。
- (5):1个字节的消息类型:标识这个消息的类型,是请求还是响应,这里使用1表示请求消息,2表示响应消息。
- (6):4个字节的请求体的长度:使用Integer类型,表示请求体的长度,在接收请求时,根据这个长度来解析请求体。
- (7):1个字节的补充位;无实际意义,只是为了对齐16字节。
- 请求体:序列化后转成字节数组,内容有:接口名 + 方法名 + 返回参数类型 + 请求参数类型数组 + 请求参数值数组。
复制代码 按刚刚上面约定好的协议格式解析,然后将请求体的内容反序列化,得到消息类型,利用LengthFieldBasedFrameDecoder解码器,解决粘包和拆包问题,得到请求体的字节数组,然后反序列化,
得到消息后,获取到接口名、方法名、返回参数类型、请求参数类型数组、请求参数值数组,利用动态署理调用对应的方法,得到返回值。- public <T> T getProxy(Class<T> clazz, Set<Pair<String, Integer>> serviceAddress) {
- // 使用代理的方式,调用方法
- final var proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, (proxy, method, args) -> {
- RpcRequestMessage msg = buildRpcRequestMessage(clazz, method, args);
- return consumerManager.send(msg, method, serviceAddress);
- });
- return clazz.cast(proxyInstance);
- }
-
- public Object executeWithChannelPool(ChannelPool channelPool,
- BiFunction<Channel, RpcRequestMessage, Promise<Object>> function,
- RpcRequestMessage msg) throws LRPCTimeOutException {
- // 1. 从连接池中获取连接,等待超市时间,未获取连接则抛出异常
- final Future<Channel> future = channelPool.acquire();
- Channel channel = future.get();
- final var promise = function.apply(channel, msg);
- try {
- return getResult(promise, msg.getMessageId());
- } finally {
- // 这里的释放需要放在拿到结果之后,否则会导臃连接被释放
- channelPool.release(channel);
- }
- }
-
- private static BiFunction<Channel, RpcRequestMessage, Promise<Object>> channelExeFunction() {
- // 发送请求,且处理写失败
- return (channel, msg) -> {
- final var promise = new DefaultPromise<>(channel.eventLoop());
- RpcRespHandler.addPromise(msg.getMessageId(), promise);
- // 发送请求,且处理写失败
- final var channelFuture = channel.writeAndFlush(msg);
- channelFuture.addListener(processAftermath(promise, msg));
- return promise;
- };
- }
复制代码 吸收处置惩罚请求- @Override
- protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) {
- log.info("接收到消息 {}", JSON.toJSON(msg));
- final var interfaceName = msg.getInterfaceName();
- final var methodName = msg.getMethodName();
-
- // 根据接口名获取服务的本地实例
- final var service = serviceManager.getService(interfaceName);
- final var response = new RpcResponseMessage();
- response.setMessageId(msg.getMessageId());
- try {
- // 使用反射调用方法
- final Class<?> aClass = service.getClass();
- final var method = aClass.getMethod(methodName, msg.getParameterTypes());
- final var result = method.invoke(service, msg.getParameterValues());
- response.setReturnValue(result);
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- log.error("e : ", e);
- response.setExceptionValue(new Error(e.getCause().getMessage()));
- }
-
- // 以下属于发送响应的逻辑
- ctx.writeAndFlush(response).addListener(future -> {
- if (future.isSuccess()) {
- log.info("消息响应成功 {}", JSON.toJSON(msg));
- return;
- }
- log.error("发送消息时有错误发生: ", future.cause());
- });
- }
复制代码 4 发送响应、吸收响应
得到第6步的返回值后,必要将返回值封装成响应报文体,发送给客户端。
这里发送响应的方式其实是和发送请求的方式是一样的,只是消息类型不一样,这里是响应消息。
客户端吸收到响应后,根据请求ID找到对应的请求,将响应的内容返回给调用方。
发送响应- // 在刚刚接收请求处理的channelRead0函数中,处理发送响应的逻辑
- ctx.writeAndFlush(response).addListener(future -> {
- if (future.isSuccess()) {
- log.info("消息响应成功 {}", JSON.toJSON(msg));
- return;
- }
- log.error("发送消息时有错误发生: ", future.cause());
- });
复制代码 吸收响应- private static Object getResult(Promise<Object> promise, Integer messageId) throws LRPCTimeOutException {
- try {
- // 超时等待
- if (promise.await(5, TimeUnit.SECONDS)) {
- if (promise.isSuccess()) {
- return promise.getNow();
- } else {
- throw new RuntimeException(promise.cause());
- }
- } else {
- throw new LRPCTimeOutException("请求超时");
- }
- } catch (InterruptedException e) {
- throw new RuntimeException("操作被中断", e);
- } finally {
- // 确保 promise 被移除
- RpcRespHandler.removePromise(messageId);
- }
- }
复制代码 5 关闭连接
关闭连接:客户端和服务端在完成请求和响应后,会把连接放回连接池,等待下一次的调用,等连接池关闭时,会关闭连接,服务端感应到连接关闭,会关闭连接。
怎么将假造线程和Netty结合起来
分析
前面我们说过,假造线程适合高并发、IO麋集型的场景,可以在一个线程中运行多个假造线程,减少线程的创建和销毁,进步性能。
看一下netty的服务端网络通信的架构简图:
在netty中,一个NioEventLoop中有一个Selector,一个Selector可以注册多个Channel,一个Channel对应一个连接,一个线程可以处置惩罚多个连接,这就是netty的高性能的原因。
在每次循环中,Selector就会阻塞监听Channel的事件,当有事件发生时,就会处置惩罚这个事件。
以是在这过程中,线程的数量,影响着Selector的数量,影响着Channel的数量,但是在传统的线程中,线程的数量是有限的,以是这就限制了Selector的数量,影响着Channel的数量,影响着性能,
以是我们可以利用假造线程来解决这个问题,假造线程可以在一个线程中运行多个假造线程,且假造线程会在其中一个假造线程阻塞时,会切换到其他假造线程,且没有系统级别的上下文切换,以是可以带来更高的性能。
以是我们这里主要是改变workerGroup的线程模型,利用假造线程来代替workerGroup里的传统的线程。
实现
根据netty的NioEventGroup的源码,线程来自三个地方:
- 构造函数的入参的线程工厂;
- 构造参数的入参的executor;
- 父类io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory()方法返回的线程工厂;
这里我们以重写父类的newDefaultThreadFactory()方法为例,来实现假造线程。
- private NioEventLoopGroup getWorker() {
- final var workerMax = lrpcProperties.getServer().getWorkerMax();
- // 创建workerGroup
- return new NioEventLoopGroup(workerMax) {
- // 直接在创建的时候重写newDefaultThreadFactory()方法
- @Override
- protected ThreadFactory newDefaultThreadFactory() {
- return new VirtualThreadFactory(NioEventLoopGroup.class, Thread.MAX_PRIORITY);
- }
- };
- }
- // 这里是重写的ThreadFactory
- public class VirtualThreadFactory extends DefaultThreadFactory {
- public VirtualThreadFactory(Class<?> poolType, int priority) {
- super(poolType, priority);
- }
- @Override
- protected Thread newThread(Runnable r, String name) {
- // 这里使用FastThreadLocalThread,是因为FastThreadLocalThread是netty提供的一个线程,里面的方法有些功能,所以我们这里直接继承它,然后重写start()方法
- return new FastThreadLocalThread(threadGroup, r, name){
- // 这里的Thread.ofVirtual().unstarted(this)是创建一个虚拟线程
- @Override
- public void start() {
- final var unstarted = Thread.ofVirtual().unstarted(this);
- unstarted.setName(this.getName());
- unstarted.start();
- }
- };
- }
- }
复制代码 总结
本次我们实现了一个简朴的RPC框架,利用了netty作为底层通信框架,利用了zookeeper作为服务发现和注册中央,利用了假造线程代替服务端的workerGroup的线程模型,扩展了可管控的Selector的数量,且在线程的切换上,没有系统级别的上下文切换,进步了性能。
这里只是一个简朴的实现,现实的RPC框架还有很多功能,比如:熔断、限流、监控等等,这些功能可以根据实陫的需求来实现,而且在现实的实现过程中,还会遇到很多问题,比如:序列化和反序列化扩展、线程安全问题等等,都值得我们去深入研究。
这里分享一下我的实现的代码,麻烦老哥们帮忙点个star
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |