需求背景
2023年11月份接到需求:将8张表数据文件导入到Doris中,数据文件是txt格式存放在zip内,平均每天数据量80亿,占磁盘空间300G
以前从来没接触过Doris数据库,所以初版用的jdbc批量导入,也用了一些代码层面的导入优化,效果不理想
查看Doris官网后发现有很多种导入方式
经过实战,streamLoad方式导入效率高又稳定,已经稳定运行三个月
效果图
正题---代码部分
工具类
- package io.geekidea.boot.utils;
- import cn.hutool.core.io.FileTypeUtil;
- import cn.hutool.core.io.FileUtil;
- import cn.hutool.json.JSONObject;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.codec.binary.Base64;
- import org.apache.http.HttpHeaders;
- import org.apache.http.client.methods.CloseableHttpResponse;
- import org.apache.http.client.methods.HttpPut;
- import org.apache.http.entity.FileEntity;
- import org.apache.http.impl.client.CloseableHttpClient;
- import org.apache.http.impl.client.DefaultRedirectStrategy;
- import org.apache.http.impl.client.HttpClientBuilder;
- import org.apache.http.impl.client.HttpClients;
- import org.apache.http.util.EntityUtils;
- import java.io.File;
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- /*
- 这是一个 Doris Stream Load 示例,需要依赖
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.5.13</version>
- </dependency>
- */
- @Slf4j
- public class DorisStreamLoader implements Runnable {
- //可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。
- private String HOST;
- private int PORT;
- // 要导入的数据库
- private String DATABASE;
- // Doris 密码
- private String username;
- // Doris 用户名
- private String password;
- private static String TABLE = ""; // 要导入的表
- private static String COLUMNS = ""; // 要导入的表字段顺序
- private static File sourceFile;//源文件
- private static String toFilePath;//目标文件路径
- private static String errorPath;//失败文件路径
- private static String Separator;//目标文件路径
- private static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径
- public DorisStreamLoader(String host,int port,String database,String username,String password,String table, String columns,File sourceFile,String toFilePath,String Separator,String errorPath) {
- this.HOST = host;
- this.PORT = port;
- this.DATABASE = database;
- this.username = username;
- this.password = password;
- this.TABLE = table;
- this.COLUMNS = columns;
- this.sourceFile = sourceFile;
- this.toFilePath = toFilePath;
- this.Separator = Separator;
- this.errorPath = errorPath;
- }
- private final static HttpClientBuilder httpClientBuilder = HttpClients
- .custom()
- .setRedirectStrategy(new DefaultRedirectStrategy() {
- @Override
- protected boolean isRedirectable(String method) {
- // 如果连接目标是 FE,则需要处理 307 redirect。
- return true;
- }
- });
- @Override
- public void run() {
- load(sourceFile,toFilePath);
- }
- /**
- * doris stream load
- * @param file 导入文件
- * @param toFile 导入后移动路径
- */
- public void load(File file,String toFile) {
- try (CloseableHttpClient client = httpClientBuilder.build()) {
- HttpPut put = new HttpPut(String.format("http://%s:%s/api/%s/%s/_stream_load",
- HOST, PORT, DATABASE, TABLE));
- put.setHeader(HttpHeaders.EXPECT, "100-continue");
- put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password));
- // 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
- // 不设置时候 Doris会生成默认的
- //put.setHeader("label","label1");
- String type = FileTypeUtil.getType(file);
- put.setHeader("format",type);
- put.setHeader("columns",COLUMNS);
- // 设置错误行过滤比例
- put.setHeader("max_filter_ratio","0.1");
- put.setHeader("column_separator",Separator);
- // 设置导入文件。
- // 这里也可以使用 StringEntity 来传输任意数据。
- FileEntity entity = new FileEntity(file);
- put.setEntity(entity);
- log.info("Put request: {}==={}==={}",file.getName(), String.format("http://%s:%s/api/%s/%s/_stream_load",
- HOST, PORT, DATABASE, TABLE),COLUMNS);
- try (CloseableHttpResponse response = client.execute(put)) {
- String loadResult = "";
- if (response.getEntity() != null) {
- loadResult = EntityUtils.toString(response.getEntity());
- }
- final int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode != 200) {
- throw new IOException(
- String.format("导入失败. status: %s load result: %s", statusCode, loadResult));
- }else {
- JSONObject json = new JSONObject(loadResult);
- if(json.getStr("Status").equals("Success") || json.getStr("Status").contains("Publish")){
- FileUtil.move(file,new File(toFile),true);
- log.info("导入成功");
- }else {
- FileUtil.move(file,new File(toFile),true);
- log.info("导入失败");
- }
- }
- log.info("Get load result: {}", loadResult);
- }
- }catch (Exception e) {
- FileUtil.move(file,new File(toFile),true);
- log.error("上传文件:{}到Doris报错{}",file.getName(),e);
- }
- }
- private String basicAuthHeader(String username, String password) {
- final String tobeEncode = username + ":" + password;
- //log.info("用户名:密码{}",tobeEncode);
- byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
- return "Basic " + new String(encoded);
- }
- public static void main(String[] args) throws Exception{
- //DorisStreamLoader loader = new DorisStreamLoader();
- //File file = new File(LOAD_FILE_NAME);
- //loader.load(file,"/app/file");
- }
- }
具体使用
此处可以自行更改为并发提交:如果提交过猛会导致导入失败,请根据自己数据库集群的性能调整并发度。
- public void Import(String folders) {
- //文件所在目录:folders + unzipTargetPath
- //cn.hutool.core.io.FileUtil;
- List<File> unzipTxt = FileUtil.loopFiles(folders + unzipTargetPath);
- log.info("需要导入文件数量{}", unzipTxt.size());
- for (File file : unzipTxt) {
- Map<String, String> map = getColumn(file.getName());
- new DorisStreamLoader(
- sdkConfig.getTargetIp(),
- sdkConfig.getTargetPort(),
- sdkConfig.getTargetDatabase(),
- sdkConfig.getTargetUsername(),
- sdkConfig.getTargetPassword(),
- map.get("table"), map.get("columns"), file, folders + movePath + file.getName(),
- "\\u0002",
- folders + errorPath + file.getName()).run();
- }
- }
- //获取表名和字段
- private Map<String, String> getColumn(String path) {
- Map<String, String> map = new HashMap<>();
- if (path.contains("ip")) {
- map.put("table", "tableName");
- map.put("columns", "A,B,C,D,E");
- }
- //省略。。。。
- return map;
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |