Doris(五) -- 数据的导入导出

种地  金牌会员 | 2023-5-31 00:35:32 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 559|帖子 559|积分 1677

数据导入

使用 Insert 方式同步数据

用户可以通过 MySQL 协议,使用 INSERT 语句进行数据导入
INSERT 语句的使用方式和 MySQL 等数据库中 INSERT 语句的使用方式类似。 INSERT 语句支持以下两种语法:
  1. INSERT INTO table SELECT ...
  2. INSERT INTO table VALUES(...)
复制代码
对于 Doris 来说,一个 INSERT 命令就是一个完整的导入事务。
因此不论是导入一条数据,还是多条数据,我们都不建议在生产环境使用这种方式进行数据导入。高频次的 INSERT 操作会导致在存储层产生大量的小文件,会严重影响系统性能。
该方式仅用于线下简单测试或低频少量的操作。
或者可以使用以下方式进行批量的插入操作:
  1. INSERT INTO example_tbl VALUES
  2. (1000, "baidu1", 3.25)
  3. (2000, "baidu2", 4.25)
  4. (3000, "baidu3", 5.25);
复制代码
Stream Load

用于将本地文件导入到doris中。Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。
该方式中涉及 HOST:PORT 都是对应的HTTP 协议端口。
•        BE 的 HTTP 协议端口,默认为 8040。
•        FE 的 HTTP 协议端口,默认为 8030。
但须保证客户端所在机器网络能够联通FE, BE 所在机器。
  1. -- 创建表
  2. drop table if exists load_local_file_test;
  3. CREATE TABLE IF NOT EXISTS load_local_file_test
  4. (
  5.     id INT,
  6.     name VARCHAR(50),
  7.     age TINYINT
  8. )
  9. unique key(id)
  10. DISTRIBUTED BY HASH(id) BUCKETS 3;
复制代码
  1. # 创建文件
  2. 1,zss,28
  3. 2,lss,28
  4. 3,ww,88
  5. # 导入数据
  6. ## 语法示例
  7. curl \
  8. -u user:passwd \  # 账号密码
  9. -H "label:load_local_file_test" \  # 本次任务的唯一标识
  10. -T 文件地址 \
  11. http://主机名:端口号/api/库名/表名/_stream_load
  12. # user:passwd 为在 Doris 中创建的用户。初始用户为 admin / root,密码初始状态下为空。
  13. # host:port 为 BE 的 HTTP 协议端口,默认是 8040,可以在 Doris 集群 WEB UI页面查看。
  14. # label: 可以在 Header 中指定 Label 唯一标识这个导入任务。
  15. curl \
  16. -u root:123 \
  17. -H "label:load_local_file" \
  18. -H "column_separator:," \
  19. -T /root/data/loadfile.txt \
  20. http://doitedu01:8040/api/test/load_local_file_test/_stream_load
复制代码
curl的一些可配置的参数


  • label: 导入任务的标签,相同标签的数据无法多次导入。(标签默认保留30分钟)
  • column_separator:用于指定导入文件中的列分隔符,默认为\t。
  • line_delimiter:用于指定导入文件中的换行符,默认为\n。
  • columns:用于指定文件中的列和table中列的对应关系,默认一一对应
  • where: 用来过滤导入文件中的数据
  • max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。数据不规范不包括通过 where 条件过滤掉的行。
  • partitions: 用于指定这次导入所设计的partition。如果用户能够确定数据对应的partition,推荐指定该项。不满足这些分区的数据将被过滤掉。
  • timeout: 指定导入的超时时间。单位秒。默认是 600 秒。可设置范围为 1 秒 ~ 259200 秒。
  • timezone: 指定本次导入所使用的时区。默认为东八区。该参数会影响所有导入涉及的和时区有关的函数结果。
  • exec_mem_limit: 导入内存限制。默认为 2GB。单位为字节。
  • format: 指定导入数据格式,默认是csv,支持json格式。
  • read_json_by_line: 布尔类型,为true表示支持每行读取一个json对象,默认值为false。
  • merge_type: 数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理, 示例:-H "merge_type: MERGE" -H "delete: flag=1"
  • delete: 仅在 MERGE下有意义, 表示数据的删除条件 function_column.sequence_col: 只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。
