ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark 源码解析(二) 根据 SparkRpc 自己动手实践一个跨节点通讯 [打印本页]

作者: 滴水恩情    时间: 2025-2-21 07:18
标题: Spark 源码解析(二) 根据 SparkRpc 自己动手实践一个跨节点通讯
 目录
一、框架流程:
二、Maven 搭建 Scala 导入POM依赖
三、根据流程举行编写
1、实例 Master
2、创建 RpcEnv
3、创建 RpcEndpoint
4、生成 RpcEndpointRef
5、RpcEndpointRef发送消息
 6、防止还没收到消息程序就结束运行
7、验证一下,看看结果
四、完整代码



一、框架流程:

1、实例 Master
2、创建 RpcEnv
3、MasterRpcEnv 注册
4、生成 RpcEndpointRef
5、RpcEndpointRef发送消息
二、Maven 搭建 Scala 导入POM依赖

起首是主模块
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>org.apache.spark</groupId>
  7.     <artifactId>10</artifactId>
  8.     <version>1.0-SNAPSHOT</version>
  9.     <packaging>pom</packaging>
  10.     <modules>
  11.         <module>11</module>
  12.     </modules>
  13.     <properties>
  14.         <maven.compiler.source>8</maven.compiler.source>
  15.         <maven.compiler.target>8</maven.compiler.target>
  16.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  17.     </properties>
  18.     <build>
  19.         <pluginManagement>
  20.             <plugins>
  21.                 <plugin>
  22.                     <groupId>net.alchim31.maven</groupId>
  23.                     <artifactId>scala-maven-plugin</artifactId>
  24.                     <version>3.2.2</version>
  25.                 </plugin>
  26.                 <plugin>
  27.                     <groupId>org.apache.maven.plugins</groupId>
  28.                     <artifactId>maven-site-plugin</artifactId>
  29.                     <version>3.5.1</version>
  30.                 </plugin>
  31.             </plugins>
  32.         </pluginManagement>
  33.         <plugins>
  34.             <plugin>
  35.                 <groupId>net.alchim31.maven</groupId>
  36.                 <artifactId>scala-maven-plugin</artifactId>
  37.                 <executions>
  38.                     <execution>
  39.                         <id>scala-compile-first</id>
  40.                         <phase>process-resources</phase>
  41.                         <goals>
  42.                             <goal>add-source</goal>
  43.                             <goal>compile</goal>
  44.                         </goals>
  45.                     </execution>
  46.                     <execution>
  47.                         <id>scala-test-compile</id>
  48.                         <phase>process-test-resources</phase>
  49.                         <goals>
  50.                             <goal>testCompile</goal>
  51.                         </goals>
  52.                     </execution>
  53.                 </executions>
  54.             </plugin>
  55.             <plugin>
  56.                 <groupId>org.apache.maven.plugins</groupId>
  57.                 <artifactId>maven-compiler-plugin</artifactId>
  58.                 <executions>
  59.                     <execution>
  60.                         <phase>compile</phase>
  61.                         <goals>
  62.                             <goal>compile</goal>
  63.                         </goals>
  64.                     </execution>
  65.                 </executions>
  66.             </plugin>
  67.             <plugin>
  68.                 <groupId>org.apache.maven.plugins</groupId>
  69.                 <artifactId>maven-shade-plugin</artifactId>
  70.                 <version>2.4.3</version>
  71.                 <executions>
  72.                     <execution>
  73.                         <phase>package</phase>
  74.                         <goals>
  75.                             <goal>shade</goal>
  76.                         </goals>
  77.                         <configuration>
  78.                             <filters>
  79.                                 <filter>
  80.                                     <artifact>*:*</artifact>
  81.                                     <excludes>
  82.                                         <exclude>META-INF/*.SF</exclude>
  83.                                         <exclude>META-INF/*.DSA</exclude>
  84.                                         <exclude>META-INF/*.RSA</exclude>
  85.                                     </excludes>
  86.                                 </filter>
  87.                             </filters>
  88.                         </configuration>
  89.                     </execution>
  90.                 </executions>
  91.             </plugin>
  92.         </plugins>
  93.     </build>
  94. </project>
复制代码
然后创建一个子模块
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <parent>
  7.         <groupId>org.apache.spark</groupId>
  8.         <artifactId>10</artifactId>
  9.         <version>1.0-SNAPSHOT</version>
  10.     </parent>
  11.     <artifactId>11</artifactId>
  12.     <properties>
  13.         <maven.compiler.source>8</maven.compiler.source>
  14.         <maven.compiler.target>8</maven.compiler.target>
  15.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  16.     </properties>
  17.     <dependencies>
  18.         <dependency>
  19.             <groupId>org.arakhne.afc.slf4j</groupId>
  20.             <artifactId>slf4j-log4j</artifactId>
  21.             <version>17.0</version>
  22.         </dependency>
  23.         <dependency>
  24.             <groupId>org.scala-lang</groupId>
  25.             <artifactId>scala-library</artifactId>
  26.             <version>2.13.15</version>
  27.         </dependency>
  28.         <dependency>
  29.             <groupId>org.apache.spark</groupId>
  30.             <artifactId>spark-core_2.13</artifactId>
  31.             <version>3.5.4</version>
  32.         </dependency>
  33.     </dependencies>
  34. </project>
复制代码
三、根据流程举行编写

在子模块下创建Scala类Master
1、实例 Master

  1. class Master(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint {
  2.   override def receive: PartialFunction[Any, Unit] = {
  3.     case "hello" => println("hello,这是一个测试")
  4.     case _ => println("receive uncatchable information!")
  5.   }
  6. }
复制代码
我们起首必要伪装一下包名,否则无法调用 RpcEnv
然后继承线程安全的 ThreadSafeRpcEndpoint 成为一个Endpoint
重写receive方法,用到了偏函数。
2、创建 RpcEnv

  1.     val sparkConf:SparkConf = new SparkConf()
  2.     val securityManager:SecurityManager = new SecurityManager(sparkConf)
  3.     // 1、创建RpcEnv
  4.     val rpcEnv1 = RpcEnv.create("master","127.0.0.1",10000,sparkConf,securityManager)
复制代码

3、创建 RpcEndpoint

  1.     // 2、创建RpcEndpoint
  2.     val master = new Master(rpcEnv1)
复制代码
Master 继承了 ThreadSafeRpcEndpoint 以是可以直接这么写

4、生成 RpcEndpointRef


  1.     // 3、向RpcEnv注册RpcEndpoint并返回RpcEndpointRef
  2.     val endpointEndpointRef = rpcEnv1.setupEndpoint("endpoint1", master)
复制代码
代码现实上很简单,由于都集成好了
5、RpcEndpointRef发送消息

  1.     // 4、使用RpcEndpointRef发送消息
  2.     endpointEndpointRef.send("hello")
复制代码
 6、防止还没收到消息程序就结束运行

  1.     rpcEnv.awaitTermination()
复制代码
7、验证一下,看看结果


乐成!
再试试传输别的信息
  1. endpointEndpointRef.send("123456")
复制代码

 没有问题。
四、完整代码

  1. package org.apache.spark.psy
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
  4. import org.apache.spark.SecurityManager
  5. class Master(val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint{
  6.   override def receive: PartialFunction[Any, Unit] = {
  7.     case "hello" => println("hello,这是一个测试")
  8.     case _ => println("receive uncatchable information!")
  9.   }
  10. }
  11. object Master{
  12.   def main(args: Array[String]): Unit = {
  13.     val conf = new SparkConf()
  14.     val securityManager = new SecurityManager(conf)
  15.     //1.创建RpcEnv
  16.     val rpcEnv = RpcEnv.create("SparkMaster","localhost",8888,conf,securityManager)
  17.     //2.创建RpcEndPoint
  18.     val master = new Master(rpcEnv)
  19.     //3.使用RpcEnv注册RpcEndPoint
  20.     val masterEndpointRef = rpcEnv.setupEndpoint("Master",master)
  21.     //4.通过返回的RpcEndPointRef发送消息
  22.     masterEndpointRef.send("123456")
  23.     rpcEnv.awaitTermination()
  24.   }
  25. }
复制代码

OK,代码不是很难,由于都已经集成好了,接下来我们要试着自己去集成代码,自己尝试完成一个 应用 Rpc 框架
加油!坚持就是胜利!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4