ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Flume+Kafka+StructuredStreaming(pyspark)+Mysql分布式采集与微批处置惩罚
[打印本页]
作者:
农妇山泉一亩田
时间:
2024-12-4 18:50
标题:
Flume+Kafka+StructuredStreaming(pyspark)+Mysql分布式采集与微批处置惩罚
下面根据数据流向逐一介绍 Flume -> Kafka -> StructuredStreaming -> Mysql
目录
下面根据数据流向逐一介绍 Flume -> Kafka -> StructuredStreaming -> Mysql
1. Flume
2. Kafka
3. 生产数据
4. StructuredStreaming
5. Mysql
6. 启动步伐
1. Flume
Watch the specified files, and tail them in nearly real-time once detected new lines appended to the each files. If the new lines are being written, this source will retry reading them in wait for the completion of the write.
为了监测某个目录下,所有文件是否有新的行产生,因此,我们source 用 Taildir Source 见官网
Flume 1.11.0 User Guide — Apache Flume
我在node3上构建了Flume。
a1.sources =s1
a1.channels=c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile=/export/server/flume/position/taildir_7mo_kafka.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups =f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 =/export/data/7mo_data/.*
#指定f1采集到的数拟的header中包含一个KV对
a1.sources.s1.headers.f1.type = 7mo
a1.sources.s1.fileHeader = true
复制代码
此中 /export/data/7mo_data 以及 /export/server/flume/position/ 目录自己先创建。
我们的sink是kafka,因此Kafka sink 见官网
Flume 1.11.0 User Guide — Apache Flume
具体如下(channels 也一起附上了):
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=1000
#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=7MO-MSG
a1.sinks.k1.kafka.bootstrap.servers= node2:9092
a1.sinks.k1.kafka.flumeBatchsize =10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms=100
#bind
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
复制代码
留意topic, servers的设置,根据现实环境改。
2. Kafka
留意是创建topic即可,我在node2上创建(记得先去一键启动zookeeper和kafka)
# 先进入kafka目录
cd /export/server/kafka
# 创建topic
bin/kafka-topics.sh --create --topic 7MO-MSG \
--partitions 3 --replication-factor 2 \
--bootstrap-server node2:9092
复制代码
3. 生产数据
采用momo聊天消息。示例数据如下,采用 ‘\t’ 分开各个字段,模拟数据可在文章顶头处下载。
直接上代码,每隔1秒钟产生一条数据,数据来自data1.tsv文件。
import csv
import argparse
import time
parser = argparse.ArgumentParser(description='momo information')
parser.add_argument('--src', dest='srcFile', type=str, help='sourceFile')
parser.add_argument('--dst', dest='dstFile', type=str, help='destinationFile')
parser.add_argument('--sec', dest='second', default=1, type=str, help='the time of generate one data ')
args = parser.parse_args()
def writeToDst(row):
with open(args.dstFile, 'a') as fw:
line = '\t'.join(row) + '\n'
fw.write(line)
def main():
with open(args.srcFile, 'r', encoding='utf-8') as fr:
reader = csv.reader(fr,delimiter='\t')
for row in reader:
writeToDst(row)
time.sleep(args.second)
if __name__=="__main__":
main()
复制代码
src 是data1.tsv文件路径,dst是flume要监控的文件目录下的具体文件路径,sec是多少秒采集一条记载。
4. StructuredStreaming
#!/root/anaconda3/envs/pyspark_env/bin/ python3
from pyspark.sql import SparkSession, DataFrame
import os
from pyspark.sql.functions import col, split, to_timestamp
from pyspark.sql.types import StructType, StringType, StructField
import argparse
parser = argparse.ArgumentParser(description='argparse mysql')
parser.add_argument('--user', '-us', type=str, default="root", help="user")
parser.add_argument('--password', '-p', type=str, default='123456', help='password')
parser.add_argument('--driver', '-dr', type=str, default='com.mysql.jdbc.Driver')
parser.add_argument('--dbtable', '-db', type=str, default='Momo', help="table name")
parser.add_argument('--url', '-ur', type=str, default='jdbc:mysql://node1:3306/spark?useSSL=false&useUnicode=yes&characterEncoding=utf8', help="url name")
parser.add_argument('--batchsize', '-b', type=int, default=100, help="batch size")
schema_type = StructType([
StructField("msg_time", StringType()),
StructField("sender_account", StringType()),
StructField("receiver_account", StringType()),
StructField("context", StringType())
])
def insert2mysql(df: DataFrame, batch):
prop = parser.parse_args()
print("batch:{} is start".format(batch))
# df.write.jdbc("jdbc:mysql://node1:3306/spark", 'student', 'append', prop)
df.write.format("jdbc") \
.mode("append") \
.options(user=prop.user,
password=prop.password,
driver=prop.driver,
dbtable=prop.dbtable,
url=prop.url,
batchsize=prop.batchsize
) \
.save()
def createSparkSession():
spark = SparkSession \
.builder \
.appName("StructuredKafkaWordCount") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()
# config("spark.sql.shuffle.partitions",2) 可能需要在spark-submit提交才有效,但是检查点需要改
spark.sparkContext.setLogLevel('WARN')
# spark.conf.set("spark.sql.shuffle.partitions", 2)
return spark
def readFromKafka(spark: SparkSession, topicName):
lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "node2:9092") \
.option("subscribe", topicName) \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load()
return lines
def process(kafkaStream):
# 获取数据
messageDF = kafkaStream.selectExpr("CAST(value AS STRING)").select(split(col("value"),"\t").alias("m"))
messageDF = messageDF.select(col("m")[0].alias("msg_time"),
col("m")[1].alias("sender_account"),
col("m")[9].alias("receiver_account"),
col("m")[19].alias("context"))
messageDF = messageDF.withColumn("msg_time", to_timestamp(col("msg_time"), "yyyy-MM-dd HH:mm:ss"))
messageDF = messageDF.selectExpr("CONCAT( CAST(msg_time AS STRING), '%', sender_account ) AS id" , "*")
# messageDF = messageDF.withColumn("msg_time", from_unixtime(messageDF.msg_time.cast("bigint"), 'yyyy-MM-dd HH:mm:ss'))
# messageDF=messageDF.select(from_json(col("value"),schema_type).alias("data")).select("data.msg_time")
return messageDF
# 测试,将数据打印到控制台
def printToConsole(streamDF):
streamDF \
.writeStream \
.outputMode("Append") \
.format("console") \
.option("numRows", "10") \
.option("truncate", "false") \
.option("checkpointLocation", "file:///tmp/kafka-sink-cp") \
.trigger(processingTime="8 seconds") \
.start()
if __name__ == "__main__":
# 构建sparksession实例对象
spark = createSparkSession()
# 从kafka实时消费数据
topicName = "7MO-MSG"
kafkaStream = readFromKafka(spark, topicName)
# 提取数据,转换数据类型
stream = process(kafkaStream)
# 打印输出到控制台
# printToConsole(stream)
# 保存数据至mysql
stream.writeStream \
.outputMode("append") \
.foreachBatch(insert2mysql) \
.trigger(processingTime="8 seconds") \
.start()
spark.streams.awaitAnyTermination()
复制代码
自行看即可,要留意,pyspark中structured Streaming的流转成rdd大概dataset有问题(scala的简单的多,没办法学校紧张走的是python),需要直接在流上举行各种转换的操纵方可。
5. Mysql
创建好数据库和表就行了。我的是mysql5,紧张数据库和表是utf8格式的,要不中文显示会出问题。
create table Momo(
id VARCHAR(30),
msg_time TIMESTAMP,
sender_account VARCHAR(20),
receiver_account VARCHAR(20),
context VARCHAR(500)
)
复制代码
-----------------------------------------------------------------------------------------
6. 启动步伐
启动好zookeeper和kafka
启动flmue,
我在node3启动
/export/server/flume/bin/flume-ng agent -n a1 -c /export/server/flume/conf/ -f /export/server/flume/conf/7mo_mem_kafka.properties -Dflume.root.logger=INFO,console
复制代码
启动kafka消费者
我在node2启动
/export/server/kafka/bin/kafka-console-consumer.sh --topic 7MO-MSG --bootstrap-server node2:9092
复制代码
启动spark步伐
我在node1启动
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2 ch07_flume_kafka_spark_consumer.py
复制代码
查看数据库内容
数据库在node1了,我在windows用navicat查看数据。
能整个流程跑通,还是挺不错的。后期就可以在此基础上,继承增长各种功能了。加油吧,少年!
整体参考自B站视频:黑马步伐员大数据数据湖架构Hudi视频教程。紧张代码逻辑改为pyspark了而已。02--实战案例技术架构--Flume+Kafka+StructuredStreaming+Hudi+Hive+MySQL_哔哩哔哩_bilibili
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4