ToB企服应用市场:ToB评测及商务社交产业平台

标题: spark 流处理的几个实例 [打印本页]

作者: 北冰洋以北    时间: 2023-4-7 04:00
标题: spark 流处理的几个实例
最近写了几个简单的spark structured streaming  的代码实例。 目的是熟悉spark 开发环境搭建, spark 代码开发流程。
开发环境:
系统:win 11 
java : 1.8
scala:2.13 
工具:idea 2022.2  ,maven 3, git 2.37
spark : 3.3.2
一, 使用 spark 结构化流读取文件数据,并做单词统计。
代码:
  1. package org.example;
  2. import org.apache.spark.sql.Dataset;
  3. import org.apache.spark.sql.Row;
  4. import org.apache.spark.sql.SparkSession;
  5. import org.apache.spark.sql.streaming.OutputMode;
  6. import org.apache.spark.sql.streaming.StreamingQuery;
  7. import org.apache.spark.sql.streaming.StreamingQueryException;
  8. import org.apache.spark.sql.types.DataTypes;
  9. import org.apache.spark.sql.types.StructType;
  10. import java.util.concurrent.TimeoutException;
  11. public class Main {
  12.     /*
  13.     例子:从文件中读取流, 被定义模式,生成dataset ,使用sql api 进行分析。
  14.      */
  15.     public static void main(String[] args) throws TimeoutException, StreamingQueryException {
  16.         System.out.println("Hello world!");
  17.         SparkSession spark = SparkSession.builder().appName("spark streaming").config("spark.master", "local")
  18.                 .config("spark.sql.warehouse.dir", "file:///app/")
  19.                 .getOrCreate();
  20.         spark.sparkContext().setLogLevel("ERROR");
  21.         StructType schema =
  22.                 new StructType().add("empId", DataTypes.StringType).add("empName", DataTypes.StringType)
  23.                         .add("department", DataTypes.StringType);
  24.         Dataset<Row> rawData = spark.readStream().option("header", false).format("csv").schema(schema)
  25.                 .csv("D:/za/spark_data/*.csv");
  26.         rawData.createOrReplaceTempView("empData");
  27.         Dataset<Row> result = spark.sql("select count(*), department from  empData group by department");
  28.         StreamingQuery query = result.writeStream().outputMode("complete").format("console").start();  // 每次触发,全表输出
  29.         query.awaitTermination();
  30.     }
  31. }
复制代码
输出:

二, 使用 spark 结构化流读取socket流,做单词统计,使用Java编程
代码:
  1. package org.example;
  2. import org.apache.spark.api.java.function.FlatMapFunction;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Encoders;
  5. import org.apache.spark.sql.Row;
  6. import org.apache.spark.sql.SparkSession;
  7. import org.apache.spark.sql.streaming.StreamingQuery;
  8. import org.apache.spark.sql.streaming.StreamingQueryException;
  9. import java.util.Arrays;
  10. import java.util.concurrent.TimeoutException;
  11. public class SocketStreaming_wordcount {
  12.     /*
  13.      * 从socket 读取字符流,并做word count分析
  14.      *
  15.      * */
  16.     public static void main(String[] args) throws TimeoutException, StreamingQueryException {
  17.         SparkSession spark = SparkSession
  18.                 .builder()
  19.                 .appName("JavaStructuredNetworkWordCount")
  20.                 .config("spark.master", "local")
  21.                 .getOrCreate();
  22.         // dataframe 表示 socket 字符流
  23.         Dataset<Row> lines = spark
  24.                 .readStream()
  25.                 .format("socket")
  26.                 .option("host", "localhost")
  27.                 .option("port", 9999)
  28.                 .load();
  29. //  把一行字符串切分为 单词
  30.         Dataset<String> words = lines
  31.                 .as(Encoders.STRING())
  32.                 .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
  33. //  对单词分组计数
  34.         Dataset<Row> wordCounts = words.groupBy("value").count();
  35. //  开始查询并打印输出到console
  36.         StreamingQuery query = wordCounts.writeStream()
  37.                 .outputMode("complete")
  38.                 .format("console")
  39.                 .start();
  40.         query.awaitTermination();
  41.     }
  42. }
复制代码
输出:

二, 使用 spark 结构化流读取socket流,做单词统计,使用scala 编程
代码:
  1. package org.example
  2. import org.apache.spark.sql.SparkSession
  3. object Main {
  4.   def main(args: Array[String]): Unit = {
  5.     val spark = SparkSession
  6.       .builder
  7.       .appName("streaming_socket_scala")
  8.       .config("spark.master", "local")
  9.       .getOrCreate()
  10.     import spark.implicits._
  11.     // 创建datafram 象征从网络socket 接收流
  12.     val lines = spark.readStream
  13.       .format("socket")
  14.       .option("host", "localhost")
  15.       .option("port", 9999)
  16.       .load()
  17.     // 切分一行成单词
  18.     val words = lines.as[String].flatMap(_.split(" "))
  19.     // 进行单词统计
  20.     val wordCounts = words.groupBy("value").count()
  21.     // 开始查询并输出
  22.     val query = wordCounts.writeStream
  23.       .outputMode("complete")
  24.       .format("console")
  25.       .start()
  26.     query.awaitTermination()
  27.   }
  28. }
复制代码
输出:

 
功能比较简单,代码比较简单,可以在网络上找到很多。  但是也是一个完整的spark结构流代码开发流程。权当熟悉下开发流程。
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4