Starrocks扩展FileSystem代码分析

打印 上一主题 下一主题

主题 657|帖子 657|积分 1971

Starrocks扩展FileSystem代码分析

Starrocks支持使用FILES()算子对接文件系统
例如可以使用insert into files("path"="hdfs://xxx.xx.xxx.xx:9000/unload/data1", "format"="parquet", "compression" = "lz4") select * from sales_records实现将表sales_records中的数据导出到HDFS中,使用parquet格式保存。
也可以使用insert into foo select * from files("path"="hdfs://xxx.xx.xxx.xx:9000/unload/data1", "format"="parquet", "compression" = "lz4")实现从HDFS中读取文件然后导入到foo表中。
如果我们想扩展starrocks支持的文件系统,实现从其他文件系统读写文件应该从哪些方面入手呢?
以下我们就以insert into files()语句为例,从starrocks的前后端fe和be两方面来分析如何扩展其他文件系统。
1 FE解析过程

1.1 FE端到端框架

starrocks有多种连接方式,这里以mysql client连接方式举例
com/starrocks/qe/ConnectProcessor.java
  1. /**
  2. * Process one mysql connection, receive one pakcet, process, send one packet.
  3. */
  4. public class ConnectProcessor {
  5.     ...
  6.     // process COM_QUERY statement,
  7.     protected void handleQuery() {
  8.         ...
  9.         originStmt = new String(bytes, 1, ending, StandardCharsets.UTF_8);
  10.         ...
  11.         try {
  12.             ...
  13.             try {
  14.                 stmts = com.starrocks.sql.parser.SqlParser.parse(originStmt, ctx.getSessionVariable());
  15.             } catch (ParsingException parsingException) {
  16.                 throw new AnalysisException(parsingException.getMessage());
  17.             }
  18.             for (int i = 0; i < stmts.size(); ++i) {
  19.                 ...
  20.                 parsedStmt = stmts.get(i);
  21.                 ...
  22.                 executor = new StmtExecutor(ctx, parsedStmt);
  23.                 ...
  24.                 executor.execute();
  25.                 ...
  26.             }
  27.         }
  28.         ...
  29.     }
  30.     ...
  31. }
复制代码
启动starrocks会启动一个starrocks实现的mysql server,查询时查询语句会被分配到ConnectProcessor中,被handleQuery方法执行。这个方法先调用解析器将查询语句的字符串解析成多条语句的语法树。然后对每个语法树构造StmtExecutor,然后调用execute()方法来进行查询引擎前端的语义分析、优化等操作。
com/starrocks/qe/StmtExecutor.java
  1. public class StmtExecutor {
  2.     ...
  3.     public void execute() throws Exception {
  4.         ...
  5.         try {
  6.             ...
  7.             try (Timer ignored = Tracers.watchScope("Total")) {
  8.                 ...
  9.                 if (!isForwardToLeader()) {
  10.                     ...
  11.                     if {
  12.                         ...
  13.                     } else {
  14.                         execPlan = StatementPlanner.plan(parsedStmt, context);
  15.                         if (parsedStmt instanceof QueryStatement && context.shouldDumpQuery()) {
  16.                             context.getDumpInfo().setExplainInfo(execPlan.getExplainString(TExplainLevel.COSTS));
  17.                         }
  18.                     }
  19.                     ...
  20.                 }
  21.             }
  22.             ...
  23.         }
  24.         ...
  25.     }
  26.     ...
  27.     if {
  28.         ...
  29.     } else if (parsedStmt instanceof DmlStmt) {
  30.         handleDMLStmtWithProfile(execPlan, (DmlStmt) parsedStmt);
  31.     } ...
  32.    
  33. }
复制代码
StmtExecutor的execute()方法中,会将解析后的语法树传入StatementPlanner的plan()方法中,这个方法就是将语法树经过分析器和优化器生成执行计划的入口。
生成执行计划后,会在exeucte()方法后续的handleDMLStmtWithProfile()函数中来处理insert这种DML语句的执行过程。在该方法中,会获取调度器然后调用调度器去执行上面的执行计划。
com/starrocks/sql/StatementPlanner.java
  1. public class StatementPlanner {
  2.     public static ExecPlan plan(StatementBase stmt, ConnectContext session) {
  3.         if (session instanceof HttpConnectContext) {
  4.             return plan(stmt, session, TResultSinkType.HTTP_PROTOCAL);
  5.         }
  6.         return plan(stmt, session, TResultSinkType.MYSQL_PROTOCAL);
  7.     }
  8.     public static ExecPlan plan(StatementBase stmt, ConnectContext session,
  9.                                 TResultSinkType resultSinkType) {
  10.         ...
  11.         try {
  12.             ...
  13.             try (Timer ignored = Tracers.watchScope("Analyzer")) {
  14.                 Analyzer.analyze(stmt, session);
  15.             }
  16.             ...
  17.             if (stmt instanceof QueryStatement) {
  18.                 return planQuery(stmt, resultSinkType, session, false);
  19.             } else if (stmt instanceof InsertStmt) {
  20.                 return new InsertPlanner().plan((InsertStmt) stmt, session);
  21.             } else if (stmt instanceof UpdateStmt) {
  22.                 return new UpdatePlanner().plan((UpdateStmt) stmt, session);
  23.             } else if (stmt instanceof DeleteStmt) {
  24.                 return new DeletePlanner().plan((DeleteStmt) stmt, session);
  25.             }
  26.         }
  27.         ...
  28.     }
  29. }
复制代码
调用StatementPlanner的plan()方法时,从以上代码可以看出来,首先会调用分析器Analyzer对语法树进行语义分析,然后对于InsertStmt最后会调用InsertPlanner的plan()方法。
com/starrocks/sql/analyzer/Analyzer.java
  1. public class Analyzer {
  2.     private static final Analyzer INSTANCE = new Analyzer(new AnalyzerVisitor());
  3.     public static Analyzer getInstance() {
  4.         return INSTANCE;
  5.     }
  6.     private final AnalyzerVisitor analyzerVisitor;
  7.     private Analyzer(AnalyzerVisitor analyzerVisitor) {
  8.         this.analyzerVisitor = analyzerVisitor;
  9.     }
  10.     public static void analyze(StatementBase statement, ConnectContext context) {
  11.         getInstance().analyzerVisitor.analyze(statement, context);
  12.     }
  13. }
复制代码
分析器的analyze()方法会调用AnalyzerVisitor,这是一个可以对语法树每个节点进行访问的访问者。
com/starrocks/sql/InsertPlanner.java
  1. public class InsertPlanner {
  2.     ...
  3.     public ExecPlan plan(InsertStmt insertStmt, ConnectContext session) {
  4.         ...
  5.         // 语法树转换成逻辑计划
  6.         try (Timer ignore = Tracers.watchScope("Transform")) {
  7.             logicalPlan = new RelationTransformer(columnRefFactory, session).transform(queryRelation);
  8.         }
  9.         ...
  10.         try (Timer ignore = Tracers.watchScope("InsertPlanner")) {
  11.             ...
  12.             // 优化器执行优化输出物理计划
  13.             OptExpression optimizedPlan;
  14.             try (Timer ignore2 = Tracers.watchScope("Optimizer")) {
  15.                 optimizedPlan = optimizer.optimize(
  16.                         session,
  17.                         logicalPlan.getRoot(),
  18.                         requiredPropertySet,
  19.                         new ColumnRefSet(logicalPlan.getOutputColumn()),
  20.                         columnRefFactory);
  21.             }
  22.             ...
  23.             // 将物理计划划分后生成执行计划
  24.             ExecPlan execPlan;
  25.             try (Timer ignore3 = Tracers.watchScope("PlanBuilder")) {
  26.                 execPlan = PlanFragmentBuilder.createPhysicalPlan(
  27.                         optimizedPlan, session, logicalPlan.getOutputColumn(), columnRefFactory,
  28.                         queryRelation.getColumnOutputNames(), TResultSinkType.MYSQL_PROTOCAL, hasOutputFragment);
  29.             }
  30.             ...
  31.             // 如果targetTable是TableFunctionTable,就设置执行计划的sink节点为TableFunctionTableSink
  32.             DataSink dataSink;
  33.             if (targetTable instanceof ...) {
  34.                
  35.             } else if (targetTable instanceof TableFunctionTable) {
  36.                 dataSink = new TableFunctionTableSink((TableFunctionTable) targetTable);
  37.             }
  38.             ...
  39.             PlanFragment sinkFragment = execPlan.getFragments().get(0);
  40.             ...
  41.             sinkFragment.setSink(dataSink);
  42.         }
  43.         ...
  44.     }
  45.     ...
  46. }
复制代码
在InsertPlanner中,会执行语法树转逻辑计划,逻辑计划优化成物理计划,物理计划划分生成执行计划的过程。
以上是FE端在执行INSERT语句时的整体流程,下面我们详细看一下其中INSERT INTO FILES()语句是如何被解析的。
1.2 FE解析INSERT INTO FILES过程

1.2.1 词法语法分析

com/starrocks/sql/parser/StarRocks.g4
  1. insertStatement
  2.     : explainDesc? INSERT setVarHint* (INTO | OVERWRITE) (qualifiedName | (FILES propertyList)) partitionNames?
  3.         (WITH LABEL label=identifier)? columnAliases?
  4.         (queryStatement | (VALUES expressionsWithDefault (',' expressionsWithDefault)*))
  5.     ;
复制代码
INSERT (INTO) FILES语法对应在g4语法文件中如图所示,属于insertStatement
com/starrocks/sql/parser/AstBuilder.java
  1. @Override
  2. public ParseNode visitInsertStatement(StarRocksParser.InsertStatementContext context) {
  3.     ...
  4.     // INSERT INTO FILES(...)
  5.     Map<String, String> tableFunctionProperties = getPropertyList(context.propertyList());
  6.     InsertStmt res = new InsertStmt(tableFunctionProperties, queryStatement, createPos(context));
  7.     res.setOptHints(visitVarHints(context.setVarHint()));
  8.     return res;
  9. }
复制代码
查看Antlr生成的解析器可以看到,解析器将这条语法转化成了InsertStmt语法节点,调用了特定的构造函数。
com/starrocks/sql/ast/InsertStmt.java
  1. public class InsertStmt extends DmlStmt {
  2.     ...
  3.     // Ctor for INSERT INTO FILES(...)
  4.     public InsertStmt(Map<String, String> tableFunctionProperties, QueryStatement queryStatement, NodePosition pos) {
  5.         super(pos);
  6.         this.tblName = new TableName("table_function_catalog", "table_function_db", "table_function_table");
  7.         this.targetColumnNames = null;
  8.         this.targetPartitionNames = null;
  9.         this.queryStatement = queryStatement;
  10.         this.tableFunctionAsTargetTable = true;
  11.         this.tableFunctionProperties = tableFunctionProperties;
  12.     }
  13.     ...
  14. }
复制代码
调用了InsertStmt中这个构造函数
1.2.2 语义分析

com/starrocks/sql/ast/AstVisitor.java
com/starrocks/sql/analyzer/AnalyzerVisitor.java
  1. public class AnalyzerVisitor extends AstVisitor<Void, ConnectContext> {
  2.     public void analyze(StatementBase statement, ConnectContext session) {
  3.         visit(statement, session);
  4.     }
  5.     ...
  6.     @Override
  7.     public Void visitInsertStatement(InsertStmt statement, ConnectContext session) {
  8.         InsertAnalyzer.analyze(statement, session);
  9.         return null;
  10.     }
  11.     ....
  12. }
复制代码
经过解析器生成语法树后,需要经过分析器进行语义分析,分析器使用访问者模式,AnalyzerVisitor通过继承AstVisitor实现了访问语法树上的每个节点。
com/starrocks/sql/analyzer/InsertAnalyzer.java
  1. public class InsertAnalyzer {
  2.     public static void analyze(InsertStmt insertStmt, ConnectContext session) {
  3.         QueryRelation query = insertStmt.getQueryStatement().getQueryRelation();
  4.         new QueryAnalyzer(session).analyze(insertStmt.getQueryStatement());
  5.         List<Table> tables = new ArrayList<>();
  6.         AnalyzerUtils.collectSpecifyExternalTables(insertStmt.getQueryStatement(), tables, Table::isHiveTable);
  7.         tables.stream().map(table -> (HiveTable) table)
  8.                 .forEach(table -> table.useMetadataCache(false));
  9.         /*
  10.          *  Target table
  11.          */
  12.         Table table = getTargetTable(insertStmt, session);
  13.         ...
  14.         insertStmt.setTargetTable(table);
  15.         insertStmt.setTargetColumns(targetColumns);
  16.         if (session.getDumpInfo() != null) {
  17.             session.getDumpInfo().addTable(insertStmt.getTableName().getDb(), table);
  18.         }
  19.     }
  20.     ...
  21.     private static Table getTargetTable(InsertStmt insertStmt, ConnectContext session) {
  22.         if (insertStmt.useTableFunctionAsTargetTable()) {
  23.             return insertStmt.makeTableFunctionTable();
  24.         }
  25.         ...
  26.     }
  27.     ...
  28. }
复制代码
访问InsertStmt节点时,调用了InsertAnalyzer.analyze方法对insert语法进行语义分析,其中getTargetTable会根据insert语句分析并生成对应类型的语义Table。最后将生成的语义Table赋值给了语法树。
getTargetTable开始先判断insertStmt是否是useTableFunctionAsTargetTable,从InsertStmt的构造函数可以看出,INSERT (INTO) FILES语句对应的tableFunctionAsTargetTable是true。因此继续调用makeTableFunctionTable来生成Table。
com/starrocks/sql/ast/InsertStmt.java
  1. public class InsertStmt extends DmlStmt {
  2.     ...
  3.     public Table makeTableFunctionTable() {
  4.         ...
  5.         // parse table function properties
  6.         Map<String, String> props = getTableFunctionProperties();
  7.         String single = props.getOrDefault("single", "false");
  8.         if (!single.equalsIgnoreCase("true") && !single.equalsIgnoreCase("false")) {
  9.             throw new SemanticException("got invalid parameter "single" = "%s", expect a boolean value (true or false).",
  10.                     single);
  11.         }
  12.         boolean writeSingleFile = single.equalsIgnoreCase("true");
  13.         String path = props.get("path");
  14.         String format = props.get("format");
  15.         String partitionBy = props.get("partition_by");
  16.         String compressionType = props.get("compression");
  17.         ...
  18.         if (writeSingleFile) {
  19.             return new TableFunctionTable(path, format, compressionType, columns, null, true, props);
  20.         }
  21.         if (partitionBy == null) {
  22.             // prepend `data_` if path ends with forward slash
  23.             if (path.endsWith("/")) {
  24.                 path += "data_";
  25.             }
  26.             return new TableFunctionTable(path, format, compressionType, columns, null, false, props);
  27.         }
  28.         
  29.         ...
  30.         return new TableFunctionTable(path, format, compressionType, columns, partitionColumnIDs, false, props);
  31.     }
  32. }
