在主节点使用Flume采集及时数据生成器10050端口的socket数据,将数据存入到Kafka的Topic中(Topic名称为order,分区数为4),使用Kafka自带的消费者消费order(Topic)中的数据,将前2条数据的效果截图粘贴至客户端桌面【Release任务D提交效果.docx】中对应的任务序号下;
- 新建getstreamingdata10050.conf文件,编写flume配置实现监听10050端口,将收到的数据发送的kafka的order主题,代码如下:
- # 给这个代理上的组件命名
- # 定义一个名为 r1 的数据源
- a1.sources = r1
- # 定义一个名为 k1 的数据汇
- a1.sinks = k1
- # 定义一个名为 c1 的通道
- a1.channels = c1
- # 描述/配置数据源
- # 数据源的类型为 netcat
- a1.sources.r1.type = netcat
- # 数据源绑定到本地主机
- a1.sources.r1.bind = localhost
- # 数据源监听端口为 10050
- a1.sources.r1.port = 10050
- # 描述 KafkaSink 数据汇
- # 数据汇的类型为 KafkaSink
- a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
- # Kafka 服务器的地址
- a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
- # Kafka 主题的名称为 order
- a1.sinks.k1.kafka.topic = order
- # 使用一个在内存中缓冲事件的通道
- # 通道的类型为 memory
- a1.channels.c1.type = memory
- # 通道的容量为 1000
- a1.channels.c1.capacity = 1000
- # 通道的事务容量为 100
- a1.channels.c1.transactionCapacity = 100
- # 将数据源和数据汇绑定到通道
- # 将数据源 r1 绑定到通道 c1
- a1.sources.r1.channels = c1
- # 将数据汇 k1 绑定到通道 c1
- a1.sinks.k1.channel = c1
- 新建order_data_generator.sh文件,编写脚本实现将MySQL中的order_info表的数据导出到csv文件,然后将csv文件中的内容通过socket发送到10050端口。代码如下:
- mysql -uroot1 -p123456 -e"
- SELECT * INTO OUTFILE '/var/lib/mysql-files/order_info.csv'
- FIELDS TERMINATED BY ','
- ENCLOSED BY ''
- LINES TERMINATED BY 'n'
- FROM shtd_store.order_info
- "
- sudo apt-get update
- sudo apt-get install telnet -y
- cat /var/lib/mysql-files/order_info.csv |nc localhost 10050
- --1.7运行数据生成脚本
- 在终端执行如下命令,运行数据生成脚本
- bash order_data_generator.sh
- --1.8.查看结果数据
- 在终端执行如下命令,使用Kafka自带的消费者消费order(Topic)中将前2条数据。
- /opt/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --
复制代码 采用多路复用模式,Flume吸收数据注入kafka 的同时,将数据备份到HDFS目录/user/test/flumebackup下,将查看备份目录下的第一个文件的前2条数据的下令与效果截图粘贴至客户端桌面【Release任务D提交效果.docx】中对应的任务序号下。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |