Doris数据库--导入数据之 stream load Java调FE接口上传文件导入数据 ...

打印 上一主题 下一主题

主题 974|帖子 974|积分 2922

需求配景

2023年11月份接到需求:将8张表数据文件导入到Doris中,数据文件是txt格式存放在zip内,平均天天数据量80亿,占磁盘空间300G
以前从来没接触过Doris数据库,所以初版用的jdbc批量导入,也用了一些代码层面的导入优化,效果不抱负
查看Doris官网后发现有很多种导入方式

经过实战streamLoad方式导入服从快又稳,已经稳定运行三个月
效果图



正题---代码部分

工具类

  1. package io.geekidea.boot.utils;
  2. import cn.hutool.core.io.FileTypeUtil;
  3. import cn.hutool.core.io.FileUtil;
  4. import cn.hutool.json.JSONObject;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.commons.codec.binary.Base64;
  7. import org.apache.http.HttpHeaders;
  8. import org.apache.http.client.methods.CloseableHttpResponse;
  9. import org.apache.http.client.methods.HttpPut;
  10. import org.apache.http.entity.FileEntity;
  11. import org.apache.http.impl.client.CloseableHttpClient;
  12. import org.apache.http.impl.client.DefaultRedirectStrategy;
  13. import org.apache.http.impl.client.HttpClientBuilder;
  14. import org.apache.http.impl.client.HttpClients;
  15. import org.apache.http.util.EntityUtils;
  16. import java.io.File;
  17. import java.io.IOException;
  18. import java.nio.charset.StandardCharsets;
  19. /*
  20. 这是一个 Doris Stream Load 示例,需要依赖
  21. <dependency>
  22.     <groupId>org.apache.httpcomponents</groupId>
  23.     <artifactId>httpclient</artifactId>
  24.     <version>4.5.13</version>
  25. </dependency>
  26. */
  27. @Slf4j
  28. public class DorisStreamLoader implements Runnable {
  29.     //可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。
  30.     private String HOST;
  31.     private int PORT;
  32.     // 要导入的数据库
  33.     private String DATABASE;
  34.     // Doris 密码
  35.     private String username;
  36.     // Doris 用户名
  37.     private String password;
  38.     private static String TABLE = "";     // 要导入的表
  39.     private static String COLUMNS = "";     // 要导入的表字段顺序
  40.     private static File sourceFile;//源文件
  41.     private static String toFilePath;//目标文件路径
  42.     private static String errorPath;//失败文件路径
  43.     private static String Separator;//目标文件路径
  44.     private static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径
  45.     public DorisStreamLoader(String host,int port,String database,String username,String password,String table, String columns,File sourceFile,String toFilePath,String Separator,String errorPath) {
  46.         this.HOST = host;
  47.         this.PORT = port;
  48.         this.DATABASE = database;
  49.         this.username = username;
  50.         this.password = password;
  51.         this.TABLE = table;
  52.         this.COLUMNS = columns;
  53.         this.sourceFile = sourceFile;
  54.         this.toFilePath = toFilePath;
  55.         this.Separator = Separator;
  56.         this.errorPath = errorPath;
  57.     }
  58.     private final static HttpClientBuilder httpClientBuilder = HttpClients
  59.             .custom()
  60.             .setRedirectStrategy(new DefaultRedirectStrategy() {
  61.                 @Override
  62.                 protected boolean isRedirectable(String method) {
  63.                     // 如果连接目标是 FE,则需要处理 307 redirect。
  64.                     return true;
  65.                 }
  66.             });
  67.     @Override
  68.     public void run() {
  69.         load(sourceFile,toFilePath);
  70.     }
  71.     /**
  72.      * doris stream load
  73.      * @param file 导入文件
  74.      * @param toFile 导入后移动路径
  75.      */
  76.     public void load(File file,String toFile) {
  77.         try (CloseableHttpClient client = httpClientBuilder.build()) {
  78.             HttpPut put = new HttpPut(String.format("http://%s:%s/api/%s/%s/_stream_load",
  79.                     HOST, PORT, DATABASE, TABLE));
  80.             put.setHeader(HttpHeaders.EXPECT, "100-continue");
  81.             put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password));
  82.             // 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
  83.             // 不设置时候 Doris会生成默认的
  84.             //put.setHeader("label","label1");
  85.             String type = FileTypeUtil.getType(file);
  86.             put.setHeader("format",type);
  87.             put.setHeader("columns",COLUMNS);
  88.             // 设置错误行过滤比例
  89.             put.setHeader("max_filter_ratio","0.1");
  90.             put.setHeader("column_separator",Separator);
  91.             // 设置导入文件。
  92.             // 这里也可以使用 StringEntity 来传输任意数据。
  93.             FileEntity entity = new FileEntity(file);
  94.             put.setEntity(entity);
  95.             log.info("Put request: {}==={}==={}",file.getName(), String.format("http://%s:%s/api/%s/%s/_stream_load",
  96.                     HOST, PORT, DATABASE, TABLE),COLUMNS);
  97.             try (CloseableHttpResponse response = client.execute(put)) {
  98.                 String loadResult = "";
  99.                 if (response.getEntity() != null) {
  100.                     loadResult = EntityUtils.toString(response.getEntity());
  101.                 }
  102.                 final int statusCode = response.getStatusLine().getStatusCode();
  103.                 if (statusCode != 200) {
  104.                     throw new IOException(
  105.                             String.format("导入失败. status: %s load result: %s", statusCode, loadResult));
  106.                 }else {
  107.                     JSONObject json = new JSONObject(loadResult);
  108.                     if(json.getStr("Status").equals("Success") || json.getStr("Status").contains("Publish")){
  109.                         FileUtil.move(file,new File(toFile),true);
  110.                         log.info("导入成功");
  111.                     }else {
  112.                         FileUtil.move(file,new File(toFile),true);
  113.                         log.info("导入失败");
  114.                     }
  115.                 }
  116.                 log.info("Get load result: {}", loadResult);
  117.             }
  118.         }catch (Exception e) {
  119.             FileUtil.move(file,new File(toFile),true);
  120.             log.error("上传文件:{}到Doris报错{}",file.getName(),e);
  121.         }
  122.     }
  123.     private String basicAuthHeader(String username, String password) {
  124.         final String tobeEncode = username + ":" + password;
  125.         //log.info("用户名:密码{}",tobeEncode);
  126.         byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
  127.         return "Basic " + new String(encoded);
  128.     }
  129.     public static void main(String[] args) throws Exception{
  130.         //DorisStreamLoader loader = new DorisStreamLoader();
  131.         //File file = new File(LOAD_FILE_NAME);
  132.         //loader.load(file,"/app/file");
  133.     }
  134. }
复制代码
具体使用

此处可以自行更改为并发提交:假如提交过猛会导入失败,根据自己数据库集群性能更改。
  1. public void Import(String folders) {
  2.         //文件所在目录:folders + unzipTargetPath
  3.         //cn.hutool.core.io.FileUtil;
  4.         List<File> unzipTxt = FileUtil.loopFiles(folders + unzipTargetPath);
  5.         log.info("需要导入文件数量{}", unzipTxt.size());
  6.         for (File file : unzipTxt) {
  7.             Map<String, String> map = getColumn(file.getName());
  8.             new DorisStreamLoader(
  9.                     sdkConfig.getTargetIp(),
  10.                     sdkConfig.getTargetPort(),
  11.                     sdkConfig.getTargetDatabase(),
  12.                     sdkConfig.getTargetUsername(),
  13.                     sdkConfig.getTargetPassword(),
  14.                     map.get("table"), map.get("columns"), file, folders + movePath + file.getName(),
  15.                     "\\u0002",
  16.                     folders + errorPath + file.getName()).run();
  17.         }
  18.     }
  19.     //获取表名和字段
  20.     private Map<String, String> getColumn(String path) {
  21.         Map<String, String> map = new HashMap<>();
  22.         if (path.contains("ip")) {
  23.             map.put("table", "tableName");
  24.             map.put("columns", "A,B,C,D,E");
  25.         }
  26.         //省略。。。。
  27.         return map;
  28.     }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

美丽的神话

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表