摸鱼大数据——Spark Structured Steaming——Spark 和 Kafka 整合 ...

打印 上一主题 下一主题

主题 927|帖子 927|积分 2781

三、Spark 和 Kafka 整合

Spark天然支持集成Kafka, 基于Spark读取Kafka中的数据, 同时可以实施精准一次(仅且只会处理一次)的语义, 作为程序员, 仅需要关心如何处理消息数据即可, 结构化流会将数据读取过来, 转换为一个DataFrame的对象, DataFrame就是一个无界的DataFrame, 是一个无限增大的表
1、整合Kafka预备工作

说明: Jar包上传的位置说明
  1.  如何放置相关的Jar包?  
  2.      1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,
  3.          目录位置: /export/server/spark/jars
  4.     
  5.      2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包
  6.          目录位置: /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
  7.     
  8.      3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下
  9.          hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars
  10.          
  11.  ​
  12.      请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars jar包路径
  13.     
  14.      jar包下载地址: https://mvnrepository.com/
复制代码

2、从kafka中读取数据

spark和kafka集成官网文档:
Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.1.2 Documentation

2.1 流式处理

官方示例:

  1.  # 订阅Kafka的一个Topic,从最新的消息数据开始消费
  2.  df = spark \
  3.    .readStream \
  4.    .format("kafka") \
  5.    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  6.    .option("subscribe", "topic1") \
  7.    .load()
  8.  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  9.  ​
  10.  ​
  11.  # 订阅Kafka的多个Topic,多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
  12.  df = spark \
  13.    .readStream \
  14.    .format("kafka") \
  15.    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  16.    .option("subscribe", "topic1,topic2") \
  17.    .load()
  18.  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  19.  ​
  20.  ​
  21.  # 订阅一个Topic,并且指定header信息
  22.  df = spark \
  23.    .readStream \
  24.    .format("kafka") \
  25.    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  26.    .option("subscribe", "topic1") \
  27.    .option("includeHeaders", "true") \
  28.    .load()
  29.  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  30.  ​
  31.  ​
  32.  # 订阅符合规则的Topic,从最新的数据开始消费
  33.  df = spark \
  34.    .readStream \
  35.    .format("kafka") \
  36.    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  37.    .option("subscribePattern", "topic.*") \
  38.    .load()
  39.  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
复制代码
练习示例

   对接kafka后,返回的结果数据内容:
  1.  key: 发送数据的key值。如果没有,就为null
  2.  value: 最重要的字段。发送数据的value值,也就是消息内容。如果没有,就为null
  3.  topic: 表示消息是从哪个Topic中消费出来
  4.  partition: 分区编号。表示消费到的该条数据来源于Topic的哪个分区
  5.  offset: 表示消息偏移量
  6.  ​
  7.  timestamp: 接收的时间戳
  8.  timestampType: 时间戳类型(无意义)
复制代码
范例的说明:
  列名范例keybinaryvaluebinarytopicstringpartitionintoffsetlongtimestamptimestamptimestampTypeintheaders (optional)array  从某一个Topic中读取消息数据
  1.  from pyspark import SparkConf, SparkContext
  2.  import os
  3.  from pyspark.sql import SparkSession
  4.  import pyspark.sql.functions as F
  5.  ​
  6.  # 绑定指定的Python解释器
  7.  os.environ['SPARK_HOME'] = '/export/server/spark'
  8.  os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  9.  os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  10.  ​
  11.  if __name__ == '__main__':
  12.      # 1- 创建SparkSession对象
  13.      spark = SparkSession.builder\
  14.          .config("spark.sql.shuffle.partitions",1)\
  15.          .appName('ss_read_kafka_1_topic')\
  16.          .master('local[*]')\
  17.          .getOrCreate()
  18.  ​
  19.      # 2- 数据输入
  20.      # 默认从最新的地方开始消费
  21.      df = spark.readStream\
  22.          .format("kafka")\
  23.          .option("kafka.bootstrap.servers","node1:9092")\
  24.          .option("subscribe","itheima")\
  25.          .load()
  26.  ​
  27.      # 查看类型
  28.      print(type(df))
  29.  ​
  30.      # 注意: 字符串需要解码!!!
  31.      etl_df = df.select(
  32.          F.expr("cast(key as string) as key"),
  33.          F.decode(df.key,'utf8'),
  34.          F.expr("cast(value as string) as value"),
  35.          F.decode(df.value, 'utf8'),
  36.          df.topic,
  37.          df.partition,
  38.          df.offset
  39.      )
  40.  ​
  41.      # 获取数据
  42.      etl_df.writeStream.format("console").outputMode("append").start().awaitTermination()
  43.  ​
  44.      # 3- 数据处理
  45.      # result_df1 = init_df.select(F.expr("cast(value as string) as value"))
  46.      # # selectExpr = select + F.expr
  47.      # result_df2 = init_df.selectExpr("cast(value as string) as value")
  48.      # result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))
  49.  ​
  50.      # 4- 数据输出
  51.      # 5- 启动流式任务
  52.      """
  53.          如果有多个输出,那么只能在最后一个start的后面写awaitTermination()
  54.      """
  55.      # result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()
  56.      # result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()
  57.      # result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()
  58.  ​
