Ingesting Kafka data into Hive with Flume


(Screenshot: the final Flume-collected data as it appears in Hive.)
ZooKeeper version: zookeeper-3.4.6
Kafka version: kafka_2.11-2.4.1
Flume version: apache-flume-1.9.0-bin
Hive version: apache-hive-3.1.2-bin.tar.gz
Software packages shared via Baidu Netdisk:
Link: https://pan.baidu.com/s/13z96NvXWdtUotCzrLWFCTA?pwd=huyc
Extraction code: huyc
1. Configure Hive
The table must be bucketed and transactional, which requires configuring Hive first.
Edit hive-site.xml:
<property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
    <description>
      Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive
      transactions, which also requires appropriate settings for hive.compactor.initiator.on,
      hive.compactor.worker.threads, hive.support.concurrency (true),
      and hive.exec.dynamic.partition.mode (nonstrict).
      The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides
      no transactions.
    </description>
</property>
<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
    <description>
      Whether Hive supports concurrency control or not.
      A ZooKeeper instance must be up and running when using zookeeper Hive lock manager
    </description>
</property>
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
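The description above also calls for hive.compactor.initiator.on, hive.compactor.worker.threads, and hive.exec.dynamic.partition.mode. A minimal sketch of those settings (a single worker thread is an assumption; size it for your cluster):
<property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
</property>
<property>
    <name>hive.compactor.worker.threads</name>
    <value>1</value>
</property>
<property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
</property>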
2. Start Hive
cd /opt/server/hive/bin
hiveservice.sh start
./hiveserver2
hive
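The Flume Hive sink connects to the metastore at thrift://localhost:9083 (hive.metastore.uris above), so the metastore service must be up as well. A minimal sketch in case the hiveservice.sh helper above does not already start it (log paths are assumptions):
nohup hive --service metastore > /tmp/metastore.log 2>&1 &
nohup hive --service hiveserver2 > /tmp/hiveserver2.log 2>&1 &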
3. Create the database and table in Hive
create database jobdatabase;
show databases;
use jobdatabase;
CREATE TABLE jobdatabase.jobtable (
       name STRING,
       money INT,
       company STRING,
       address STRING,
       type STRING
    )
    CLUSTERED BY (name) INTO 10 BUCKETS
    STORED AS ORC
    TBLPROPERTIES ('transactional'='true');
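The same statements can also be run through HiveServer2 with Beeline instead of the hive CLI; a sketch, assuming the default port 10000 and no authentication:
beeline -u jdbc:hive2://localhost:10000 -e "show databases;"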
4. Copy `/opt/server/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.2.jar` into Flume's lib directory.
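A sketch of that copy, assuming Flume is installed at /opt/flume (the path used by the flume-ng command in step 5); if the sink later fails with a ClassNotFoundException, related Hive client jars may need to be copied the same way:
cp /opt/server/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.2.jar /opt/flume/lib/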

Create the Flume configuration file (named kafka2hive.conf to match step 5):

a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=hive_sink
# Kafka source configuration
# (Flume 1.9 uses kafka.topics; the old 'topic' key is deprecated)
a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.batchSize=10
a.sources.source_from_kafka.kafka.bootstrap.servers=localhost:9092
a.sources.source_from_kafka.kafka.topics=test
# Hive sink configuration
# (database, table, and field names must match the table created in step 3)
a.sinks.hive_sink.type=hive
a.sinks.hive_sink.hive.metastore=thrift://localhost:9083
a.sinks.hive_sink.hive.database=jobdatabase
a.sinks.hive_sink.hive.table=jobtable
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
a.sinks.hive_sink.batchSize=10
a.sinks.hive_sink.serializer=JSON
a.sinks.hive_sink.serializer.fieldnames=name,money,company,address,type
# Channel configuration
a.channels.mem_channel.type=memory
a.channels.mem_channel.capacity=1000
a.channels.mem_channel.transactionCapacity=100
# Wiring: bind the source and sink to the channel
a.sources.source_from_kafka.channels=mem_channel
a.sinks.hive_sink.channel=mem_channel
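Note that a memory channel loses buffered events if the agent crashes. If durability matters more than throughput, a file channel is a drop-in replacement; a minimal sketch, assuming /opt/flume/data is writable:
a.channels.mem_channel.type=file
a.channels.mem_channel.checkpointDir=/opt/flume/data/checkpoint
a.channels.mem_channel.dataDirs=/opt/flume/data/data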
5. Start Flume
flume-ng agent -n a -c /opt/flume/conf -f /opt/flume/conf/kafka2hive.conf -Dflume.root.logger=INFO,console
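To keep the agent alive after the terminal closes, it can be launched in the background instead; a sketch (the log path is an assumption):
nohup flume-ng agent -n a -c /opt/flume/conf -f /opt/flume/conf/kafka2hive.conf > /tmp/flume.log 2>&1 &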
6. Start a Kafka console producer
kafka-console-producer.sh --broker-list localhost:9092 --topic test
7. Send JSON-formatted records to the Kafka topic:
{"name":"大数据平台技术专家-中台方向","money":24000,"company":"杭州风扬网络科技有限公司","address":"成都","type":"民营"}
{"name":"初级大数据开发工程师","money":6500,"company":"成都匠翎信息技术有限公司","address":"成都·高新区","type":"民营"}
{"name":"大数据研发岗(J47618)","money":22500,"company":"京东方科技集团股份有限公司","address":"成都·郫都区","type":"已上市"}
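Rather than pasting records interactively, the samples can be saved one per line to a file and piped into the producer; a sketch, assuming a file named jobs.json:
kafka-console-producer.sh --broker-list localhost:9092 --topic test < jobs.json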
8. Query the Hive table:
select * from jobdatabase.jobtable;
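If the pipeline works end to end, the three records above should appear. A quick aggregate sanity check in plain HiveQL against the same table:
select type, count(*) as jobs, avg(money) as avg_money from jobdatabase.jobtable group by type;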

That's it. Take your time and work through each step carefully.

 
