Contents
I. Framework workflow
II. Setting up Scala with Maven and the POM dependencies
III. Writing the code following the workflow
1. The Master class
2. Create the RpcEnv
3. Create the RpcEndpoint
4. Obtain the RpcEndpointRef
5. Send a message via the RpcEndpointRef
6. Keep the program from exiting before the message arrives
7. Verify the result
IV. Complete code
I. Framework workflow
1. Instantiate the Master
2. Create the RpcEnv
3. Register the Master with the RpcEnv
4. Obtain the RpcEndpointRef
5. Send a message via the RpcEndpointRef
II. Setting up Scala with Maven and the POM dependencies
First, the parent module:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.spark</groupId>
    <artifactId>10</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>11</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Then create a child module:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.spark</groupId>
        <artifactId>10</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>11</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.arakhne.afc.slf4j</groupId>
            <artifactId>slf4j-log4j</artifactId>
            <version>17.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.13.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>3.5.4</version>
        </dependency>
    </dependencies>
</project>
III. Writing the code following the workflow
Create a Scala class named Master in the child module.
1. The Master class
class Master(val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case "hello" => println("hello,这是一个测试")
    case _ => println("receive uncatchable information!")
  }
}
We first have to "disguise" the package name, that is, put the class somewhere under org.apache.spark; otherwise we cannot reference RpcEnv, because the RPC classes are package-private to Spark.
Then extend the thread-safe ThreadSafeRpcEndpoint so the class becomes an endpoint.
Finally, override the receive method, which returns a partial function.
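Because receive returns a PartialFunction[Any, Unit], the endpoint only handles messages it has a case for. For readers who have not used partial functions before, here is a minimal plain-Scala sketch (the values and strings are made up for illustration and have nothing to do with Spark):

// A partial function that only handles the string "hello"
val greetOnly: PartialFunction[Any, Unit] = {
  case "hello" => println("matched hello")
}
// A fallback case, analogous to the wildcard case in Master.receive
val fallback: PartialFunction[Any, Unit] = {
  case other => println(s"unhandled: $other")
}

println(greetOnly.isDefinedAt("hello"))  // true: there is a case for "hello"
println(greetOnly.isDefinedAt(123))      // false: no case matches 123
(greetOnly orElse fallback)("hello")     // prints "matched hello"
(greetOnly orElse fallback)(123)         // prints "unhandled: 123"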
2. Create the RpcEnv
val sparkConf: SparkConf = new SparkConf()
val securityManager: SecurityManager = new SecurityManager(sparkConf)
// 1. Create the RpcEnv, listening on 127.0.0.1:10000 under the name "master"
val rpcEnv1 = RpcEnv.create("master", "127.0.0.1", 10000, sparkConf, securityManager)
3. Create the RpcEndpoint
// 2. Create the RpcEndpoint
val master = new Master(rpcEnv1)
Because Master extends ThreadSafeRpcEndpoint, it can be instantiated directly like this.
4. Obtain the RpcEndpointRef
// 3. Register the RpcEndpoint with the RpcEnv, which returns an RpcEndpointRef
val endpointEndpointRef = rpcEnv1.setupEndpoint("endpoint1", master)
The code is actually quite simple, because the framework already provides everything; the string "endpoint1" is just the name under which this endpoint is registered in the RpcEnv.
5. Send a message via the RpcEndpointRef
// 4. Send a message through the RpcEndpointRef
endpointEndpointRef.send("hello")
6. Keep the program from exiting before the message arrives
rpcEnv1.awaitTermination()
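awaitTermination() blocks the main thread until the RpcEnv is shut down, so the process stays alive and the endpoint keeps receiving messages. If you only want a short-lived test, a hedged alternative (the one-second sleep is an arbitrary choice) is to tear the environment down yourself:

Thread.sleep(1000)         // crude wait so the endpoint has time to process the message
rpcEnv1.shutdown()         // stop the RpcEnv
rpcEnv1.awaitTermination() // now returns once shutdown has completed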
7. Verify the result
Success!
Let's also try sending a different message:
- endpointEndpointRef.send("123456")
No problem: since "123456" does not match "hello", it falls through to the wildcard case and prints "receive uncatchable information!".
IV. Complete code
package org.apache.spark.psy

import org.apache.spark.SparkConf
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.SecurityManager

class Master(val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case "hello" => println("hello,这是一个测试")
    case _ => println("receive uncatchable information!")
  }
}

object Master {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    // 1. Create the RpcEnv
    val rpcEnv = RpcEnv.create("SparkMaster", "localhost", 8888, conf, securityManager)
    // 2. Create the RpcEndpoint
    val master = new Master(rpcEnv)
    // 3. Register the RpcEndpoint with the RpcEnv
    val masterEndpointRef = rpcEnv.setupEndpoint("Master", master)
    // 4. Send a message through the returned RpcEndpointRef
    masterEndpointRef.send("123456")
    // Keep the process alive so the message can be received
    rpcEnv.awaitTermination()
  }
}
OK, the code is not difficult, because everything has already been integrated for us. Next we will try to put the pieces together ourselves and build an application on top of the RPC framework.
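As a small preview of that next step, here is a hedged sketch of a second process that talks to this Master over the network. Only the host localhost, the port 8888 and the endpoint name "Master" come from the complete code above; the Worker object, its RpcEnv name and the port 8889 are assumptions for illustration, and the class again has to live under the org.apache.spark package:

package org.apache.spark.psy

import org.apache.spark.SparkConf
import org.apache.spark.SecurityManager
import org.apache.spark.rpc.{RpcAddress, RpcEnv}

// Hypothetical remote client: run it in a second JVM while the Master above is running
object Worker {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    // A separate RpcEnv for this process; the name and port 8889 are arbitrary choices
    val rpcEnv = RpcEnv.create("SparkWorker", "localhost", 8889, conf, securityManager)
    // Look up the endpoint that the Master process registered as "Master" on localhost:8888
    val masterRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 8888), "Master")
    masterRef.send("hello")  // the Master's console should print its "hello" line
    Thread.sleep(1000)       // crude wait so the message is delivered before we exit
    rpcEnv.shutdown()
  }
}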
Keep at it! Persistence is victory!