商道如狼道 发表于 2024-8-19 07:27:49

Spark Steaming有状态转换实行

创建一个streaming目录
mkdir streaming 一、运行网络版的WordCount
1. 连接虚拟机后利用sudo打开hosts后加入赤色方框内语句并生存:
   sudo vim /etc/hosts
  
https://i-blog.csdnimg.cn/direct/a7bc5e0454a9415a983d38ff59b37566.png
Netcat是一个用于TCP/UDP连接和监听的Linux工具, 重要用于网络传输及调试领域。先下载:
sudo apt-get update sudo apt-get -y install netcat-traditional 2. 启动 NetCat 服务端,并在1234端口监听
nc -lk  1234    #在lsn下面输入nc –l –p 1234

[*]利用xshell 打开一个新的选项卡,连接虚拟机。启动NetCat客户端,并连接Netcat服务端
nc localhost  1234
注意:假如客户端和服务端不在同一台机器,localhost 可以换成现实IP。

[*]在服务端输入以下字符串,并按回车,可以在客户端收到消息,并打印出来。这里注意更换学号为你个人学号。
hello 你的学号

[*]在客户端输入字符串,并按回车,可以在服务端收到消息,并打印出来。这里注意更换学号为你个人学号。
你好  你的学号

[*]在 NetCat 客户端的选项卡利用quit停止客户端历程。
[*]利用有状态操作updateStateByKey实现词频统计。
在streaming目录下新建一个stateful目录,用于生存持久化的数据;接着编写独立的NetWordCountStateful.py代码
1
from pyspark import SparkContext
2
from pyspark.streaming import StreamingContext
3
sc = SparkContext("local","NetworkWordCountStateful")
4
ssc = StreamingContext(sc,10)
5

6
ssc.checkpoint("file:///home/ubuntu/streaming/stateful")
7
def updateFunction(newValues, runningCount):
8
    if runningCount is None:
9
        runningCount = 0
10
    return sum(newValues, runningCount)
11
lines = ssc.socketTextStream('localhost', 1234)
12
running_counts = lines.flatMap(lambda line:line.split(' ')).map(lambda x:(x,1)).updateStateByKey(updateFunction)
13
running_counts.pprint()
14
ssc.start()
15
ssc.awaitTermination()
注意:有状态转换必要举行设置检查点。

新建一个终端,开启服务端

nc -l -p 1234
再建一个“流计算”终端,运行NetWordCountStateful.py代码:

cd $SPARK_HOME/bin

spark-submit /路径/NetWordCountStateful.py

[*]在 NetCat 服务端输入以下字符串,并按回车,观察Streaming WordCount的输出,并截图。
You jump I jump 1234

[*]在客户端查察统计效果:
https://i-blog.csdnimg.cn/direct/2dce3f2453524f7787d20547addd89b3.png
再次在服务端口输入以下字符串:
You and I jump 1234
回车后再次观察Streaming WordCount的输出,是否是累加后的效果。
可以用quit或ctrl+c停掉客户端,利用xshell的回滚来查察效果,由于回滚较快,以是在运行状态下查察效果截图较困难。

二、利用滑动窗口实现WordCount
1. 创建文件流监听目录:
mkdir logfile
cd logfile

[*]在streaming目录下新建一个code文件夹,用于持久化数据的存储;然后新开一个终端,输入“pyspark”进入PySpark交互式情况后,输入以下代码:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc,10)
ssc.checkpoint("file:///home/ubuntu/streaming/code")
lines = ssc.textFileStream("file:///home/ubuntu/streaming/logfile")
counts = lines.flatMap(lambda x:x.split(' ')).map(lambda x:(x,1)).reduceByKeyAndWindow(lambda x, y:x+y, lambda x,y:x-y, 30, 10)
counts.pprint()
ssc.start()
ssc.awaitTermination()
输入ssc.start()后,程序就开始自动进入循环监听状态,如下图所示:
https://i-blog.csdnimg.cn/direct/756676b29eb94e12929dcefa3c010ae4.png
打开一个新的shell窗口,切换到logfile目录下,创建一个log.txt文档生存,再创建一个log_new.txt文档生存,里面输入一些随意的单词,并用空格间隔开。查察监听页面,可以看到打印效果,例如下图所示:
https://i-blog.csdnimg.cn/direct/62a50731fe7a4a8cbf9b0460c973a925.png
注:log.txt输入为“a b c a b c d”,log_new.txt输入为“a b d e f a b e f”。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Spark Steaming有状态转换实行