复制代码
在makeTableFunctionTable中,解析了FILES(...)算子传入的参数,然后根据传入的参数调用了TableFunctionTable的构造函数,将语法节点转换成了语义节点。
分析器结束后就会给InsertStmt中赋值targetTable,这个表就表示INSERT INTO会将数据插入targetTable中。
1.2.3 生成执行计划

根据上文InsertPlanner的plan()方法中的步骤,分析器执行完成后就生成了targetTable,后续的转换和优化过程只是针对INSERT INTO FILES() SELECT ...后面的查询语句,分析器生成的targetTable会在最后转换成TableFunctionTableSink算子赋值给执行计划。
com/starrocks/planner/PlanFragment.java
  1. public class PlanFragment extends TreeNode<PlanFragment> {
  2.     ...
  3.     public TPlanFragment toThrift() {
  4.         TPlanFragment result = new TPlanFragment();
  5.         ...
  6.         if (sink != null) {
  7.             result.setOutput_sink(sink.toThrift());
  8.         }
  9.         ...
  10.         return result;
  11.     }
  12.     ...
  13. }
复制代码
InsertPlanner最后生成的执行计划就是由PlanFragment组成的,这个对象会通过thrift被发送给BE。通过这里的toThrift()方法可以看出,如果存在sink节点,则调用sink节点的toThrift()方法,然后将其赋值给thrift对象TPlanFragment的output_sink。
com/starrocks/planner/TableFunctionTableSink.java
  1. public class TableFunctionTableSink extends DataSink {
  2.     ...
  3.     @Override
  4.     protected TDataSink toThrift() {
  5.         TTableFunctionTableSink tTableFunctionTableSink = new TTableFunctionTableSink();
  6.         tTableFunctionTableSink.setTarget_table(table.toTTableFunctionTable());
  7.         TCloudConfiguration tCloudConfiguration = new TCloudConfiguration();
  8.         cloudConfiguration.toThrift(tCloudConfiguration);
  9.         tTableFunctionTableSink.setCloud_configuration(tCloudConfiguration);
  10.         // 设置Sink类型为TABLE_FUNCTION_TABLE_SINK
  11.         TDataSink tDataSink = new TDataSink(TDataSinkType.TABLE_FUNCTION_TABLE_SINK);
  12.         tDataSink.setTable_function_table_sink(tTableFunctionTableSink);
  13.         return tDataSink;
  14.     }
  15.     ...
  16. }
复制代码
以上sink的toThrift()方法就会调用到TableFunctionTableSink的toThrift()方法。其中将TDataSink的类型设置成了TABLE_FUNCTION_TABLE_SINK。
1.2.4 调度器转发

com/starrocks/rpc/PBackendService.java
  1. public interface PBackendService {
  2.     @ProtobufRPC(serviceName = "PBackendService", methodName = "exec_plan_fragment",
  3.             attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 60000)
  4.     Future<PExecPlanFragmentResult> execPlanFragmentAsync(PExecPlanFragmentRequest request);
  5.     ...
  6. }
复制代码
调度器通过一系列函数调用(省略中间过程,可通过这个方法向上追溯调用链,也可以通过调度器的exec方法向下追溯调用链),最终会通过调用execPlanFragmentAsync这个RPC将执行计划发送到BE。
1.3 BE执行TDataSink算子的过程

1.3.1 RPC入口

src/service/internal_service.h
  1. template <typename T>
  2. class PInternalServiceImplBase : public T {
  3. public:
  4.     ...
  5.     void exec_plan_fragment(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request,
  6.                             PExecPlanFragmentResult* result, google::protobuf::Closure* done) override;
  7.     ...
  8. }
复制代码
以上RPC到BE端对应的函数就是PInternalServiceImplBase中的exec_plan_fragment()方法。
src/service/internal_service.cpp
  1. template <typename T>
  2. Status PInternalServiceImplBase<T>::_exec_plan_fragment_by_pipeline(const TExecPlanFragmentParams& t_common_param,
  3.                                                                     const TExecPlanFragmentParams& t_unique_request) {
  4.     pipeline::FragmentExecutor fragment_executor;
  5.     auto status = fragment_executor.prepare(_exec_env, t_common_param, t_unique_request);
  6.     if (status.ok()) {
  7.         return fragment_executor.execute(_exec_env);
  8.     } else {
  9.         return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
  10.     }
  11. }
复制代码
以上exec_plan_fragment()最终会调用到_exec_plan_fragment_by_pipeline()这个私有方法中,这个方法初始化了一个FragmentExecutor,这就是BE去运行执行计划的执行器了。执行器在这里调用了prepare()进行相关准备工作,然后调用execute()方法运行整个计划。
1.3.2 准备BE执行计划

src/exec/pipeline/fragment_executor.cpp
  1. Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParams& common_request,
  2.                                  const TExecPlanFragmentParams& unique_request) {
  3.     ...
  4.     {
  5.         SCOPED_RAW_TIMER(&profiler.prepare_runtime_state_time);
  6.         RETURN_IF_ERROR(_prepare_workgroup(request));
  7.         RETURN_IF_ERROR(_prepare_runtime_state(exec_env, request));
  8.         // thrift对象转成BE执行计划树
  9.         RETURN_IF_ERROR(_prepare_exec_plan(exec_env, request));
  10.         RETURN_IF_ERROR(_prepare_global_dict(request));
  11.     }
  12.     {
  13.         SCOPED_RAW_TIMER(&profiler.prepare_pipeline_driver_time);
  14.         // 准备pipeline driver,解析sink节点
  15.         RETURN_IF_ERROR(_prepare_pipeline_driver(exec_env, request));
  16.         RETURN_IF_ERROR(_prepare_stream_load_pipe(exec_env, request));
  17.     }
  18.     ...
  19. }
  20. Status FragmentExecutor::_prepare_pipeline_driver(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request) {
  21.     ...
  22.     std::unique_ptr<DataSink> datasink;
  23.     if (request.isset_output_sink()) {
  24.         const auto& tsink = request.output_sink();
  25.         ...
  26.         RETURN_IF_ERROR(DataSink::create_data_sink(runtime_state, tsink, fragment.output_exprs, params,
  27.                                                    request.sender_id(), plan->row_desc(), &datasink));
  28.         // 将fe的sink节点转换成BE的TableFunctionTableSinkOperatorFactory
  29.         RETURN_IF_ERROR(_decompose_data_sink_to_operator(runtime_state, &context, request, datasink, tsink,
  30.                                                          fragment.output_exprs));
  31.     }
  32.     ...
  33.     // 这里将调用pipeline里面所有factory生成真正的BE operator,在这里就会生成TableFunctionTableSinkOperator
  34.     if (!unready_pipeline_groups.empty()) {
  35.         RETURN_IF_ERROR(create_lazy_instantiate_drivers_pipeline(
  36.                 runtime_state, &context, _query_ctx, _fragment_ctx.get(), std::move(unready_pipeline_groups), drivers));
  37.     }
  38.     ...
  39. }
  40. Status FragmentExecutor::_decompose_data_sink_to_operator(RuntimeState* runtime_state, PipelineBuilderContext* context,
  41.                                                           const UnifiedExecPlanFragmentParams& request,
  42.                                                           std::unique_ptr<starrocks::DataSink>& datasink,
  43.                                                           const TDataSink& thrift_sink,
  44.                                                           const std::vector<TExpr>& output_exprs) {
  45.     ...
  46.     if (typeid(*datasink) == ...) {
  47.         ...
  48.     } else if (typeid(*datasink) == typeid(starrocks::TableFunctionTableSink)) {
  49.         ...
  50.         auto op = std::make_shared<TableFunctionTableSinkOperatorFactory>(
  51.                 context->next_operator_id(), target_table.path, target_table.file_format, target_table.compression_type,
  52.                 output_expr_ctxs, partition_expr_ctxs, column_names, partition_column_names,
  53.                 target_table.write_single_file, thrift_sink.table_function_table_sink.cloud_configuration,
  54.                 fragment_ctx);
  55.         ...
  56.     }
  57. }
复制代码
以上代码展示了Sink节点在BE的转换过程。
经过转换,最终生成了BE的TableFunctionTableSinkOperator。
src/exec/pipeline/sink/table_function_table_sink_operator.cpp
  1. Status TableFunctionTableSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) {
  2.     if (_partition_exprs.empty()) {
  3.         if (_partition_writers.empty()) {
  4.             auto writer = std::make_unique<RollingAsyncParquetWriter>(_make_table_info(_path), _output_exprs,
  5.                                                                       _common_metrics.get(), add_commit_info, state,
  6.                                                                       _driver_sequence);
  7.             RETURN_IF_ERROR(writer->init());
  8.             _partition_writers.insert({"default writer", std::move(writer)});
  9.         }
  10.         return _partition_writers["default writer"]->append_chunk(chunk.get(), state);
  11.     }
  12.     ...
  13.     return _partition_writers[partition_location]->append_chunk(chunk.get(), state);
  14. }
