This is the final part: landing the data collected by Flume into Hive.
ZooKeeper version: zookeeper-3.4.6
Kafka version: kafka_2.11-2.4.1
Flume version: apache-flume-1.9.0-bin
Hive version: apache-hive-3.1.2-bin.tar.gz
Software packages shared via Baidu Netdisk:
Link: https://pan.baidu.com/s/13z96NvXWdtUotCzrLWFCTA?pwd=huyc
Extraction code: huyc
1. Hive configuration
The table must be bucketed and transactional, so Hive needs to be configured accordingly.
Edit the hive-site.xml file:
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
  <description>
    Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive
    transactions, which also requires appropriate settings for hive.compactor.initiator.on,
    hive.compactor.worker.threads, hive.support.concurrency (true),
    and hive.exec.dynamic.partition.mode (nonstrict).
    The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides
    no transactions.
  </description>
</property>
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
  <description>
    Whether Hive supports concurrency control or not.
    A ZooKeeper instance must be up and running when using zookeeper Hive lock manager
  </description>
</property>
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://localhost:9083</value>
  <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
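The description above also points at hive.compactor.initiator.on, hive.compactor.worker.threads, and hive.exec.dynamic.partition.mode. A minimal sketch of those additional properties, following the hints in the description (the worker-thread count here is illustrative):

<!-- run the compaction initiator on this metastore instance -->
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<!-- at least one worker thread is needed for compactions to run -->
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
<!-- transactions require nonstrict dynamic partition mode -->
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>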
2. Start Hive
cd /opt/server/hive/bin
hiveservice.sh start   # helper script from the author's setup (not part of the standard Hive distribution)
./hiveserver2          # start HiveServer2
hive                   # open the Hive CLI
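Before wiring up Flume, it may be worth confirming that the metastore is actually listening on port 9083 (the URI configured above). A quick check on a Linux host, purely illustrative:

jps                          # Hive services started via the scripts show up as RunJar processes
netstat -tlnp | grep 9083    # the metastore thrift port from hive-site.xml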
3. Create the database and table in Hive
create database jobdatabase;
show databases;
use jobdatabase;
CREATE TABLE jobdatabase.jobtable (
  name STRING,
  money INT,
  company STRING,
  address STRING,
  type STRING
)
CLUSTERED BY (name) INTO 10 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
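To verify that the table really came out bucketed and transactional, standard HiveQL can inspect it; the Table Parameters section of the output should include transactional=true:

-- inspect the table's storage layout and properties
DESCRIBE FORMATTED jobdatabase.jobtable;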
4. Copy `/opt/server/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.2.jar` into Flume's lib directory.
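For example, assuming Flume is installed under /opt/flume as the paths in step 5 suggest:

cp /opt/server/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.2.jar /opt/flume/lib/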
Create the configuration file (kafka2hive.conf, the file referenced in step 5):
a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=hive_sink
# Kafka source configuration
a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.batchSize=10
a.sources.source_from_kafka.kafka.bootstrap.servers=localhost:9092
a.sources.source_from_kafka.kafka.topics=test
# Hive sink configuration (must point at the transactional table from step 3)
a.sinks.hive_sink.type=hive
a.sinks.hive_sink.hive.metastore=thrift://localhost:9083
a.sinks.hive_sink.hive.database=jobdatabase
a.sinks.hive_sink.hive.table=jobtable
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
a.sinks.hive_sink.batchSize=10
a.sinks.hive_sink.serializer=JSON
# keep the field list consistent with the table columns and the JSON keys in step 7
a.sinks.hive_sink.serializer.fieldnames=name,money,company,address,type
# channel configuration
a.channels.mem_channel.type=memory
a.channels.mem_channel.capacity=1000
a.channels.mem_channel.transactionCapacity=100
# bind the source and sink to the channel
a.sources.source_from_kafka.channels=mem_channel
a.sinks.hive_sink.channel=mem_channel
5. Start Flume (the agent name passed with -n must match the a. prefix in the configuration file):
flume-ng agent -n a -c /opt/flume/conf -f /opt/flume/conf/kafka2hive.conf -Dflume.root.logger=INFO,console
6. Start a Kafka console producer
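If the broker does not auto-create topics, the test topic must exist first. A sketch assuming the Kafka scripts are on the PATH and ZooKeeper is at localhost:2181 (kafka-topics.sh in Kafka 2.4.1 still accepts --zookeeper):

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test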
kafka-console-producer.sh --broker-list localhost:9092 --topic test
7. Send JSON-formatted data to the Kafka topic:
- {"name":"大数据平台技术专家-中台方向","money":24000,"company":"杭州风扬网络科技有限公司","address":"成都","type":"民营"}
- {"name":"初级大数据开发工程师","money":6500,"company":"成都匠翎信息技术有限公司","address":"成都·高新区","type":"民营"}
- {"name":"大数据研发岗(J47618)","money":22500,"company":"京东方科技集团股份有限公司","address":"成都·郫都区","type":"已上市"}
8. Query the Hive table:
select * from jobdatabase.jobtable;
That's it; take your time and work through each step carefully.