文章目次
及时同步MySQL数据到Hive
一、开启MySQL的binlog日记
1、登录mysql检察MySQL是否开启binlog日记
2 、开启mysql binlog日记
3、重启mysql 服务,重新检察binlog日记环境
二、设置“CaptureChangeMySQL”处理器
1、创建“CaptureChangeMySQL”处理器
2、设置“DistributeMapCacheServer”控制服务
3、设置“SCHEDULING”
4、设置“PROPERTIES”
5、启动MySQL,创建表“test2”测试“CaptureChangeMySQL”处理器
三、设置“RouteOnAttribute”处理器
1、创建“RouteOnAttribute”处理器
2、设置“PROPERTIES”自定义属性
3、连接“CaptureChangeMySQL”处理器与“RouteOnAttribute”处理器
四、设置“EvaluatejsonPath”处理器
1、设置“EvaluatejsonPath”的“PROPERTIES”属性
2、连接“RouteOnAttribute”处理器和“EvaluatejsonPath”处理器
五、设置“ReplaceText”处理器
1、设置“RelaceText”处理器“PROPERTIES”属性
2、连接“EvaluatejsonPath”处理器与“ReplaceText”处理器
六、设置Hive 支持HiveServer2
1、在Hive服务端设置hive-site.xml
2、在每台Hadoop 节点设置core-site.xml
3、重启HDFS ,Hive ,在Hive服务端启动Metastore和HiveServer2服务
4、在客户端通过beeline连接Hive
七、设置“PutHiveQL”处理器
1、创建“PutHiveQL”处理器
2、 设置“PROPERTIES”
3、连接“ReplaceText”处理器与“PutHiveQL”处理器并设置关系
八、运行测试
1、在Hive中创建表“test2”
2、启动NiFi处理数据流程,向MySQL中写入数据,检察Hive中表数据
及时同步MySQL数据到Hive
案例:将mysql中新增的数据及时同步到Hive中。
以上案例需要用到的处理器有:“CaptureChangeMySQL”、“RouteOnAttribute”、“EvaluateJsonPath”、“ReplaceText”、“PutHiveQL”。
起首通过“CaptureChangeMySQL”读取MySQL中数据的变化(需要开启MySQL binlog日记),将Binlog中变化的数据同步到“RouteOnAttribute”处理器,通过此处理器获取上游数据属性,获取对应binlog操纵类型,再将想要处理的数据路由到“EvaluateJsonPath”处理器,该处理器可以将json格式的binlog数据解析,通过自定义json 表达式获取json数据中的属性放入FlowFile属性,将FlowFile通过“ReplaceText”处理器获取上游FowFile属性,动态拼接sql替换全部的FlowFile内容,将拼接好的sql组成FlowFile路由到“PutHiveQL”将数据写入到Hive表。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |