ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flume+Kafka+StructuredStreaming(pyspark)+Mysql分布式采集与微批处置惩罚 [打印本页]

作者: 农妇山泉一亩田    时间: 2024-12-4 18:50
标题: Flume+Kafka+StructuredStreaming(pyspark)+Mysql分布式采集与微批处置惩罚
下面根据数据流向逐一介绍 Flume  -> Kafka -> StructuredStreaming -> Mysql

目录
下面根据数据流向逐一介绍 Flume  -> Kafka -> StructuredStreaming -> Mysql
1. Flume
2. Kafka
3. 生产数据
4. StructuredStreaming
5. Mysql
6.  启动步伐


1. Flume


为了监测某个目录下,所有文件是否有新的行产生,因此,我们source 用 Taildir Source 见官网
Flume 1.11.0 User Guide — Apache Flume
我在node3上构建了Flume。
  1. a1.sources =s1
  2. a1.channels=c1
  3. a1.sinks = k1
  4. #define s1
  5. a1.sources.s1.type = TAILDIR
  6. #指定一个元数据记录文件
  7. a1.sources.s1.positionFile=/export/server/flume/position/taildir_7mo_kafka.json
  8. #将所有需要监控的数据源变成一个组
  9. a1.sources.s1.filegroups =f1
  10. #指定了f1是谁:监控目录下所有文件
  11. a1.sources.s1.filegroups.f1 =/export/data/7mo_data/.*
  12. #指定f1采集到的数拟的header中包含一个KV对
  13. a1.sources.s1.headers.f1.type = 7mo
  14. a1.sources.s1.fileHeader = true
复制代码
此中 /export/data/7mo_data 以及 /export/server/flume/position/ 目录自己先创建。
我们的sink是kafka,因此Kafka sink 见官网
Flume 1.11.0 User Guide — Apache Flume
具体如下(channels 也一起附上了):
  1. #define c1
  2. a1.channels.c1.type = memory
  3. a1.channels.c1.capacity=10000
  4. a1.channels.c1.transactionCapacity=1000
  5. #define k1
  6. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  7. a1.sinks.k1.kafka.topic=7MO-MSG
  8. a1.sinks.k1.kafka.bootstrap.servers= node2:9092
  9. a1.sinks.k1.kafka.flumeBatchsize =10
  10. a1.sinks.k1.kafka.producer.acks = 1
  11. a1.sinks.k1.kafka.producer.linger.ms=100
  12. #bind
  13. a1.sources.s1.channels=c1
  14. a1.sinks.k1.channel=c1
复制代码
留意topic, servers的设置,根据现实环境改。
2. Kafka

留意是创建topic即可,我在node2上创建(记得先去一键启动zookeeper和kafka)
  1. # 先进入kafka目录
  2. cd /export/server/kafka
  3. # 创建topic
  4. bin/kafka-topics.sh --create --topic 7MO-MSG \
  5. --partitions 3 --replication-factor 2  \
  6. --bootstrap-server node2:9092
复制代码

3. 生产数据

采用momo聊天消息。示例数据如下,采用  ‘\t’ 分开各个字段,模拟数据可在文章顶头处下载。

直接上代码,每隔1秒钟产生一条数据,数据来自data1.tsv文件。
  1. import csv
  2. import argparse
  3. import time
  4. parser = argparse.ArgumentParser(description='momo information')
  5. parser.add_argument('--src', dest='srcFile', type=str, help='sourceFile')
  6. parser.add_argument('--dst', dest='dstFile', type=str, help='destinationFile')
  7. parser.add_argument('--sec', dest='second', default=1, type=str, help='the time of generate one data  ')
  8. args = parser.parse_args()
  9. def writeToDst(row):
  10.     with open(args.dstFile, 'a') as fw:
  11.         line = '\t'.join(row) + '\n'
  12.         fw.write(line)
  13. def main():
  14.     with open(args.srcFile, 'r', encoding='utf-8') as fr:
  15.         reader = csv.reader(fr,delimiter='\t')
  16.         for row in reader:
  17.             writeToDst(row)
  18.             time.sleep(args.second)
  19. if __name__=="__main__":
  20.     main()
复制代码
src 是data1.tsv文件路径,dst是flume要监控的文件目录下的具体文件路径,sec是多少秒采集一条记载。
4. StructuredStreaming


  1. #!/root/anaconda3/envs/pyspark_env/bin/ python3
  2. from pyspark.sql import SparkSession, DataFrame
  3. import os
  4. from pyspark.sql.functions import col, split, to_timestamp
  5. from pyspark.sql.types import StructType, StringType, StructField
  6. import argparse
  7. parser = argparse.ArgumentParser(description='argparse mysql')
  8. parser.add_argument('--user', '-us', type=str, default="root", help="user")
  9. parser.add_argument('--password', '-p', type=str, default='123456', help='password')
  10. parser.add_argument('--driver', '-dr', type=str, default='com.mysql.jdbc.Driver')
  11. parser.add_argument('--dbtable', '-db', type=str, default='Momo', help="table name")
  12. parser.add_argument('--url', '-ur', type=str, default='jdbc:mysql://node1:3306/spark?useSSL=false&useUnicode=yes&characterEncoding=utf8', help="url name")
  13. parser.add_argument('--batchsize', '-b', type=int, default=100, help="batch size")
  14. schema_type = StructType([
  15.                StructField("msg_time", StringType()),
  16.                StructField("sender_account", StringType()),
  17.                StructField("receiver_account", StringType()),
  18.                StructField("context", StringType())
  19.               ])
  20. def insert2mysql(df: DataFrame, batch):
  21.     prop = parser.parse_args()
  22.     print("batch:{} is start".format(batch))
  23.     # df.write.jdbc("jdbc:mysql://node1:3306/spark", 'student', 'append', prop)
  24.     df.write.format("jdbc") \
  25.         .mode("append") \
  26.         .options(user=prop.user,
  27.                  password=prop.password,
  28.                  driver=prop.driver,
  29.                  dbtable=prop.dbtable,
  30.                  url=prop.url,
  31.                  batchsize=prop.batchsize
  32.                  ) \
  33.         .save()
  34. def createSparkSession():
  35.     spark = SparkSession \
  36.         .builder \
  37.         .appName("StructuredKafkaWordCount") \
  38.         .config("spark.sql.shuffle.partitions", 2) \
  39.         .getOrCreate()
  40.     # config("spark.sql.shuffle.partitions",2)  可能需要在spark-submit提交才有效,但是检查点需要改
  41.     spark.sparkContext.setLogLevel('WARN')
  42.     # spark.conf.set("spark.sql.shuffle.partitions", 2)
  43.     return spark
  44. def readFromKafka(spark: SparkSession, topicName):
  45.     lines = spark \
  46.         .readStream \
  47.         .format("kafka") \
  48.         .option("kafka.bootstrap.servers", "node2:9092") \
  49.         .option("subscribe", topicName) \
  50.         .option("startingOffsets", "latest") \
  51.         .option("failOnDataLoss", "false") \
  52.         .load()
  53.     return lines
  54. def process(kafkaStream):
  55.     # 获取数据
  56.     messageDF = kafkaStream.selectExpr("CAST(value AS STRING)").select(split(col("value"),"\t").alias("m"))
  57.     messageDF = messageDF.select(col("m")[0].alias("msg_time"),
  58.                                  col("m")[1].alias("sender_account"),
  59.                                  col("m")[9].alias("receiver_account"),
  60.                                  col("m")[19].alias("context"))
  61.     messageDF = messageDF.withColumn("msg_time", to_timestamp(col("msg_time"), "yyyy-MM-dd HH:mm:ss"))
  62.     messageDF = messageDF.selectExpr("CONCAT( CAST(msg_time AS STRING), '%',  sender_account ) AS id" , "*")
  63.     # messageDF = messageDF.withColumn("msg_time", from_unixtime(messageDF.msg_time.cast("bigint"), 'yyyy-MM-dd HH:mm:ss'))
  64.     # messageDF=messageDF.select(from_json(col("value"),schema_type).alias("data")).select("data.msg_time")
  65.    
  66.     return messageDF
  67. # 测试,将数据打印到控制台
  68. def printToConsole(streamDF):
  69.     streamDF \
  70.         .writeStream \
  71.         .outputMode("Append") \
  72.         .format("console") \
  73.         .option("numRows", "10") \
  74.         .option("truncate", "false") \
  75.         .option("checkpointLocation", "file:///tmp/kafka-sink-cp") \
  76.         .trigger(processingTime="8 seconds") \
  77.         .start()
  78. if __name__ == "__main__":
  79.     # 构建sparksession实例对象
  80.     spark = createSparkSession()
  81.     # 从kafka实时消费数据
  82.     topicName = "7MO-MSG"
  83.     kafkaStream = readFromKafka(spark, topicName)
  84.     # 提取数据,转换数据类型
  85.     stream = process(kafkaStream)
  86.     # 打印输出到控制台
  87.     # printToConsole(stream)
  88.     # 保存数据至mysql
  89.     stream.writeStream \
  90.         .outputMode("append") \
  91.         .foreachBatch(insert2mysql) \
  92.         .trigger(processingTime="8 seconds") \
  93.         .start()
  94.     spark.streams.awaitAnyTermination()
复制代码
自行看即可,要留意,pyspark中structured Streaming的流转成rdd大概dataset有问题(scala的简单的多,没办法学校紧张走的是python),需要直接在流上举行各种转换的操纵方可。
5. Mysql

创建好数据库和表就行了。我的是mysql5,紧张数据库和表是utf8格式的,要不中文显示会出问题。
  1. create table Momo(
  2. id  VARCHAR(30),
  3. msg_time TIMESTAMP,
  4. sender_account VARCHAR(20),
  5. receiver_account VARCHAR(20),
  6. context VARCHAR(500)
  7. )
复制代码
-----------------------------------------------------------------------------------------
6.  启动步伐


       我在node3启动
  1. /export/server/flume/bin/flume-ng agent -n a1 -c /export/server/flume/conf/ -f /export/server/flume/conf/7mo_mem_kafka.properties  -Dflume.root.logger=INFO,console
复制代码

       我在node2启动
  1. /export/server/kafka/bin/kafka-console-consumer.sh --topic 7MO-MSG --bootstrap-server node2:9092
复制代码

      我在node1启动
  1. spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2 ch07_flume_kafka_spark_consumer.py
复制代码

     数据库在node1了,我在windows用navicat查看数据。

能整个流程跑通,还是挺不错的。后期就可以在此基础上,继承增长各种功能了。加油吧,少年!
整体参考自B站视频:黑马步伐员大数据数据湖架构Hudi视频教程。紧张代码逻辑改为pyspark了而已。02--实战案例技术架构--Flume+Kafka+StructuredStreaming+Hudi+Hive+MySQL_哔哩哔哩_bilibili

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4