建议一个导入请求的数据量控制在 1 - 2 GB 以内。如果有大量本地文件,可以分批并发提交。
导入json数据
  1. # 准备数据
  2. {"id":1,"name":"liuyan","age":18}
  3. {"id":2,"name":"tangyan","age":18}
  4. {"id":3,"name":"jinlian","age":18}
  5. {"id":4,"name":"dalang","age":18}
  6. {"id":5,"name":"qingqing","age":18}
  7. curl \
  8. -u root: \
  9. -H "label:load_local_file_json_20221126" \
  10. -H "columns:id,name,age" \
  11. -H "max_filter_ratio:0.1" \
  12. -H "timeout:1000" \
  13. -H "exec_mem_limit:1G" \
  14. -H "where:id>1" \
  15. -H "format:json" \
  16. -H "read_json_by_line:true" \
  17. -H "merge_type:delete" \
  18. -T /root/data/json.txt \
  19. http://doitedu01:8040/api/test/load_local_file_test/_stream_load
  20.   -H "merge_type:append" \
  21.   # 会把id = 3 的这条数据删除
  22.   -H "merge_type:MERGE" \
  23.   -H "delete:id=3"
复制代码
外部存储数据导入(hdfs)

适用场景

•        源数据在 Broker 可以访问的存储系统中,如 HDFS。
•        数据量在几十到百 GB 级别。
基本原理


  • 创建提交导入的任务
  • FE生成执行计划并将执行计划分发到多个BE节点上(每个BE节点都导入一部分数据)
  • BE收到执行计划后开始执行,从broker上拉取数据到自己的节点上
  • 所有BE都完成后,FE决定是否导入成功,返回结果给客户端
  1. -- 新建一张表
  2. drop table if exists load_hdfs_file_test1;
  3. CREATE TABLE IF NOT EXISTS load_hdfs_file_test1
  4. (
  5.     id INT,
  6.     name VARCHAR(50),
  7.     age TINYINT
  8. )
  9. unique key(id)
  10. DISTRIBUTED BY HASH(id) BUCKETS 3;
复制代码
  1. 将本地的数据导入到hdfs上面
  2. hdfs dfs -put ./loadfile.txt  hdfs://linux01:8020/
  3. hdfs dfs -ls  hdfs://linux01:8020/
