Dolphinscheduler配置dataX离线采集任务写入hive实践(二)

打印 上一主题 下一主题

主题 816|帖子 816|积分 2448


一、 写入hive 配置

dataX 写入hive 不支持直接配置select 语句,必须配置一个json 任务,推荐使用hdfswriter
  1. "writer": {
  2.                     "name": "hdfswriter",
  3.                     "parameter": {
  4.                         "defaultFS": "hdfs://hadoop.fancv.com:9000",
  5.                         "fileType": "text",
  6.                         "path": "/user/hive/warehouse/fancv_center_devdb.db/test_p_text/ct=2020",
  7.                         "fileName": "test_p_file",
  8.                         "column": [
  9.                             {
  10.                                 "name": "id",
  11.                                 "type": "INT"
  12.                             },
  13.                             {
  14.                                 "name": "name",
  15.                                 "type": "STRING"
  16.                             }
  17.                         ],
  18.                         "writeMode": "append",
  19.                         "fieldDelimiter": "\u0001",
  20.                         "compress":"GZIP"
  21.                     }
  22.                 }
复制代码
hdfswriter 时直接写文件,所以回遇到权限问题,上一篇文章中 直接粗暴解决 chmod
但系统和内hive表一般时动态增加的,总不能不绝靠着下令行解决问题吧,后边研究了Hadoop的权限校验。
发现可以在系统中配置情况变量指定用户,如许就可以在dolphins中通过dataX 直接向hive 中写入数据了
1.1 权限报错信息 :

  1. Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=default, access=WRITE, inode="/user/hive/warehouse/sz_center_devdb.db/test_p":anonymous:supergroup:drwxr-xr-x
  2.                 at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:496)
  3.                 at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:336)
  4.                 at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:241)
  5.                 at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1909)
  6.                 at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1893)
  7.                 at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1852)
  8.                 at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:323)
  9.                 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2635)
  10.                 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2577)
  11.                 at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:807)
  12.                 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:494)
  13.                 at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  14.                 at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:532)
  15.                 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
  16.                 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1020)
  17.                 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:948)
  18.                 at java.security.AccessController.doPrivileged(Native Method)
  19.                 at javax.security.auth.Subject.doAs(Subject.java:422)
  20.                 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
  21.                 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2952)
  22.                 at org.apache.hadoop.ipc.Client.call(Client.java:1476)
  23.                 at org.apache.hadoop.ipc.Client.call(Client.java:1407)
  24.                 at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
  25.                 at com.sun.proxy.$Proxy10.create(Unknown Source)
  26.                 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
  27.                 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  28.                 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  29.                 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  30.                 at java.lang.reflect.Method.invoke(Method.java:498)
  31.                 at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
  32.                 at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  33.                 at com.sun.proxy.$Proxy11.create(Unknown Source)
  34.                 at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1623)
  35.                 ... 14 more
  36.                 at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:40)
  37.                 at com.alibaba.datax.plugin.writer.hdfswriter.HdfsHelper.textFileStartWrite(HdfsHelper.java:317)
  38.                 at com.alibaba.datax.plugin.writer.hdfswriter.HdfsWriter$Task.startWrite(HdfsWriter.java:360)
  39.                 at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:56)
  40.                 at java.lang.Thread.run(Thread.java:750)
  41.         Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=default, access=WRITE, inode="/user/hive/warehouse/sz_center_devdb.db/test_p":anonymous:supergroup:drwxr-xr-x
  42.                 at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:496)
  43.                 at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:336)
  44.                 at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:241)
  45.                 at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1909)
  46.                 at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1893)
  47.                 at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1852)
  48.                 at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:323)
  49.                 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2635)
  50.                 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2577)
  51.                 at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:807)
  52.                 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:494)
  53.                 at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  54.                 at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:532)
  55.                 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
  56.                 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1020)
  57.                 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:948)
  58.                 at java.security.AccessController.doPrivileged(Native Method)
  59.                 at javax.security.auth.Subject.doAs(Subject.java:422)
  60.                 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
  61.                 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2952)
复制代码
这里可以通过设置情况变量的方式解决
详情参考: fancv.com
参考资料:https://www.cnblogs.com/chhyan-dream/p/12929362.html
1.2 hive 中文件格式

hive 文件保存有格式区别,假如在dataX 任务里,没有仔细核对hive 文件格式,固然表面上dataX 任务执行成功,但是hive的文件却被粉碎了,导致不可用。
报错信息如下:
  1. org.jkiss.dbeaver.model.exec.DBCException: SQL 错误: java.io.IOException: org.apache.orc.FileFormatException: Malformed ORC file hdfs://shangzhi165:9000/user/hive/warehouse/sz_center_devdb.db/test_p/ct=2020/test_p_file__4f4b257d_b0ad_446c_a3ea_e5ea2e81580c.gz. Invalid postscript length 0
  2.         at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCResultSetImpl.nextRow(JDBCResultSetImpl.java:183)
  3.         at org.jkiss.dbeaver.model.impl.jdbc.struct.JDBCTable.readData(JDBCTable.java:195)
  4.         at org.jkiss.dbeaver.ui.controls.resultset.ResultSetJobDataRead.lambda$0(ResultSetJobDataRead.java:123)
  5.         at org.jkiss.dbeaver.model.exec.DBExecUtils.tryExecuteRecover(DBExecUtils.java:173)
  6.         at org.jkiss.dbeaver.ui.controls.resultset.ResultSetJobDataRead.run(ResultSetJobDataRead.java:121)
  7.         at org.jkiss.dbeaver.ui.controls.resultset.ResultSetViewer$ResultSetDataPumpJob.run(ResultSetViewer.java:5062)
  8.         at org.jkiss.dbeaver.model.runtime.AbstractJob.run(AbstractJob.java:105)
  9.         at org.eclipse.core.internal.jobs.Worker.run(Worker.java:63)
  10. Caused by: org.apache.hive.service.cli.HiveSQLException: java.io.IOException: org.apache.orc.FileFormatException: Malformed ORC file hdfs://shangzhi165:9000/user/hive/warehouse/sz_center_devdb.db/test_p/ct=2020/test_p_file__4f4b257d_b0ad_446c_a3ea_e5ea2e81580c.gz. Invalid postscript length 0
  11.         at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:300)
  12.         at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:286)
  13.         at org.apache.hive.jdbc.HiveQueryResultSet.next(HiveQueryResultSet.java:374)
  14.         at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCResultSetImpl.next(JDBCResultSetImpl.java:272)
  15.         at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCResultSetImpl.nextRow(JDBCResultSetImpl.java:180)
  16.         ... 7 more
  17. Caused by: org.apache.hive.service.cli.HiveSQLException: java.io.IOException: org.apache.orc.FileFormatException: Malformed ORC file hdfs://shangzhi165:9000/user/hive/warehouse/sz_center_devdb.db/test_p/ct=2020/test_p_file__4f4b257d_b0ad_446c_a3ea_e5ea2e81580c.gz. Invalid postscript length 0
  18.         at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:465)
  19.         at org.apache.hive.service.cli.operation.OperationManager.getOperationNextRowSet(OperationManager.java:309)
  20.         at org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:905)
  21.         at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
  22.         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  23.         at java.lang.reflect.Method.invoke(Method.java:498)
  24.         at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
  25.         at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
  26.         at org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
  27.         at java.security.AccessController.doPrivileged(Native Method)
  28.         at javax.security.auth.Subject.doAs(Subject.java:422)
  29.         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
  30.         at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
  31.         at com.sun.proxy.$Proxy37.fetchResults(Unknown Source)
  32.         at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:561)
  33.         at org.apache.hive.service.cli.thrift.ThriftCLIService.FetchResults(ThriftCLIService.java:786)
  34.         at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1837)
  35.         at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1822)
  36.         at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
  37.         at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
  38.         at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
  39.         at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
  40.         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  41.         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  42.         at java.lang.Thread.run(Thread.java:748)
  43. Caused by: java.io.IOException: org.apache.orc.FileFormatException: Malformed ORC file hdfs://shangzhi165:9000/user/hive/warehouse/sz_center_devdb.db/test_p/ct=2020/test_p_file__4f4b257d_b0ad_446c_a3ea_e5ea2e81580c.gz. Invalid postscript length 0
  44.         at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:602)
  45.         at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:509)
  46.         at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:146)
  47.         at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2691)
  48.         at org.apache.hadoop.hive.ql.reexec.ReExecDriver.getResults(ReExecDriver.java:229)
  49.         at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:460)
  50.         ... 24 more
  51. Caused by: java.lang.RuntimeException: org.apache.orc.FileFormatException:Malformed ORC file hdfs://shangzhi165:9000/user/hive/warehouse/sz_center_devdb.db/test_p/ct=2020/test_p_file__4f4b257d_b0ad_446c_a3ea_e5ea2e81580c.gz. Invalid postscript length 0
  52.         at org.apache.orc.impl.ReaderImpl.ensureOrcFooter(ReaderImpl.java:260)
  53.         at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:570)
  54.         at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
  55.         at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:61)
  56.         at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:96)
  57.         at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1971)
  58.         at org.apache.hadoop.hive.ql.exec.FetchOperator$FetchInputFormatSplit.getRecordReader(FetchOperator.java:776)
  59.         at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:344)
  60.         at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:540)
  61.         ... 29 more
复制代码
以上就是把ORC 的文件当错text 来处理,导致非常。
我们要在配置datax 任务的时候,一定要仔细核对。
1.3 注意区别以下建表语句

A、构建ORC 格式分区表

  1. CREATE TABLE IF NOT EXISTS `test_p` (
  2.   `id` int COMMENT 'date in file',
  3.   `name` string COMMENT 'appname' )
  4. COMMENT 'cleared log of origin log'
  5. PARTITIONED BY (
  6.   `ct`  string
  7. )
  8. ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  9. STORED AS ORC
  10. TBLPROPERTIES ('creator'='c-chenjc', 'crate_time'='2018-06-07')
  11. ;
复制代码
B. 构建默认文件格式分区表

  1. CREATE TABLE IF NOT EXISTS `test_p_text` (
  2.   `id` int COMMENT 'date in file',
  3.   `name` string COMMENT 'appname' )
  4. COMMENT 'cleared log of origin log'
  5. PARTITIONED BY (
  6.   `ct`  string
  7. )
复制代码
C.构建非分区表

  1. CREATE TABLE IF NOT EXISTS `my_test_p` (
  2.   `id` int COMMENT 'date in file',
  3.   `name` string COMMENT 'appname' ,
  4.   `ct`  string)
  5. ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  6. STORED AS ORC
  7. TBLPROPERTIES ('creator'='c-chenjc', 'crate_time'='2018-06-07')
  8. ;
复制代码
二、dataX 配置hive 分区表导入 配置

dataX 导入hive 分区表必须指定分区,否则的话,任务执行成功,没有返回非常,但是呢,数据没写入。
示例:
  1. {
  2.         "job": {
  3.                 "setting": {
  4.                         "speed": {
  5.                                 "channel": 1
  6.                         }
  7.                 },
  8.                 "content": [{
  9.                         "reader": {
  10.                                 "name": "mysqlreader",
  11.                                 "parameter": {
  12.                                         "column": ["id","name"],
  13.                                         "password": "xxxxx$xx",
  14.                                         "username": "xxxx",
  15.                                         "where": "",
  16.                                         "connection": [{
  17.                                                 "jdbcUrl": ["jdbc:mysql://x x xx:3306/web_magic"],
  18.                                                 "table": ["mysql_test_p"]
  19.                                         }]
  20.                                 }
  21.                         },
  22.                         "writer": {
  23.                                 "name": "hdfswriter",
  24.                                 "parameter": {
  25.                                         "column": [{
  26.                                                         "name": "id",
  27.                                                         "type": "int"
  28.                                                 },
  29.                                                 {
  30.                                                         "name": "name",
  31.                                                         "type": "string"
  32.                                                 }
  33.                                         ],
  34.                                         "compress": "",
  35.                                         "defaultFS": "hdfs://xxxxx:xxx",
  36.                                         "fieldDelimiter": ",",
  37.                                         "fileName": "test_p",
  38.                                         "fileType": "text",
  39.                                         "path": "/user/hive/warehouse/testjar.db/test_p/ct=shanghai",
  40.                                         "writeMode": "append"
  41.                                 }
  42.                         }
  43.                 }]
  44.         }
  45. }
复制代码
注意:你在导入的时候这个分区还必须存在,否则报非常。
2.1 检查hive 表分区是否存在

  1. show partitions test_p;
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

不到断气不罢休

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表