复制代码
pipeline执行时,对于TableFunctionTableSinkOperator调用push_chunk()方法向这个operator中写入数据(chunk表示一个数据块)。可以看到这里使用了RollingAsyncParquetWriter,首先调用了init()方法初始化writer,最后调用writer的append_chunk()方法将数据块写入。
1.3.3 ParquetWriter初始化

src/exec/parquet_writer.cpp
  1. Status RollingAsyncParquetWriter::init() {
  2.     ASSIGN_OR_RETURN(
  3.             _fs, FileSystem::CreateUniqueFromString(_table_info.partition_location, FSOptions(&_table_info.cloud_conf)))
  4.     _schema = _table_info.schema;
  5.     _partition_location = _table_info.partition_location;
  6.     ::parquet::WriterProperties::Builder builder;
  7.     _table_info.enable_dictionary ? builder.enable_dictionary() : builder.disable_dictionary();
  8.     ASSIGN_OR_RETURN(auto compression_codec,
  9.                      parquet::ParquetBuildHelper::convert_compression_type(_table_info.compress_type));
  10.     builder.compression(compression_codec);
  11.     builder.version(::parquet::ParquetVersion::PARQUET_2_0);
  12.     _properties = builder.build();
  13.     return Status::OK();
  14. }
复制代码
init()方法中首先调用了FileSystem::CreateUniqueFromString()方法进行fs文件系统初始化,然后初始化了写入parquet的一些配置WriterProperties。
src/fs/fs.cpp
  1. StatusOr<std::unique_ptr<FileSystem>> FileSystem::CreateUniqueFromString(std::string_view uri, FSOptions options) {
  2.     if (fs::is_posix_uri(uri)) {
  3.         return new_fs_posix();
  4.     }
  5.     if (fs::is_s3_uri(uri)) {
  6.         return new_fs_s3(options);
  7.     }
  8.     if (fs::is_azure_uri(uri) || fs::is_gcs_uri(uri)) {
  9.         // TODO(SmithCruise):
  10.         // Now Azure storage and Google Cloud Storage both are using LibHdfs, we can use cpp sdk instead in the future.
  11.         return new_fs_hdfs(options);
  12.     }
  13. #ifdef USE_STAROS
  14.     if (is_starlet_uri(uri)) {
  15.         return new_fs_starlet();
  16.     }
  17. #endif
  18.     // Since almost all famous storage are compatible with Hadoop FileSystem, it's always a choice to fallback using
  19.     // Hadoop FileSystem to access storage.
  20.     return new_fs_hdfs(options);
  21. }
复制代码
初始化文件系统会检查FILES()传入的path参数,path在这个函数中是uri参数,这里会判断uri是否是posix本地路径,是否是s3, azure格式,如果都不是会返回hdfs文件系统。
要扩展其他文件系统,这里需要定义一种uri的格式,然后在这里判断是否是该文件系统对应的格式,如果是,则构造一个自定义的FileSystem。
src/fs/fs.h
  1. class FileSystem {
  2. public:
  3.     enum Type { POSIX, S3, HDFS, BROKER, MEMORY, STARLET };
  4.     // Governs if/how the file is created.
  5.     //
  6.     // enum value                   | file exists       | file does not exist
  7.     // -----------------------------+-------------------+--------------------
  8.     // CREATE_OR_OPEN_WITH_TRUNCATE | opens + truncates | creates
  9.     // CREATE_OR_OPEN               | opens             | creates
  10.     // MUST_CREATE                  | fails             | creates
  11.     // MUST_EXIST                   | opens             | fails
  12.     enum OpenMode { CREATE_OR_OPEN_WITH_TRUNCATE, CREATE_OR_OPEN, MUST_CREATE, MUST_EXIST };
  13.     ...
  14.     // Create a brand new sequentially-readable file with the specified name.
  15.     //  If the file does not exist, returns a non-OK status.
  16.     //
  17.     // The returned file will only be accessed by one thread at a time.
  18.     StatusOr<std::unique_ptr<SequentialFile>> new_sequential_file(const std::string& fname) {
  19.         return new_sequential_file(SequentialFileOptions(), fname);
  20.     }
  21.     virtual StatusOr<std::unique_ptr<SequentialFile>> new_sequential_file(const SequentialFileOptions& opts,
  22.                                                                           const std::string& fname) = 0;
  23.     // Create a brand new random access read-only file with the
  24.     // specified name.
  25.     //
  26.     // The returned file will only be accessed by one thread at a time.
  27.     StatusOr<std::unique_ptr<RandomAccessFile>> new_random_access_file(const std::string& fname) {
  28.         return new_random_access_file(RandomAccessFileOptions(), fname);
  29.     }
  30.     virtual StatusOr<std::unique_ptr<RandomAccessFile>> new_random_access_file(const RandomAccessFileOptions& opts,
  31.                                                                                const std::string& fname) = 0;
  32.     // Create an object that writes to a new file with the specified
  33.     // name.  Deletes any existing file with the same name and creates a
  34.     // new file.
  35.     //
  36.     // The returned file will only be accessed by one thread at a time.
  37.     virtual StatusOr<std::unique_ptr<WritableFile>> new_writable_file(const std::string& fname) = 0;
  38.     // Like the previous new_writable_file, but allows options to be
  39.     // specified.
  40.     virtual StatusOr<std::unique_ptr<WritableFile>> new_writable_file(const WritableFileOptions& opts,
  41.                                                                       const std::string& fname) = 0;
  42.     // Returns OK if the path exists.
  43.     //         NotFound if the named file does not exist,
  44.     //                  the calling process does not have permission to determine
  45.     //                  whether this file exists, or if the path is invalid.
  46.     //         IOError if an IO Error was encountered
  47.     virtual Status path_exists(const std::string& fname) = 0;
  48.     // Store in *result the names of the children of the specified directory.
  49.     // The names are relative to "dir".
  50.     // Original contents of *results are dropped.
  51.     // Returns OK if "dir" exists and "*result" contains its children.
  52.     //         NotFound if "dir" does not exist, the calling process does not have
  53.     //                  permission to access "dir", or if "dir" is invalid.
  54.     //         IOError if an IO Error was encountered
  55.     virtual Status get_children(const std::string& dir, std::vector<std::string>* result) = 0;
  56.     // Iterate the specified directory and call given callback function with child's
  57.     // name. This function continues execution until all children have been iterated
  58.     // or callback function return false.
  59.     // The names are relative to "dir".
  60.     //
  61.     // The function call extra cost is acceptable. Compared with returning all children
  62.     // into a given vector, the performance of this method is 5% worse. However this
  63.     // approach is more flexiable and efficient in fulfilling other requirements.
  64.     //
  65.     // Returns OK if "dir" exists.
  66.     //         NotFound if "dir" does not exist, the calling process does not have
  67.     //                  permission to access "dir", or if "dir" is invalid.
  68.     //         IOError if an IO Error was encountered
  69.     virtual Status iterate_dir(const std::string& dir, const std::function<bool(std::string_view)>& cb) = 0;
  70.     // `iterate_dir2` is similar to `iterate_dir` but in addition to returning the directory entry name, it
  71.     // also returns some file statistics.
  72.     virtual Status iterate_dir2(const std::string& dir, const std::function<bool(DirEntry)>& cb) = 0;
  73.     // Delete the named file.
  74.     // FIXME: If the named file does not exist, OK or NOT_FOUND is returned, depend on the implementation.
  75.     virtual Status delete_file(const std::string& fname) = 0;
  76.     // Create the specified directory.
  77.     // NOTE: It will return error if the path already exist(not necessarily as a directory)
  78.     virtual Status create_dir(const std::string& dirname) = 0;
  79.     // Creates directory if missing.
  80.     // Return OK if it exists, or successful in Creating.
  81.     virtual Status create_dir_if_missing(const std::string& dirname, bool* created = nullptr) = 0;
  82.     // Create directory for every element of 'dirname' that does not already exist.
  83.     // If 'dirname' already exists, the function does nothing (this condition is not treated as an error).
  84.     virtual Status create_dir_recursive(const std::string& dirname) = 0;
  85.     // Delete the specified directory.
  86.     // NOTE: The dir must be empty.
  87.     virtual Status delete_dir(const std::string& dirname) = 0;
  88.     // Deletes the contents of 'dirname' (if it is a directory) and the contents of all its subdirectories,
  89.     // recursively, then deletes 'dirname' itself. Symlinks are not followed (symlink is removed, not its target).
  90.     virtual Status delete_dir_recursive(const std::string& dirname) = 0;
  91.     // Synchronize the entry for a specific directory.
  92.     virtual Status sync_dir(const std::string& dirname) = 0;
  93.     // Checks if the file is a directory. Returns an error if it doesn't
  94.     // exist, otherwise return true or false.
  95.     virtual StatusOr<bool> is_directory(const std::string& path) = 0;
  96.     // Canonicalize 'path' by applying the following conversions:
  97.     // - Converts a relative path into an absolute one using the cwd.
  98.     // - Converts '.' and '..' references.
  99.     // - Resolves all symbolic links.
  100.     //
  101.     // All directory entries in 'path' must exist on the filesystem.
  102.     virtual Status canonicalize(const std::string& path, std::string* result) = 0;
  103.     virtual StatusOr<uint64_t> get_file_size(const std::string& fname) = 0;
  104.     // Get the last modification time by given 'fname'.
  105.     virtual StatusOr<uint64_t> get_file_modified_time(const std::string& fname) = 0;
  106.     // Rename file src to target.
  107.     virtual Status rename_file(const std::string& src, const std::string& target) = 0;
  108.     // create a hard-link
  109.     virtual Status link_file(const std::string& /*old_path*/, const std::string& /*new_path*/) = 0;
  110.     // Determines the information about the filesystem on which the pathname 'path' is located.
  111.     virtual StatusOr<SpaceInfo> space(const std::string& path) { return Status::NotSupported("FileSystem::space()"); }
  112.     // Given the path to a remote file, delete the file's cache on the local file system, if any.
  113.     // On success, Status::OK is returned. If there is no cache, Status::NotFound is returned.
  114.     virtual Status drop_local_cache(const std::string& path) { return Status::NotFound(path); }
  115.     // Batch delete the given files.
  116.     // return ok if all success (not found error ignored), error if any failed and the message indicates the fail message
  117.     // possibly stop at the first error if is simulating batch deletes.
  118.     virtual Status delete_files(const std::vector<std::string>& paths) {
  119.         for (auto&& path : paths) {
  120.             auto st = delete_file(path);
  121.             if (!st.ok() && !st.is_not_found()) {
  122.                 return st;
  123.             }
  124.         }
  125.         return Status::OK();
  126.     }
  127. };
复制代码
starrocks的FileSystem可以扩展实现自己的xxxFileSystem,需要实现的接口就是上面这个基类的方法。主要有:

  • 创建顺序读的文件
  • 创建随机读的文件
  • 创建可写的文件
  • 一系列路径操作
  • 获取文件信息
  • 删除文件
如果可以扩展实现自己的xxxFileSystem,那么就可以调用new_writable_file()创建一个可写文件,然后就可以使用arrow::parquet:arquetFileWriter写入文件了。
1.3.4 ParquetWriter写入数据

src/exec/parquet_writer.cpp
  1. Status RollingAsyncParquetWriter::append_chunk(Chunk* chunk, RuntimeState* state) {
  2.     RETURN_IF_ERROR(get_io_status());
  3.     if (_writer == nullptr) {
  4.         RETURN_IF_ERROR(_new_file_writer());
  5.     }
  6.     // exceed file size
  7.     if (_max_file_size != -1 && _writer->file_size() > _max_file_size) {
  8.         RETURN_IF_ERROR(close_current_writer(state));
  9.         RETURN_IF_ERROR(_new_file_writer());
  10.     }
  11.     return _writer->write(chunk);
  12. }
  13. Status RollingAsyncParquetWriter::_new_file_writer() {
  14.     std::string new_file_location = _new_file_location();
  15.     WritableFileOptions options{.sync_on_close = false, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};
  16.     ASSIGN_OR_RETURN(auto writable_file, _fs->new_writable_file(options, new_file_location))
  17.     _writer = std::make_shared<starrocks::parquet::AsyncFileWriter>(
  18.             std::move(writable_file), new_file_location, _partition_location, _properties, _schema, _output_expr_ctxs,
  19.             ExecEnv::GetInstance()->pipeline_sink_io_pool(), _parent_profile, _max_file_size);
  20.     auto st = _writer->init();
  21.     return st;
  22. }
复制代码
append_chunk()方法会初始化一个AsyncFileWriter,然后调用它的write()方法写入数据。
src/formats/parquet/file_writer.cpp
  1. Status FileWriterBase::init() {
  2.     _writer = ::parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
  3.     if (_writer == nullptr) {
  4.         return Status::InternalError("Failed to create file writer");
  5.     }
  6.     return Status::OK();
  7. }
  8. void FileWriterBase::_generate_chunk_writer() {
  9.     DCHECK(_writer != nullptr);
  10.     if (_chunk_writer == nullptr) {
  11.         auto rg_writer = _writer->AppendBufferedRowGroup();
  12.         _chunk_writer = std::make_unique<ChunkWriter>(rg_writer, _type_descs, _schema, _eval_func);
  13.     }
  14. }
  15. Status FileWriterBase::write(Chunk* chunk) {
  16.     if (!chunk->has_rows()) {
  17.         return Status::OK();
  18.     }
  19.     _generate_chunk_writer();
  20.     RETURN_IF_ERROR(_chunk_writer->write(chunk));
  21.     if (_chunk_writer->estimated_buffered_bytes() > _max_row_group_size && !is_last_row_group()) {
  22.         RETURN_IF_ERROR(_flush_row_group());
  23.     }
  24.     return Status::OK();
  25. }
复制代码
FileWriterBase是AsyncFileWriter的基类,调用AsyncFileWriter的write()方法如上,可以看到其实本质上就是调用了parquet:arquetFileWriter的write()方法。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

惊落一身雪

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表