复制代码
  1. -- 导入语法
  2. LOAD LABEL test.label_202204(
  3. [MERGE|APPEND|DELETE]  -- 不写就是append
  4. DATA INFILE
  5. (
  6. "file_path1"[, file_path2, ...]  -- 描述数据的路径   这边可以写多个 ,以逗号分割
  7. )
  8. [NEGATIVE]               -- 负增长
  9. INTO TABLE `table_name`  -- 导入的表名字
  10. [PARTITION (p1, p2, ...)] -- 导入到哪些分区,不符合这些分区的就会被过滤掉
  11. [COLUMNS TERMINATED BY "column_separator"]  -- 指定分隔符
  12. [FORMAT AS "file_type"] -- 指定存储的文件类型
  13. [(column_list)] -- 指定导入哪些列
  14. [COLUMNS FROM PATH AS (c1, c2, ...)]  -- 从路劲中抽取的部分列
  15. [SET (column_mapping)] -- 对于列可以做一些映射,写一些函数
  16. -- 这个参数要写在要写在set的后面
  17. [PRECEDING FILTER predicate]  -- 在mapping前做过滤做一些过滤
  18. [WHERE predicate]  -- 在mapping后做一些过滤  比如id>10
  19. [DELETE ON expr] --根据字段去做一些抵消消除的策略  需要配合MERGE
  20. [ORDER BY source_sequence] -- 导入数据的时候保证数据顺序
  21. [PROPERTIES ("key1"="value1", ...)]  -- 一些配置参数
复制代码
  1. -- 将hdfs上的数据load到表中
  2. LOAD LABEL test.label_20221125
  3. (
  4. DATA INFILE("hdfs://linux01:8020/test.txt")
  5. INTO TABLE `load_hdfs_file_test`
  6. COLUMNS TERMINATED BY ","
  7. (id,name,age)
  8. )
  9. with HDFS (
  10. "fs.defaultFS"="hdfs://linux01:8020",
  11. "hadoop.username"="root"
  12. )
  13. PROPERTIES
  14. (
  15. "timeout"="1200",
  16. "max_filter_ratio"="0.1"
  17. );
  18. -- 这是一个异步的操作,所以需要去查看下执行的状态
  19. show load order by createtime desc limit 1\G;
复制代码
从 HDFS 导入数据,使用通配符匹配两批两批文件。分别导入到两个表中
  1. LOAD LABEL example_db.label2
  2. (
  3.     DATA INFILE("hdfs://linux01:8020/input/file-10*")
  4.     INTO TABLE `my_table1`
  5.     PARTITION (p1)
  6.     COLUMNS TERMINATED BY ","
  7.     FORMAT AS "parquet"  
  8.     (id, tmp_salary, tmp_score)
  9.     SET (
  10.         salary= tmp_salary + 1000,
  11.         score = tmp_score + 10
  12.     ),
  13.     DATA INFILE("hdfs://linux01:8020/input/file-20*")
  14.     INTO TABLE `my_table2`
  15.     COLUMNS TERMINATED BY ","
  16.     (k1, k2, k3)
  17. )
  18. with HDFS (
  19. "fs.defaultFS"="hdfs://linux01:8020",
  20. "hadoop.username"="root"
  21. )
  22. -- 导入数据,并提取文件路径中的分区字段
  23. LOAD LABEL example_db.label10
  24. (
  25.     DATA INFILE("hdfs://linux01:8020/user/hive/warehouse/table_name/dt=20221125/*")
  26.     INTO TABLE `my_table`
  27.     FORMAT AS "csv"
  28.     (k1, k2, k3)
  29.     COLUMNS FROM PATH AS (dt)
  30. )
  31. WITH BROKER hdfs
  32. (
  33.     "username"="root",
  34.     "password"="123"
  35. );
  36. -- 对待导入数据进行过滤。
  37. LOAD LABEL example_db.label6
  38. (
  39.     DATA INFILE("hdfs://linux01:8020/input/file")
  40.     INTO TABLE `my_table`
  41.     (k1, k2, k3)
  42.     SET (
  43.         k2 = k2 + 1
  44.     )
  45.         PRECEDING FILTER k1 = 1  ==》前置过滤
  46.     WHERE k1 > k2   ==》 后置过滤
  47. )
  48. WITH BROKER hdfs
  49. (
  50.     "username"="root",
  51.     "password"="123"
  52. );
  53. -- 只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。
复制代码
取消导入任务

当 Broker load 作业状态不为 CANCELLED 或 FINISHED 时,可以被用户手动取消。
取消时需要指定待取消导入任务的 Label 。取消导入命令语法可执行 HELP CANCEL LOAD 查看。
  1. CANCEL LOAD [FROM db_name] WHERE LABEL="load_label";
复制代码
通过外部表同步数据

Doris 可以创建外部表。创建完成后,可以通过 SELECT 语句直接查询外部表的数据,也可以通过 INSERT INTO SELECT 的方式导入外部表的数据。
Doris 外部表目前支持的数据源包括:MySQL,Oracle,Hive,PostgreSQL,SQLServer,Iceberg,ElasticSearch
  1. -- 整体语法
  2. CREATE [EXTERNAL] TABLE table_name (
  3. col_name col_type [NULL | NOT NULL] [COMMENT "comment"]
  4. ) ENGINE=HIVE
  5. [COMMENT "comment"]
  6. PROPERTIES (
  7. -- 我要映射的hive表在哪个库里面
  8. -- 映射的表名是哪一张
  9. -- hive的元数据服务地址
  10. 'property_name'='property_value',
  11. ...
  12. );
  13. -- 参数说明:
  14. -- 1.外表列
  15. --         列名要与 Hive 表一一对应
  16. --         列的顺序需要与 Hive 表一致
  17. --         必须包含 Hive 表中的全部列
  18. --         Hive 表分区列无需指定,与普通列一样定义即可。
  19. -- 2.ENGINE 需要指定为 HIVE
  20. -- 3.PROPERTIES 属性:
  21. --         hive.metastore.uris:Hive Metastore 服务地址
  22. --         database:挂载 Hive 对应的数据库名
  23. --         table:挂载 Hive 对应的表名
复制代码
完成在 Doris 中建立 Hive 外表后,除了无法使用 Doris 中的数据模型(rollup、预聚合、物化视图等)外,与普通的 Doris OLAP 表并无区别
  1. -- 在Hive 中创建一个测试用表:
  2. CREATE TABLE `user_info` (
  3. `id` int,
  4. `name` string,
  5. `age` int
  6. ) stored as orc;
  7. insert into user_info values (1,'zss',18);
  8. insert into user_info values (2,'lss',20);
  9. insert into user_info values (3,'ww',25);
  10. -- Doris 中创建外部表
  11. CREATE EXTERNAL TABLE `hive_user_info` (
  12. `id` int,
  13. `name` varchar(10),
  14. `age` int
  15. ) ENGINE=HIVE
  16. PROPERTIES (
  17. 'hive.metastore.uris' = 'thrift://linux01:9083',
  18. 'database' = 'db1',
  19. 'table' = 'user_info'
  20. );
复制代码
外部表创建好后,就可以直接在doris中对这个外部表进行查询了
直接查询外部表,无法利用到doris自身的各种查询优化机制!
  1. select * from hive_user_info;
  2. -- 将数据从外部表导入内部表
  3. -- 数据从外部表导入内部表后,就可以利用doris自身的查询优势了!
  4. -- 假设要导入的目标内部表为: doris_user_info  (需要提前创建)
  5. CREATE TABLE IF NOT EXISTS doris_user_info
  6. (
  7.     id INT,
  8.     name VARCHAR(50),
  9.     age TINYINT
  10. )
  11. unique key(id)
  12. DISTRIBUTED BY HASH(id) BUCKETS 3;
  13. -- 就是用sql查询,从外部表中select出数据后,insert到内部表即可
  14. insert into doris_user_info
  15. select
  16. *
  17. from hive_user_info;
复制代码
注意:
Hive 表 Schema 变更不会自动同步,需要在 Doris 中重建 Hive 外表。
当前 Hive 的存储格式仅支持 Text,Parquet 和 ORC 类型
Binlog Load

Binlog Load提供了一种使Doris增量同步用户在Mysql数据库中对数据更新操作的CDC(Change Data Capture)功能。
基本原理
当前版本设计中,Binlog Load需要依赖canal作为中间媒介,让canal伪造成一个从节点去获取Mysql主节点上的Binlog并解析,再由Doris去获取Canal上解析好的数据,主要涉及Mysql端、Canal端以及Doris端

  • FE会为每个数据同步作业启动一个canal client,来向canal server端订阅并获取数据。
  • client中的receiver将负责通过Get命令接收数据,每获取到一个数据batch,都会由consumer根据对应表分发到不同的channel,每个channel都会为此数据batch产生一个发送数据的子任务Task。
  • 在FE上,一个Task是channel向BE发送数据的子任务,里面包含分发到当前channel的同一个batch的数据。
  • channel控制着单个表事务的开始、提交、终止。一个事务周期内,一般会从consumer获取到多个batch的数据,因此会产生多个向BE发送数据的子任务Task,在提交事务成功前,这些Task不会实际生效。
  • 满足一定条件时(比如超过一定时间、达到提交最大数据大小),consumer将会阻塞并通知各个channel提交事务。
  • 当且仅当所有channel都提交成功,才会通过Ack命令通知canal并继续获取并消费数据。
  • 如果有任意channel提交失败,将会重新从上一次消费成功的位置获取数据并再次提交(已提交成功的channel不会再次提交以保证幂等性)。
  • 整个数据同步作业中,FE通过以上流程不断的从canal获取数据并提交到BE,来完成数据同步。
Mysql端

在Mysql Cluster模式的主从同步中,二进制日志文件(Binlog)记录了主节点上的所有数据变化,数据在Cluster的多个节点间同步、备份都要通过Binlog日志进行,从而提高集群的可用性。架构通常由一个主节点(负责写)和一个或多个从节点(负责读)构成,所有在主节点上发生的数据变更将会复制给从节点。
注意:目前必须要使用Mysql 5.7及以上的版本才能支持Binlog Load功能。
  1. # 打开mysql的二进制binlog日志功能,则需要编辑my.cnf配置文件设置一下。
  2. find / -name my.cnf
  3. /etc/my.cnf
复制代码
  1. # 修改mysqld中的一些配置文件
  2. [mysqld]
  3. server_id = 1
  4. log-bin = mysql-bin
  5. binlog-format = ROW
  6. #binlog-format 的三种模式
  7. #ROW   记录每一行数据的信息
  8. #Statement  记录sql语句
  9. #Mixed   上面两种的混合
  10. # 重启 MySQL 使配置生效
  11. systemctl restart mysqld
复制代码
  1. -- 创建用户并授权
  2. -- 设置这些参数可以使得mysql的密码简单化
  3. set global validate_password_length=4;
  4. set global validate_password_policy=0;
  5. -- 新增一个canal的用户,让他监听所有库中的所有表,并且设置密码为canal
  6. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;
  7. -- 刷新一下权限
  8. FLUSH PRIVILEGES;
  9. -- 准备测试表
  10. CREATE TABLE `user_doris2` (
  11.   `id` int(11) NOT NULL AUTO_INCREMENT,
  12.   `name` varchar(255) DEFAULT NULL,
  13.   `age` int(11) DEFAULT NULL,
  14.   `gender` varchar(255) DEFAULT NULL,
  15.   PRIMARY KEY (`id`)
  16. ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
复制代码
配置 Canal 端

Canal 是属于阿里巴巴 otter 项目下的一个子项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,用于解决跨机房同步的业务场景,建议使用 canal 1.1.5及以上版本。
下载地址:https://github.com/alibaba/canal/releases
  1. # 上传并解压 canal deployer压缩包
  2. mkdir /opt/apps/canal
  3. tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/apps/canal
  4. # 在 conf 文件夹下新建目录并重命名
  5. # 一个 canal 服务中可以有多个 instance,conf/下的每一个目录即是一个实例,每个实例下面都有独立的配置文件
  6. mkdir /opt/apps/canel/conf/doris
  7. # 拷贝配置文件模板
  8. cp /opt/apps/canal/conf/example/instance.properties /opt/apps/canal/conf/doris/
  9. # 修改 conf/canal.properties 的配置
  10. vi canal.properties
  11. # 进入找到canal.destinations = example
  12. # 将其修改为 我们自己配置的目录
  13. canal.destinations = doris
  14. # 修改 instance 配置文件
  15. vi instance.properties
  16. # 修改:
  17. canal.instance.master.address=doitedu01:3306
  18. # 启动
  19. sh bin/startup.sh
复制代码
注意:canal client 和 canal instance 是一一对应的,Binlog Load 已限制多个数据同步作 业不能连接到同一个 destination。
配置目标表
  1. 基本语法:
  2. CREATE SYNC [db.]job_name
  3. (
  4. channel_desc,
  5. channel_desc
  6. ...
  7. )
  8. binlog_desc
  9. -- 参数说明:
  10. -- job_name:是数据同步作业在当前数据库内的唯一标识
  11. -- channel_desc :用来定义任务下的数据通道,可表示 MySQL 源表到 doris 目标表的映射关系。在设置此项时,如果存在多个映射关系,必须满足 MySQL 源表应该与 doris 目标表是一一对应关系,其他的任何映射关系(如一对多关系),检查语法时都被视为不合法。
  12. -- column_mapping:主要指MySQL源表和doris目标表的列之间的映射关系,如果不指定,FE 会默认源表和目标表的列按顺序一一对应。但是我们依然建议显式的指定列的映射关系,这样当目标表的结构发生变化(比如增加一个 nullable 的列),数据同步作业依然可以进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。
  13. -- binlog_desc:定义了对接远端 Binlog 地址的一些必要信息,目前可支持的对接类型只有 canal 方式,所有的配置项前都需要加上 canal 前缀。
  14. --         canal.server.ip: canal server 的地址
  15. --         canal.server.port: canal server 的端口
  16. --         canal.destination: 前文提到的 instance 的字符串标识
  17. --         canal.batchSize: 每批从 canal server 处获取的 batch 大小的最大值,默认 8192
  18. --         canal.username: instance 的用户名
  19. --         canal.password: instance 的密码
  20. --         canal.debug: 设置为 true 时,会将 batch 和每一行数据的详细信息都打印出来,会影响性能。
  21. -- Doris 创建与 Mysql 对应的目标表
  22. CREATE TABLE `binlog_mysql` (
  23. `id` int(11) NOT NULL COMMENT "",
  24. `name` VARCHAR(50) NOT NULL COMMENT "",
  25. `age` int(11) NOT NULL COMMENT "" ,
  26. `gender` VARCHAR(50) NOT NULL COMMENT ""
  27. ) ENGINE=OLAP
  28. UNIQUE KEY(`id`)
  29. DISTRIBUTED BY HASH(`id`) BUCKETS 1;
  30. CREATE SYNC test.job20221228
  31. (
  32. FROM test.binlog_test INTO binlog_test
  33. )
  34. FROM BINLOG
  35. (
  36. "type" = "canal",
  37. "canal.server.ip" = "linux01",
  38. "canal.server.port" = "11111",
  39. "canal.destination" = "doris",
  40. "canal.username" = "canal",
  41. "canal.password" = "canal"
  42. );
  43. -- 查看作业状态
  44. -- 展示当前数据库的所有数据同步作业状态。
  45. SHOW SYNC JOB;
  46. -- 展示数据库 `test_db` 下的所有数据同步作业状态。
  47. SHOW SYNC JOB FROM `test`;
  48. -- 停止名称为 `job_name` 的数据同步作业
  49. STOP SYNC JOB [db.]job_name
  50. -- 暂停名称为 `job_name` 的数据同步作业
  51. PAUSE SYNC JOB [db.]job_name
  52. -- 恢复名称为 `job_name` 的数据同步作业
  53. RESUME SYNC JOB `job_name`
复制代码
数据导出

数据导出(Export)是 Doris 提供的一种将数据导出的功能。该功能可以将用户指定的表或分区的数据,以文本的格式,通过 Broker 进程导出到远端存储上,如 HDFS / 对象存储(支持S3协议) 等。
原理

  • 用户提交一个 Export 作业到 FE。
  • FE 的 Export 调度器会通过两阶段来执行一个 Export 作业:
  • PENDING:FE 生成 ExportPendingTask,向 BE 发送 snapshot 命令,对所有涉及到的 Tablet 做一个快照。并生成多个查询计划。
  • EXPORTING:FE 生成 ExportExportingTask,开始执行查询计划。
查询计划拆分

Export 作业会生成多个查询计划,每个查询计划负责扫描一部分 Tablet。每个查询计划扫描的 Tablet 个数由 FE 配置参数 export_tablet_num_per_task 指定,默认为 5。即假设一共 100 个 Tablet,则会生成 20 个查询计划。用户也可以在提交作业时,通过作业属性 tablet_num_per_task 指定这个数值。
一个作业的多个查询计划顺序执行
查询计划执行

一个查询计划扫描多个分片,将读取的数据以行的形式组织,每 1024 行为一个 batch,调用 Broker 写入到远端存储上。
查询计划遇到错误会整体自动重试 3 次。如果一个查询计划重试 3 次依然失败,则整个作业失败。
Doris 会首先在指定的远端存储的路径中,建立一个名为 __doris_export_tmp_12345 的临时目录(其中 12345 为作业 id)。导出的数据首先会写入这个临时目录。每个查询计划会生成一个文件,文件名示例:
  1. export-data-c69fcf2b6db5420f-a96b94c1ff8bccef-1561453713822
复制代码
其中 c69fcf2b6db5420f-a96b94c1ff8bccef 为查询计划的 query id。1561453713822 为文件生成的时间戳。当所有数据都导出后,Doris 会将这些文件 rename 到用户指定的路径中
示例:导出到hdfs
  1. EXPORT TABLE test.event_info_log1 -- 库名.表名
  2. to "hdfs://linux01:8020/event_info_log1"  -- 导出到那里去
  3. PROPERTIES
  4. (
  5.     "label" = "event_info_log1",
  6.     "column_separator"=",",
  7.     "exec_mem_limit"="2147483648",
  8.     "timeout" = "3600"
  9. )
  10. WITH BROKER "broker_name"
  11. (
  12.     "username" = "root",
  13.     "password" = ""
  14. );
  15. -- 1.label:本次导出作业的标识。后续可以使用这个标识查看作业状态。
  16. -- 2.column_separator:列分隔符。默认为 \t。支持不可见字符,比如 '\x07'。
  17. -- 3.columns:要导出的列,使用英文状态逗号隔开,如果不填这个参数默认是导出表的所有列。
  18. -- 4.line_delimiter:行分隔符。默认为 \n。支持不可见字符,比如 '\x07'。
  19. -- 5.exec_mem_limit: 表示 Export 作业中,一个查询计划在单个 BE 上的内存使用限制。默认 2GB。单位字节。
  20. -- 6.timeout:作业超时时间。默认 2小时。单位秒。
  21. -- 7.tablet_num_per_task:每个查询计划分配的最大分片数。默认为 5。
  22. -- 查看导出状态
  23. show EXPORT \G;
复制代码
注意事项

  • 不建议一次性导出大量数据。一个 Export 作业建议的导出数据量最大在几十 GB。过大的导出会导致更多的垃圾文件和更高的重试成本。
  • 如果表数据量过大,建议按照分区导出。
  • 在 Export 作业运行过程中,如果 FE 发生重启或切主,则 Export 作业会失败,需要用户重新提交。
  • 如果 Export 作业运行失败,在远端存储中产生的 __doris_export_tmp_xxx 临时目录,以及已经生成的文件不会被删除,需要用户手动删除。
  • 如果 Export 作业运行成功,在远端存储中产生的 __doris_export_tmp_xxx 目录,根据远端存储的文件系统语义,可能会保留,也可能会被清除。比如在百度对象存储(BOS)中,通过 rename 操作将一个目录中的最后一个文件移走后,该目录也会被删除。如果该目录没有被清除,用户可以手动清除
  • 当 Export 运行完成后(成功或失败),FE 发生重启或切主,则 SHOW EXPORT展示的作业的部分信息会丢失,无法查看。
  • Export 作业只会导出 Base 表的数据,不会导出 Rollup Index 的数据。
  • Export 作业会扫描数据,占用 IO 资源,可能会影响系统的查询延迟
查询结果导出

SELECT INTO OUTFILE 语句可以将查询结果导出到文件中。目前支持通过 Broker进程, 通过 S3 协议, 或直接通过 HDFS 协议,导出到远端存储,如 HDFS,S3,BOS,COS (腾讯云)上。
  1. -- 语法
  2. query_stmt  -- 查询语句
  3. INTO OUTFILE "file_path"  --导出文件的路劲
  4. [format_as]  -- 指定文件存储的格式
  5. [properties]  -- 一些配置文件
复制代码
file_path:指向文件存储的路径以及文件前缀。如 hdfs://path/to/my_file_.最终的文件名将由 my_file_,文件序号以及文件格式后缀组成。其中文件序号由 0 开始,数量为文件被分割的数量
  1. -- 如
  2. my_file_abcdefg_0.csv
  3. my_file_abcdefg_1.csv
  4. my_file_abcdegf_2.csv
  5. -- [format_as]:指定导出格式。默认为 CSV
  6. -- [properties]:指定相关属性。目前支持通过 Broker 进程,hdfs协议等
  7. -- Broker 相关属性需加前缀 broker.
  8. -- HDFS 相关属性需加前缀 hdfs. 其中hdfs.fs.defaultFS 用于填写 namenode地址和端口,属于必填项。
  9. -- 如:
  10. ("broker.prop_key" = "broker.prop_val", ...)
  11. ("hdfs.fs.defaultFS" = "xxx", "hdfs.hdfs_user" = "xxx")
  12. -- 其他属性:
  13. -- column_separator:列分隔符,仅对 CSV 格式适用。默认为 \t。
  14. -- line_delimiter:行分隔符,仅对 CSV 格式适用。默认为 \n。
  15. -- max_file_size:单个文件的最大大小。默认为 1GB。取值范围在 5MB 到 2GB 之间。超过这个大小的文件将会被切分。
  16. -- schema:PARQUET 文件 schema 信息。仅对 PARQUET 格式适用。导出文件格式为 PARQUET 时,必须指定 schema。
复制代码
使用 broker 方式,将简单查询结果导出
  1. select * from log_detail where id >2
  2. INTO OUTFILE "hdfs://doitedu01:8020/doris-out/broker_a_"
  3. FORMAT AS CSV
  4. PROPERTIES
  5. (
  6. "broker.name" = "broker_name",
  7. "column_separator" = ",",
  8. "line_delimiter" = "\n",
  9. "max_file_size" = "100MB"
  10. );
复制代码
使用 HDFS 方式导出
  1. EXPLAIN SELECT * FROM log_detail
  2. INTO OUTFILE "hdfs://doris-out/hdfs_"
  3. FORMAT AS CSV
  4. PROPERTIES
  5. (
  6. "fs.defaultFS" = "hdfs://doitedu01:8020",
  7. "hadoop.username" = "root",
  8. "column_separator" = ","
  9. );
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

种地

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表