复制代码

2.2 批处理

官方示例:

  1.  # 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
  2.  df = spark \
  3.    .read \
  4.    .format("kafka") \
  5.    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  6.    .option("subscribe", "topic1") \
  7.    .load()
  8.  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  9.  
  10.  
  11.  # 批处理订阅Kafka的多个Topic数据。并且可以通过startingOffsets和endingOffsets指定要消费的消息偏移
  12.  # 量(offset)范围。"topic1":{"0":23,"1":-2} 含义是:topic1,"0":23从分区编号为0的分区的
  13.  # offset=23地方开始消费,"1":-2 从分区编号为1的分区的最开始的地方开始消费
  14.  ​
  15.  df = spark \
  16.  .read \
  17.  .format("kafka") \
  18.  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  19.  .option("subscribe", "topic1,topic2") \
  20.  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  21.  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  22.  .load()
  23.  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  24.  ​
  25.  ​
  26.  # 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
  27.  df = spark \
  28.    .read \
  29.    .format("kafka") \
  30.    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  31.    .option("subscribePattern", "topic.*") \
  32.    .option("startingOffsets", "earliest") \
  33.    .option("endingOffsets", "latest") \
  34.    .load()
  35.  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
复制代码
演示示例

   参数说明:
  选项值说明assign通过一个Json 字符串的方式来表现: {"topicA":[0,1],"topicB":[2,4]}设置使用特定的TopicPartitionssubscribe以逗号分隔的Topic主题列表要订阅的主题列表subscribePattern正则表达式字符串订阅匹配符合条件的Topic。assign、subscribe、subscribePattern任意指定一个。kafka.bootstrap.servers以英文逗号分隔的host:port列表指定kafka服务的地点  订阅一个Topic
  1.  from pyspark import SparkConf, SparkContext
  2.  import os
  3.  from pyspark.sql import SparkSession
  4.  import pyspark.sql.functions as F
  5.  ​
  6.  # 绑定指定的Python解释器
  7.  os.environ['SPARK_HOME'] = '/export/server/spark'
  8.  os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  9.  os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  10.  ​
  11.  if __name__ == '__main__':
  12.      # 1- 创建SparkSession对象
  13.      spark = SparkSession.builder\
  14.          .config("spark.sql.shuffle.partitions",1)\
  15.          .appName('sparksql_read_kafka_1_topic')\
  16.          .master('local[*]')\
  17.          .getOrCreate()
  18.  ​
  19.      # 2- 数据输入
  20.      # 默认从Topic开头一直消费到结尾
  21.      df = spark.read\
  22.          .format("kafka")\
  23.          .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
  24.          .option("subscribe","itheima")\
  25.          .load()
  26.  ​
  27.      # 查看类型
  28.      print(type(df))
  29.  ​
  30.      # 注意: 字符串需要解码!!!
  31.      etl_df = df.select(
  32.          F.expr("cast(key as string) as key"),
  33.          F.decode(df.key,'utf8'),
  34.          F.expr("cast(value as string) as value"),
  35.          F.decode(df.value, 'utf8'),
  36.          df.topic,
  37.          df.partition,
  38.          df.offset
  39.      )
  40.      # 获取数据
  41.      etl_df.show()
  42.  ​
  43.      # # 3- 数据处理
  44.      # result_df1 = init_df.select(F.expr("cast(value as string) as value"))
  45.      # # selectExpr = select + F.expr
  46.      # result_df2 = init_df.selectExpr("cast(value as string) as value")
  47.      # result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))
  48.      # # 4- 数据输出
  49.      # print("result_df1")
  50.      # result_df1.show()
  51.      # print("result_df2")
  52.      # result_df2.show()
  53.      # print("result_df3")
  54.      # result_df3.show()
  55.      # # 5- 释放资源
  56.      # spark.stop()
复制代码


3、数据写入Kafka中

3.1 流式处理

