flink源码系列:RPC通信

瑞星  金牌会员 | 2024-7-25 00:38:57 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 637|帖子 637|积分 1911

https://qwr78tzaus4.feishu.cn/drive/folder/Geumf5oe7lKrnTdAZLzcQn9Bnqc
https://qwr78tzaus4.feishu.cn/docx/K1fsdEbrgo6C0lxYktkc7FOEnQb
https://www.bilibili.com/video/BV16z421m741/?p=2&vd_source=4fd37f941817afccac1a77e31fec6be7
1. 本节课目的

   核心点
1.夺目Flink RPC框架整体设计
2.彻底理解Flink RPC底层是如何通信的
用到的知识点
1.ResourceManager:重要负责Flink集群中的计算资源,其中计算资源重要来自TaskManager注册。
2.TaskManager(TaskExecutor):TaskManager负责向整个集群提供Slot计算资源。TaskManager会调用registerTaskExecutor()方法向ResourceManager注册
  2.开始本节内容

2.1.RPC概念

RPC,即远程过程调用(Remote Procedure Call),是一种通过网络从远程计算机程序上请求服务的技术,而无需了解底层网络技术的协议。在RPC中,客户机和服务器位于不同的呆板上,客户端通过网络调用在服务器端运行的过程,并将结果发送回客户机。这种技术允许程序像调用本地过程一样调用远程过程,使得跨平台、跨呆板的服务调用成为可能。
1.两个进程间的相互调用
2.集群中不同节点服务的通信
3.2.大数据组件常见的RPC实现技术

序号生态圈技术RPC实现1HadoopNIO2SparkSpark1(Akka),Spark2(Netty)3FlinkAkka+Netty(Pekko+Netty) 3.3.Pekko(Akka)

3.3.1. Akka、Pekko基本概念

Flink1.18版本内部RPC通信封装用的是Apache Pekko。Apache Pekko是Akka 2.6.x的一个分支。为什么会改由于Akka将来Apache允许证更改为Business Source License (BSL) v1.1,该协议不是开源的。
Akka、Pekko 用于构建高并发、分布式、可容错、变乱驱动的开发库。
1、提供基于异步非壅闭、高性能的变乱驱动编程模型
2、轻量级的变乱处理(每GB堆内存几百万Actor)
3、使用Akka可以在单机上构建高并发程序,也可以在网络中构建分布式程序。
留意:Akka是基于Actor模型的并发框架,每个Actor的实例在运行时只占用非常少的资源,大约只有300字节。这意味着在1G的内存中可以容纳靠近300万个Actor,这使得Akka在处理大量并发请求时可以大概保持高效的内存使用。
1、ActorSystem 是管理 Actor 生命周期的组件,Actor 是负责举行通信的组件
2、每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都起首储存在 MailBox 中,通过这种方式可以实现异步通信。
3、每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,以是对于 Actor 的消息处理,不得当调用会壅闭的处理方法。
4、Actor 可以改变他自身的状态,可以吸收消息,也可以发送消息,还可以生成新的 Actor
5、每一个 ActorSystem 和 Actor都在启动的时候会给定一个 name,如果要从 ActorSystem 中,获取一个 Actor,则通过以下的方式来举行 Actor 的
获取:pekko.tcp://flink@localhost:6123/user/rpc/resourcemanager_* 来举行定位
6、如果一个 Actor 要和别的一个 Actor 举行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。
7、通过 tell 发送异步消息,不吸收相应,通过 ask 发送异步消息,得到 Future 返回,通过异步回到返回处理结果。
8.如果构建actor举行通信,Pekko版本中必须继承AbstractActor 实现createReceive()方法
3.3.2.Pekko Demo事例

3.3.2.1.PekkoData 类

1.定义了通信的范例信息也就是PekkoData
2.内部声明一个 字符串范例的info
  1. package com.source.pekko;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. @Data
  6. @AllArgsConstructor
  7. @NoArgsConstructor
  8. public class PekkoData {
  9.     private String info;
  10. }
复制代码
3.3.2.2.PekkoRpcReceiverActor类

