大数据国赛第1套任务D-子任务一及时数据采集

打印 上一主题 下一主题

主题 826|帖子 826|积分 2478

在主节点使用Flume采集及时数据生成器10050端口的socket数据,将数据存入到Kafka的Topic中(Topic名称为order,分区数为4),使用Kafka自带的消费者消费order(Topic)中的数据,将前2条数据的效果截图粘贴至客户端桌面【Release任务D提交效果.docx】中对应的任务序号下;
  1. 新建getstreamingdata10050.conf文件,编写flume配置实现监听10050端口,将收到的数据发送的kafka的order主题,代码如下:
  2. # 给这个代理上的组件命名
  3. # 定义一个名为 r1 的数据源
  4. a1.sources = r1
  5. # 定义一个名为 k1 的数据汇
  6. a1.sinks = k1
  7. # 定义一个名为 c1 的通道
  8. a1.channels = c1
  9. # 描述/配置数据源
  10. # 数据源的类型为 netcat
  11. a1.sources.r1.type = netcat
  12. # 数据源绑定到本地主机
  13. a1.sources.r1.bind = localhost
  14. # 数据源监听端口为 10050
  15. a1.sources.r1.port = 10050
  16. # 描述 KafkaSink 数据汇
  17. # 数据汇的类型为 KafkaSink
  18. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  19. # Kafka 服务器的地址
  20. a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
  21. # Kafka 主题的名称为 order
  22. a1.sinks.k1.kafka.topic = order
  23. # 使用一个在内存中缓冲事件的通道
  24. # 通道的类型为 memory
  25. a1.channels.c1.type = memory
  26. # 通道的容量为 1000
  27. a1.channels.c1.capacity = 1000
  28. # 通道的事务容量为 100
  29. a1.channels.c1.transactionCapacity = 100
  30. # 将数据源和数据汇绑定到通道
  31. # 将数据源 r1 绑定到通道 c1
  32. a1.sources.r1.channels = c1
  33. # 将数据汇 k1 绑定到通道 c1
  34. a1.sinks.k1.channel = c1
  35. 新建order_data_generator.sh文件,编写脚本实现将MySQL中的order_info表的数据导出到csv文件,然后将csv文件中的内容通过socket发送到10050端口。代码如下:
  36. mysql -uroot1 -p123456 -e"
  37. SELECT * INTO OUTFILE '/var/lib/mysql-files/order_info.csv'
  38. FIELDS TERMINATED BY ','
  39. ENCLOSED BY ''
  40. LINES TERMINATED BY 'n'
  41. FROM shtd_store.order_info
  42. "
  43. sudo apt-get update
  44. sudo apt-get install telnet -y
  45. cat /var/lib/mysql-files/order_info.csv |nc localhost 10050
  46. --1.7运行数据生成脚本
  47. 在终端执行如下命令,运行数据生成脚本
  48. bash order_data_generator.sh
  49. --1.8.查看结果数据
  50. 在终端执行如下命令,使用Kafka自带的消费者消费order(Topic)中将前2条数据。
  51. /opt/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --
复制代码
   采用多路复用模式,Flume吸收数据注入kafka 的同时,将数据备份到HDFS目录/user/test/flumebackup下,将查看备份目录下的第一个文件的前2条数据的下令与效果截图粘贴至客户端桌面【Release任务D提交效果.docx】中对应的任务序号下。
  1. 在终端执行如下命令,启动Hadoop、Zookeeper、Kafka环境
  2. /opt/hadoop-3.2.4/sbin/start-all.sh
  3. zkServer.sh start
  4. /opt/kafka_2.12-2.4.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-2.4.1/config/server.properties
  5. --2.3创建order1主题
  6. 在终端执行如下命令,利用kafka的命令行工具创建order1主题并设置为4个分区
  7. /opt/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic order1
  8. 创建后查看一下主题是否创建成功
  9. /opt/kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --list
  10. --2.4编写flume配置
  11. 新建mutisinks.conf文件,编写flume配置实现监听10051端口,将收到的数据发送的kafka的order1主题和将数据备份到hdfs的/user/test/flumebackup/目录,代码如下:
  12. # 给这个代理上的组件命名
  13. # 定义一个名为 r1 的数据源
  14. a1.sources = r1
  15. # 定义两个数据汇,分别为 k1 和 k2
  16. a1.sinks = k1 k2
  17. # 定义两个通道,分别为 c1 和 c2
  18. a1.channels = c1 c2
  19. # 描述/配置数据源
  20. # 数据源的类型为 netcat
  21. a1.sources.r1.type = netcat
  22. # 数据源绑定到本地主机
  23. a1.sources.r1.bind = localhost
  24. # 数据源监听端口为 10051
  25. a1.sources.r1.port = 10051
  26. # 描述 HDFS 数据汇
  27. # 数据汇的类型为 HDFS
  28. a1.sinks.k1.type = hdfs
  29. # HDFS 存储路径
  30. a1.sinks.k1.hdfs.path = /user/test/flumebackup/
  31. # HDFS 文件前缀
  32. a1.sinks.k1.hdfs.filePrefix = log-
  33. # 描述 KafkaSink 数据汇
  34. # 数据汇的类型为 KafkaSink
  35. a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
  36. # Kafka 服务器的地址
  37. a1.sinks.k2.kafka.bootstrap.servers = localhost:9092
  38. # Kafka 主题的名称为 order1
  39. a1.sinks.k2.kafka.topic = order1
  40. # 文件路径下采用了%Y,系统会以日期创建文件夹
  41. # filePrefix 文件的前缀
  42. # 从临时文件变正式文件时间 s
  43. a1.sinks.k1.hdfs.rollInterval = 10
  44. # 文件大小
  45. #rollSize 定义了在多大的数据量下触发一个滚动(Roll)操作。
  46. #67108864 表示 64 MB,即当写入的数据达到 64 MB 时,会触发一个新的文件滚动,生成一个新的 HDFS 文件
  47. a1.sinks.k1.hdfs.rollSize = 67108864
  48. #rollCount 定义了在多少事件数量下触发一个滚动操作。
  49. #0 表示不基于事件数量触发滚动,仅基于数据量 (rollSize) 触发滚动。
  50. a1.sinks.k1.hdfs.rollCount = 0
  51. #useLocalTimeStamp 决定是否在 HDFS 文件名中使用本地时间戳
  52. #如果设置为 true,则文件名可能包含本地写入时的时间戳,以提供时间戳信息。
  53. a1.sinks.k1.hdfs.useLocalTimeStamp = true
  54. #fileType 定义了写入 HDFS 的文件类型。
  55. #在这里,设置为 DataStream 表示使用流式数据(DataStream)的方式写入文件。这意味着文件会不断地追加新的数据而不会清空文件。
  56. a1.sinks.k1.hdfs.fileType = DataStream
  57. # rollSize这里是以B为单位,这里是64MB
  58. # fileType监听方式是流
  59. # 使用内存中缓冲事件的通道
  60. # 通道 c2 的类型为 memory
  61. a1.channels.c2.type = memory
  62. # 通道 c2 的容量为 1000
  63. a1.channels.c2.capacity = 1000
  64. # 通道 c2 的事务容量为 100
  65. a1.channels.c2.transactionCapacity = 100
  66. # 通道 c1 的类型为 memory
  67. a1.channels.c1.type = memory
  68. # 通道 c1 的容量为 1000
  69. a1.channels.c1.capacity = 1000
  70. # 通道 c1 的事务容量为 100
  71. a1.channels.c1.transactionCapacity = 100
  72. # 将数据源和数据汇绑定到通道
  73. # 将数据源 r1 绑定到通道 c1 和 c2
  74. a1.sources.r1.channels = c1 c2
  75. # 将数据汇 k1 绑定到通道 c1
  76. a1.sinks.k1.channel = c1
  77. # 将数据汇 k2 绑定到通道 c2
  78. a1.sinks.k2.channel = c2
  79. --2.5启动flume
  80. 在终端执行如下命令,使用编写的配置文件启动flume监听10051端口,并把event数据写入到kafka中和HDFS中。
  81. /opt/apache-flume-1.9.0-bin/bin/flume-ng  agent -c /opt/apache-flume-1.9.0-bin/conf/ -n a1 -f /rgsoft/Desktop/Study/task/mutisinks.conf -Dflume.root.logger=INFO,console
  82. --2.6编写数据生成脚本
  83. 新建order_data_generator2.sh文件,编写脚本实现将MySQL中的order_info表的数据导出到csv文件,然后将csv文件中的内容通过socket发送到10051端口。代码如下:
  84. mysql -uroot1 -p123456 -e"
  85. SELECT * INTO OUTFILE '/var/lib/mysql-files/order_info.csv'
  86. FIELDS TERMINATED BY ','
  87. ENCLOSED BY ''
  88. LINES TERMINATED BY 'n'
  89. FROM shtd_store.order_info
  90. "
  91. sudo apt-get update
  92. sudo apt-get install telnet -y
  93. cat /var/lib/mysql-files/order_info.csv |nc localhost 10051
  94. --2.7运行数据生成脚本
  95. 在终端执行如下命令,运行数据生成脚本
  96. bash order_data_generator2.sh
  97. --2.8查看结果数据
  98. 在终端执行如下命令,使用Kafka自带的消费者消费order(Topic)中将前2条数据。
  99. /opt/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --to
复制代码
 
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王海鱼

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表