美丽的神话 发表于 2024-7-24 08:35:34

Doris数据库--导入数据之 stream load Java调FE接口上传文件导入数据

需求配景

2023年11月份接到需求:将8张表数据文件导入到Doris中,数据文件是txt格式存放在zip内,平均天天数据量80亿,占磁盘空间300G
以前从来没接触过Doris数据库,所以初版用的jdbc批量导入,也用了一些代码层面的导入优化,效果不抱负
查看Doris官网后发现有很多种导入方式
https://img-blog.csdnimg.cn/direct/93de55a60aae4aea9220808feafcb9dd.png
经过实战streamLoad方式导入服从快又稳,已经稳定运行三个月
效果图

https://img-blog.csdnimg.cn/direct/69b2086f1dee4cb19c2d3484313b8adc.png

正题---代码部分

工具类

package io.geekidea.boot.utils;

import cn.hutool.core.io.FileTypeUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/*
这是一个 Doris Stream Load 示例,需要依赖
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>
*/
@Slf4j
public class DorisStreamLoader implements Runnable {

    //可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。
    private String HOST;
    private int PORT;
    // 要导入的数据库
    private String DATABASE;
    // Doris 密码
    private String username;
    // Doris 用户名
    private String password;
    private static String TABLE = "";   // 要导入的表
    private static String COLUMNS = "";   // 要导入的表字段顺序
    private static File sourceFile;//源文件
    private static String toFilePath;//目标文件路径
    private static String errorPath;//失败文件路径
    private static String Separator;//目标文件路径
    private static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径

    public DorisStreamLoader(String host,int port,String database,String username,String password,String table, String columns,File sourceFile,String toFilePath,String Separator,String errorPath) {
      this.HOST = host;
      this.PORT = port;
      this.DATABASE = database;
      this.username = username;
      this.password = password;
      this.TABLE = table;
      this.COLUMNS = columns;
      this.sourceFile = sourceFile;
      this.toFilePath = toFilePath;
      this.Separator = Separator;
      this.errorPath = errorPath;
    }

    private final static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                  // 如果连接目标是 FE,则需要处理 307 redirect。
                  return true;
                }
            });



    @Override
    public void run() {
      load(sourceFile,toFilePath);
    }

    /**
   * doris stream load
   * @param file 导入文件
   * @param toFile 导入后移动路径
   */
    public void load(File file,String toFile) {
      try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(String.format("http://%s:%s/api/%s/%s/_stream_load",
                  HOST, PORT, DATABASE, TABLE));
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password));
            // 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
            // 不设置时候 Doris会生成默认的
            //put.setHeader("label","label1");
            String type = FileTypeUtil.getType(file);
            put.setHeader("format",type);
            put.setHeader("columns",COLUMNS);
            // 设置错误行过滤比例
            put.setHeader("max_filter_ratio","0.1");
            put.setHeader("column_separator",Separator);
            // 设置导入文件。
            // 这里也可以使用 StringEntity 来传输任意数据。
            FileEntity entity = new FileEntity(file);
            put.setEntity(entity);
            log.info("Put request: {}==={}==={}",file.getName(), String.format("http://%s:%s/api/%s/%s/_stream_load",
                  HOST, PORT, DATABASE, TABLE),COLUMNS);
            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                  loadResult = EntityUtils.toString(response.getEntity());
                }

                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                  throw new IOException(
                            String.format("导入失败. status: %s load result: %s", statusCode, loadResult));
                }else {
                  JSONObject json = new JSONObject(loadResult);
                  if(json.getStr("Status").equals("Success") || json.getStr("Status").contains("Publish")){
                        FileUtil.move(file,new File(toFile),true);
                        log.info("导入成功");
                  }else {
                        FileUtil.move(file,new File(toFile),true);
                        log.info("导入失败");
                  }
                }
                log.info("Get load result: {}", loadResult);
            }
      }catch (Exception e) {
            FileUtil.move(file,new File(toFile),true);
            log.error("上传文件:{}到Doris报错{}",file.getName(),e);
      }
    }

    private String basicAuthHeader(String username, String password) {
      final String tobeEncode = username + ":" + password;
      //log.info("用户名:密码{}",tobeEncode);
      byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
      return "Basic " + new String(encoded);
    }
    public static void main(String[] args) throws Exception{
      //DorisStreamLoader loader = new DorisStreamLoader();
      //File file = new File(LOAD_FILE_NAME);
      //loader.load(file,"/app/file");

    }
}
具体使用

此处可以自行更改为并发提交:假如提交过猛会导入失败,根据自己数据库集群性能更改。
public void Import(String folders) {
      //文件所在目录:folders + unzipTargetPath
      //cn.hutool.core.io.FileUtil;
      List<File> unzipTxt = FileUtil.loopFiles(folders + unzipTargetPath);
      log.info("需要导入文件数量{}", unzipTxt.size());
      for (File file : unzipTxt) {
            Map<String, String> map = getColumn(file.getName());
            new DorisStreamLoader(
                  sdkConfig.getTargetIp(),
                  sdkConfig.getTargetPort(),
                  sdkConfig.getTargetDatabase(),
                  sdkConfig.getTargetUsername(),
                  sdkConfig.getTargetPassword(),
                  map.get("table"), map.get("columns"), file, folders + movePath + file.getName(),
                  "\\u0002",
                  folders + errorPath + file.getName()).run();
      }
    }

    //获取表名和字段
    private Map<String, String> getColumn(String path) {
      Map<String, String> map = new HashMap<>();
      if (path.contains("ip")) {
            map.put("table", "tableName");
            map.put("columns", "A,B,C,D,E");
      }
      //省略。。。。
      return map;
    }

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Doris数据库--导入数据之 stream load Java调FE接口上传文件导入数据