1.PekkoRpcReceiverActor吸收Actor类继承了AbstractActor
2.也就是说该类可以举行吸收发送消息
3.吸收消息会进入到createReceive
4.根据消息范例匹配进入到handleMessage
5.获取发送者、自身的ActorRef
6.打印信息并向发送者回复消息
  1. package com.source.pekko;
  2. import org.apache.pekko.actor.AbstractActor;
  3. import org.apache.pekko.actor.ActorRef;
  4. import org.apache.pekko.japi.pf.ReceiveBuilder;
  5. /**
  6. * 继承AbstractActor定义自己的actor
  7. * Actor可以发送和接收消息
  8. */
  9. public class PekkoRpcReceiverActor extends AbstractActor {
  10.     /**
  11.      * 实现接收消息
  12.      * @return
  13.      */
  14.     @Override
  15.     public Receive createReceive() {
  16.         return ReceiveBuilder.create()
  17.                 /**接收到PekkoData消息交给handleMessage处理
  18.                  * flink PekkoRpcActor 155行也是这样处理的
  19.                  */
  20.                 .match(PekkoData.class, this::handleMessage)
  21.                 .build();
  22.     }
  23.     /**
  24.      * 处理具体消息
  25.      * @param message
  26.      */
  27.     private void handleMessage(final PekkoData message) {
  28.         /** 获取发送者,发送者对应的就是actorRef */
  29.         ActorRef sender = getSender();
  30.         ActorRef self = getSelf();
  31.         /** 打印 */
  32.         System.out.println("PekkoRpcReceiverActor类收到:" +sender + ":发送者=>" + message.getInfo());
  33.         /** 回复消息 向发送者sender 回复word 的消息 回复者是当前actorRef*/
  34.         /** 4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor  */
  35.         sender.tell(new PekkoData("word"),self);
  36.     }
  37. }
复制代码
3.3.2.3.PekkoRpcSenderActor 类

1.PekkoRpcSenderActor 发送Actor类继承了AbstractActor
2.也就是说该类可以举行吸收发送消息
3.吸收消息会进入到createReceive
4.根据消息范例匹配进入到handleMessage
5.获取发送者的ActorRef
6.打印信息
  1. package com.source.pekko;
  2. import org.apache.pekko.actor.AbstractActor;
  3. import org.apache.pekko.actor.ActorRef;
  4. import org.apache.pekko.japi.pf.ReceiveBuilder;
  5. /**
  6. * 继承AbstractActor定义自己的actor
  7. * Actor可以发送和接收消息
  8. */
  9. public class PekkoRpcSenderActor extends AbstractActor {
  10.     /**
  11.      * 实现接收消息
  12.      * @return
  13.      */
  14.     @Override
  15.     public Receive createReceive() {
  16.         return ReceiveBuilder.create()
  17.                 /**接收到PekkoData消息交给handleMessage处理
  18.                  * flink PekkoRpcActor 155行也是这样处理的
  19.                  */
  20.                 .match(PekkoData.class, this::handleMessage)
  21.                 .build();
  22.     }
  23.     private void handleMessage(final PekkoData message) {
  24.         /** 获取发送者,发送者对应的就是actorRef */
  25.         ActorRef sender = getSender();
  26.         /** 打印 */
  27.         System.out.println("PekkoRpcSenderActor类收到:" +sender + ":发送者=>" + message.getInfo());
  28.     }
  29. }
复制代码
3.3.2.4. Demo 类

1.创建ActorSystem,名字为flink
2.获取PekkoRpcReceiverActor的ActorRef如许就可以举行发送消息了、吸收消息了
3.获取PekkoRpcSenderActor的ActorRef如许就可以举行发送消息了、吸收消息了
4.通过PekkoRpcSenderActor的actorRef 向PekkoRpcReceiverActor发送消息
5.PekkoRpcReceiverActor类中的createReceive吸收到消息后会匹配范例转入handleMessage
6.打印信息,然后通过自身actorRef 向PekkoRpcSenderActor回复消息
7.PekkoRpcSenderActor的createReceive方法吸收到后转入handleMessage
8.打印回复信息。
9竣事程序。
  1. package com.source.pekko;
  2. import org.apache.pekko.actor.ActorRef;
  3. import org.apache.pekko.actor.ActorSystem;
  4. import org.apache.pekko.actor.Props;
  5. public class Demo {
  6.     public static void main(String[] args) {
  7.         /**创建actorSystem*/
  8.         ActorSystem actorSystem = ActorSystem.create("flink");
  9.         /**构建PekkoRpcActor的ActorRef*/
  10.         ActorRef pekkoRpcRef = actorSystem.actorOf(Props.create(PekkoRpcReceiverActor.class), "PekkoRpcReceiverActor");
  11.         /**构建PekkoRpcSenderActor的ActorRef*/
  12.         ActorRef pekkoRpcSenderRef = actorSystem.actorOf(Props.create(PekkoRpcSenderActor.class), "PekkoRpcSenderActor");
  13.         /** pekkoRpcSenderActor作为发送者 向PekkoRpcActor发送 hello*/
  14.         pekkoRpcRef.tell(new PekkoData("hello"),pekkoRpcSenderRef);
  15.     }
  16. }
复制代码
运行结果

3.4.Flink RPC通信

3.4.1Flink RPC整体架构

Flink RPC框架设计相对比较复杂,底层基于Pekko构建的通信系统,Java 动态代理构建RpcGateway接口的代理类
Flink RPC UML图

如上图 Flink RPC UML图
1.RpcGateway接口Flink RPC底层通信用到的动态代理,动态代理中使用的目的类实现的接口最终都是RpcGateway(也就是说动态代理创建的接口最上层都是RpcGateway)
2.RpcEndpoint消息通信组件,底层都有的通信实体都要继承RpcEndpoint
3.FenceRpcEndpoint类是内部会有一个fenceToken发送消息的时候两个 token一样的时候才气发成功,FencePekkoInvocationHandler、FenceRpcGateway也一样
4.RpcEndpoint 内部使用到了RpcService、RpcService
5.RpcService就是用来服务Flink RPC通信的服务类,内部会创建RpcEndpoint的自身代理,获取远程代理。RpcService实现类是PekkoRpcService
6.RpcService 在详细通信类构建对象的时候super父类构造器也就是RpcEndpoint类的时候会初始化RpcServer代表自身代理。
7.PekkoInvocationHandler、FencePekkoInvocationHandler实现了java InvocationHandler接口,也就是说他们里面肯定有实现的invoke方法
8.Dispatcher及其子类、ResourceManager及其子类、JobMaster最终都继承了RpcEndpoint,也就是说他们都具备了通信的特质
3.4.2.RpcGateway

Rpc网关,用于远程调用的代理接接口,RPC通信的接口都继承RpcGateway,java动态代理类最终创建。
Proxy类:这个类提供了创建动态代理类和实例的静态方法。
  1. public static Object newProxyInstance(ClassLoader loader,                                       Class<?>[] interfaces,                                       InvocationHandler h)  
  2. interfaces=>实现了RpcGateWay的接口
  3. 如:ResourceManagerGateway、JobMasterGateway、TaskExecutorGateway
复制代码
3.4.3.RpcEndpoint

1)RpcEndpoint抽象类中定义了RPC组件的基本实现,所有需要实现RPC服务的组件都会继承RpcEndpoint,
RpcEndpoint内部包罗了endpointId 用来标识当前RPC节点的唯一标识,RpcEndpoint借助RpcService启动RpcServer。
2)FencedRpcEndpoint继承RpcEndpoint,内部增长了fencingToken字段,实现了FencedRpcEndpoint的节点都会有一个fencingToken,当远程RPC调用时,会比较访问者和被访问者的fencingToken是否一致,一致了才会举行后续操纵。
3)FencedRpcEndpoint实现类有ResourceManager、JobMaster、TaskExecutor,RpcEndpoint的实现类有TaskExecutor
3.4.4.RpcService

创建时间ClusterEntrypoint 开始启动集群初始化的时候
  1. private RpcService commonRpcService;
  2. ClusterEntrypoint .runCluster ->
  3. initializeServices ->
  4. commonRpcService =
  5.                     RpcUtils.createRemoteRpcService(
  6.                             rpcSystem,
  7.                             configuration,
  8.                             configuration.get(JobManagerOptions.ADDRESS),
  9.                             getRPCPortRange(configuration),
  10.                             configuration.get(JobManagerOptions.BIND_HOST),
  11.                             configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
复制代码
内部提供了RpcServer的创建和启动方法,启动RpcServer(startServer)过程中,通过RpcEndpoint地址创建Akka actor实例,并基于Actor实例构建RpcServer接口的动态代理类
connect方法:连接到所提供地址下的远程rpc服务器。返回一个rpc网关(代理对象),该网关可以用于与rpc服务器通信
3.3.5.RpcServer

创建时间RpcEndpoint 构建的时候创建
RpcServer接口通过PekkoInvocationHandler动态代理类实现,所有远程获本地的执行请求,最终都会转换到PekkoInvocationHandler代理类中执行,也就是InvocationHandler的invoke方法
public ResourceManagersuper() ->
protected FencedRpcEndpoint super() ->
RpcEndpoint -> this.rpcServer = rpcService.startServer(this);
核心点:所有RpcEndpoint启动的时候调用start()方法,最终都会流转到RpcEndpoint的onStart()方法
原因如下:
  1. ClusterEntrypoint.dispatcherResourceManagerComponentFactory.create() ->
  2. DefaultDispatcherResourceManagerComponentFactory.create ->
  3. resourceManagerService.start() ->
  4. ResourceManagerServiceImpl.start() ->
  5. StandaloneLeaderElection.startLeaderElection->
  6. ResourceManagerServiceImpl.grantLeadership->
  7. startNewLeaderResourceManager()->
  8. startResourceManagerIfIsLeader->resourceManager.start();
  9. RpcEndpoint.start ->
  10.     public void start() {
  11.         rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
  12.     }
  13.    ===========================================
  14. PekkoRpcActor.createReceive() ->
  15. handleControlMessage() ->
  16. StoppedState.start() ->
  17. RpcEndpoint.internalCallOnStart()->
  18. onStart()
复制代码
3.3.6.PekkoRpcActor

继承了AbstractActor,实现了createReceive(),也就是说Flink RPC 所有通信都会被createReceive
之后根据消息范例流转到对应的handleMessage(),消息范例有RemoteHandshakeMessage握手消息、ControlMessages 控制类消息比如start,其他消息(RpcInvocation)
暂时无法在飞书文档外展示此内容
3.3.7.场景引导方式查看源码

假设设计一个TaskExecutor进程 向ResourceManager进程注册,如何设?
1.TaskExecutor、ResourceManager要是可以举行通信的(RpcEndpoint)
1.TaskManager要能获取到ResourceManager的代理对象(TaskExecutorGateway、ResourceManagerGateway)
2.TaskManager获取到代理对象之后要知道调用ResourceManager的那个方法举行注册(ResourceManagerGateway.registerTaskExecutor)
3.要实现有能连接ResourceManager进程的通佩服务(RpcService)
4.建立通信连接后要有能处理消息的公共类(PekkoRpcActor)
5.满意以上条件了,就相当于TaskManager、ResourceManager本身就是一个可以通信的进程,本地通信 本身与本身通信(RpcServer)
基于上面的设计,我们从Flink代码中可以找到对应的实现类、接口
3.5.TaskExecutor向ResourceManager注册 debug

TaskExecutor创建对象启动的时候会触发 onStart方法
Flink 内部所有的RpcEndpoint 实现(TaskExecutor,ResourceManager、JobMaster)等第一次启动都会触发onstart方法的执行,这是pekko的内部机制
1.进入到TaskExecutor的onStart方法后,调用startResourceManagerServices启动相关的服务

startResourceManagerServices方法内部做了如下操纵
2.resourceManagerLeaderRetriever监听服务中构建一个ResourceManagerLeaderListener会监听ResourceManager Leader,该类中的notifyLeaderAddress方法会在第一次启动、ResourceManager Leader的时候触发
3.启动taskSlotTable(后面章节举行分析)
4.启动jobLeaderService(后面章节举行分析)

4.notifyLeaderAddress方法会在第一次启动、ResourceManager Leader的时候触发,直接进去方法内部

5.获取ResourceManager地址
6.reconnectToResourceManager真正向ResouceManager注册的方法

7.关闭历史已经连接的ResourceManager
8.启动注册超时时间
9.试图连接ResourceManager(内部会调用connectToResourceManager)

10.方法内部调用继续下一步

11.TaskExecutorRegistration是TaskExecutor在注册到ResourceManager时提供的信息
12.TaskExecutorToResourceManagerConnection维护TaskExecutor与ResourceManager的连接
13.start真正连接注册的方法

14.查抄状态
15.createNewRegistration()注册成功、注册失败会回调方法
16.newRegistration.startRegistration()开始注册

17.调用RpcService.connect(内部就会用到java动态代理)

18.调用connectInternal方法创建代理类
19.FencedPekkoInvocationHandler刚好是实现了InvocationHandler

20.调用ask方法举行握手,包管ResourceManager能正常通信

21.Proxy.newProxyInstance创建代理实现

22.创建完代理类会通过异步编程调用register同期间理对象作为参数

23.invokeRegistration真正触发调用的方法进入实现类看

24.实现类中的invokeRegistration方法内部调用了resourceManager.registerTaskExecutor方法,此时还没有发送到ResourceManager,方法会流转到PekkoRpcActorl类中。

25.消息会跳转到PekkoInvocationHandler类的invoke方法
26.判断本地消息还是远程消息,由于ResourceManager是远程消息,以是会调用invokeRpc


27.将方法参数等封装成RpcInvocation,然后调用Pekko底层的ask发送消息
28.消息会被到PekkoRpcActor类中的方法所吸收


29.PekkoRpcActor类中createReceive方法吸收到数据,流转到handleMessage方法中

30.handleMessage方法中会剖析出来RpcInvocation获取到方法、参数、参数范例
31.通过java反射机制调用method.invoke 传入参数最终向目的类发送消息

32.最后进入到ResourceManager.registerTaskExecutor方法

如愿以偿进入最终我们预想的方法。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

瑞星

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表