官方示例:

  1.  # 将Key和Value的数据都写入到Kafka当中
  2.  ds = df \
  3.  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  4.  .writeStream \
  5.  .format("kafka") \
  6.  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  7.  .option("topic", "topic1") \
  8.  .start()
  9.  ​
  10.  ​
  11.  # 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
  12.  # 的哪个Topic中。这种方式适用于消费多个Topic的情况
  13.  ds = df \
  14.  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  15.  .writeStream \
  16.  .format("kafka") \
  17.  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  18.  .start()
复制代码
练习示例

   备注Column数据范例可选字段keystring or binary必填字段valuestring or binary可选字段headersarray必填字段topicstring可选字段partitionint  写出到指定Topic
  1.  from pyspark import SparkConf, SparkContext
  2.  import os
  3.  from pyspark.sql import SparkSession
  4.  import pyspark.sql.functions as F
  5.  ​
  6.  # 绑定指定的Python解释器
  7.  os.environ['SPARK_HOME'] = '/export/server/spark'
  8.  os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  9.  os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  10.  ​
  11.  if __name__ == '__main__':
  12.      # 1- 创建SparkSession对象
  13.      spark = SparkSession.builder\
  14.          .config("spark.sql.shuffle.partitions",1)\
  15.          .appName('ss_read_kafka_1_topic')\
  16.          .master('local[*]')\
  17.          .getOrCreate()
  18.  ​
  19.      # 2- 数据输入
  20.      # 默认从最新的地方开始消费
  21.      init_df = spark.readStream\
  22.          .format("kafka")\
  23.          .option("kafka.bootstrap.servers","node1:9092,node2:9092")\
  24.          .option("subscribe","itheima")\
  25.          .load()
  26.  ​
  27.      # 3- 数据处理
  28.      result_df = init_df.select(
  29.          F.expr("concat(cast(value as string),'_itheima') as value")
  30.      )
  31.  ​
  32.      # 4- 数据输出
  33.      # 注意: 咱们修改完直接保存到kafka的itcast主题中,所以控制台没有数据,这是正常的哦!!!
  34.      
  35.      # 5- 启动流式任务
  36.      result_df.writeStream\
  37.          .format("kafka")\
  38.          .option("kafka.bootstrap.servers","node1:9092,node2:9092")\
  39.          .option("topic","itcast")\
  40.          .option("checkpointLocation", "hdfs://node1:8020/ck")\
  41.          .start()\
  42.          .awaitTermination()
复制代码


3.2 批处理

官方示例:

  1.  # 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
  2.  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  3.    .write \
  4.    .format("kafka") \
  5.    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  6.    .option("topic", "topic1") \
  7.    .save()
  8.    
  9.  ​
  10.  # 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
  11.  df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  12.    .write \
  13.    .format("kafka") \
  14.    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  15.    .save()
复制代码
演示示例

   备注Column数据范例可选字段keystring or binary必填字段valuestring or binary可选字段headersarray必填字段topicstring可选字段partitionint
  1.  from pyspark import SparkConf, SparkContext
  2.  import os
  3.  from pyspark.sql import SparkSession
  4.  import pyspark.sql.functions as F
  5.  ​
  6.  # 绑定指定的Python解释器
  7.  os.environ['SPARK_HOME'] = '/export/server/spark'
  8.  os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  9.  os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  10.  ​
  11.  if __name__ == '__main__':
  12.      # 1- 创建SparkSession对象
  13.      spark = SparkSession.builder\
  14.          .config("spark.sql.shuffle.partitions",1)\
  15.          .appName('ss_read_kafka_1_topic')\
  16.          .master('local[*]')\
  17.          .getOrCreate()
  18.  ​
  19.      # 2- 数据输入
  20.      # 默认从最新的地方开始消费
  21.      init_df = spark.read\
  22.          .format("kafka")\
  23.          .option("kafka.bootstrap.servers","node1:9092,node2:9092")\
  24.          .option("subscribe","itheima")\
  25.          .load()
  26.  ​
  27.      # 3- 数据处理
  28.      result_df = init_df.select(F.expr("concat(cast(value as string),'_666') as value"))
  29.  ​
  30.      # 4- 数据输出
  31.      # 5- 启动流式任务
  32.      result_df.write.format("kafka")\
  33.          .option("kafka.bootstrap.servers","node1:9092,node2:9092")\
  34.          .option("topic","itcast")\
  35.          .option("checkpointLocation", "hdfs://node1:8020/ck")\
  36.          .save()
复制代码









免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户云卷云舒

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表