例如，可以使用 insert into files("path"="hdfs://", "format"="parquet", "compression" = "lz4") select * from sales_records，将表 sales_records 中的数据以 Parquet 格式（LZ4 压缩）导出到 HDFS 中。
也可以使用 insert into foo select * from files("path"="hdfs://", "format"="parquet", "compression" = "lz4")，从 HDFS 中读取 Parquet 文件并将数据导入到表 foo 中。
下面以 insert into files() 语句为例，从 StarRocks 的前端（FE）和后端（BE）两方面分析其执行过程，以及如何扩展对其他文件系统的支持。
1 FE解析过程

1.1 FE端到端框架

StarRocks 支持多种连接方式，这里以 MySQL 客户端连接为例：
  1. /**
  2. * Process one mysql connection, receive one packet, process, send one packet.
  3. */
  4. public class ConnectProcessor {
  5.     ...
  6.     // Process a COM_QUERY packet: parse the SQL text into statements and execute each one in turn.
  7.     protected void handleQuery() {
  8.         ...
  9.         originStmt = new String(bytes, 1, ending, StandardCharsets.UTF_8); // decode the SQL text from the packet buffer (offset 1 presumably skips the command byte - confirm)
  10.         ...
  11.         try {
  12.             ...
  13.             try {
  14.                 stmts = com.starrocks.sql.parser.SqlParser.parse(originStmt, ctx.getSessionVariable()); // one SQL string may yield several statements
  15.             } catch (ParsingException parsingException) {
  16.                 throw new AnalysisException(parsingException.getMessage()); // report parse failures as AnalysisException
  17.             }
  18.             for (int i = 0; i < stmts.size(); ++i) {
  19.                 ...
  20.                 parsedStmt = stmts.get(i);
  21.                 ...
  22.                 executor = new StmtExecutor(ctx, parsedStmt); // one executor per parsed statement
  23.                 ...
  24.                 executor.execute(); // planning and dispatch happen inside StmtExecutor.execute()
  25.                 ...
  26.             }
  27.         }
  28.         ...
  29.     }
  30.     ...
  31. }
StarRocks FE 启动时会同时启动一个由 StarRocks 自己实现、兼容 MySQL 协议的服务端。客户端发来的查询语句会交给 ConnectProcessor，由其 handleQuery 方法处理。该方法先调用解析器，将查询字符串解析为一条或多条语句的语法树；然后为每个语法树构造一个 StmtExecutor，并调用其 execute() 方法，完成语义分析、优化等查询引擎前端的工作。
  1. public class StmtExecutor {
  2.     ...
  3.     public void execute() throws Exception {
  4.         ...
  5.         try {
  6.             ...
  7.             try (Timer ignored = Tracers.watchScope("Total")) {
  8.                 ...
  9.                 if (!isForwardToLeader()) {
  10.                     ...
  11.                     if {
  12.                         ...
  13.                     } else {
  14.                         execPlan = StatementPlanner.plan(parsedStmt, context);
  15.                         if (parsedStmt instanceof QueryStatement && context.shouldDumpQuery()) {
  16.                             context.getDumpInfo().setExplainInfo(execPlan.getExplainString(TExplainLevel.COSTS));
  17.                         }
  18.                     }
  19.                     ...
  20.                 }
  21.             }
  22.             ...
  23.         }
  24.         ...
  25.     }
  26.     ...
  27.     if {
  28.         ...
  29.     } else if (parsedStmt instanceof DmlStmt) {
  30.         handleDMLStmtWithProfile(execPlan, (DmlStmt) parsedStmt);
  31.     } ...
  33. }
  1. public class StatementPlanner { // Analyzes a statement and builds its ExecPlan with the matching planner.
  2.     public static ExecPlan plan(StatementBase stmt, ConnectContext session) { // result sink protocol follows the connection type
  3.         if (session instanceof HttpConnectContext) {
  4.             return plan(stmt, session, TResultSinkType.HTTP_PROTOCAL);
  5.         }
  6.         return plan(stmt, session, TResultSinkType.MYSQL_PROTOCAL);
  7.     }
  8.     public static ExecPlan plan(StatementBase stmt, ConnectContext session,
  9.                                 TResultSinkType resultSinkType) {
  10.         ...
  11.         try {
  12.             ...
  13.             try (Timer ignored = Tracers.watchScope("Analyzer")) {
  14.                 Analyzer.analyze(stmt, session); // semantic analysis; for INSERT this also resolves the target table
  15.             }
  16.             ...
  17.             if (stmt instanceof QueryStatement) {
  18.                 return planQuery(stmt, resultSinkType, session, false);
  19.             } else if (stmt instanceof InsertStmt) {
  20.                 return new InsertPlanner().plan((InsertStmt) stmt, session); // INSERT INTO FILES() takes this branch
  21.             } else if (stmt instanceof UpdateStmt) {
  22.                 return new UpdatePlanner().plan((UpdateStmt) stmt, session);
  23.             } else if (stmt instanceof DeleteStmt) {
  24.                 return new DeletePlanner().plan((DeleteStmt) stmt, session);
  25.             }
  26.         }
  27.         ...
  28.     }
  29. }
  1. public class Analyzer { // Singleton entry point for semantic analysis; delegates to AnalyzerVisitor.
  2.     private static final Analyzer INSTANCE = new Analyzer(new AnalyzerVisitor());
  3.     public static Analyzer getInstance() {
  4.         return INSTANCE;
  5.     }
  6.     private final AnalyzerVisitor analyzerVisitor;
  7.     private Analyzer(AnalyzerVisitor analyzerVisitor) {
  8.         this.analyzerVisitor = analyzerVisitor;
  9.     }
  10.     public static void analyze(StatementBase statement, ConnectContext context) {
  11.         getInstance().analyzerVisitor.analyze(statement, context); // the visitor picks the analyzer for the statement type
  12.     }
  13. }
  1. public class InsertPlanner {
  2.     ...
  3.     public ExecPlan plan(InsertStmt insertStmt, ConnectContext session) {
  4.         ...
  5.         // 语法树转换成逻辑计划
  6.         try (Timer ignore = Tracers.watchScope("Transform")) {
  7.             logicalPlan = new RelationTransformer(columnRefFactory, session).transform(queryRelation);
  8.         }
  9.         ...
  10.         try (Timer ignore = Tracers.watchScope("InsertPlanner")) {
  11.             ...
  12.             // 优化器执行优化输出物理计划
  13.             OptExpression optimizedPlan;
  14.             try (Timer ignore2 = Tracers.watchScope("Optimizer")) {
  15.                 optimizedPlan = optimizer.optimize(
  16.                         session,
  17.                         logicalPlan.getRoot(),
  18.                         requiredPropertySet,
  19.                         new ColumnRefSet(logicalPlan.getOutputColumn()),
  20.                         columnRefFactory);
  21.             }
  22.             ...
  23.             // 将物理计划划分后生成执行计划
  24.             ExecPlan execPlan;
  25.             try (Timer ignore3 = Tracers.watchScope("PlanBuilder")) {
  26.                 execPlan = PlanFragmentBuilder.createPhysicalPlan(
  27.                         optimizedPlan, session, logicalPlan.getOutputColumn(), columnRefFactory,
  28.                         queryRelation.getColumnOutputNames(), TResultSinkType.MYSQL_PROTOCAL, hasOutputFragment);
  29.             }
  30.             ...
  31.             // 如果targetTable是TableFunctionTable,就设置执行计划的sink节点为TableFunctionTableSink
  32.             DataSink dataSink;
  33.             if (targetTable instanceof ...) {
  35.             } else if (targetTable instanceof TableFunctionTable) {
  36.                 dataSink = new TableFunctionTableSink((TableFunctionTable) targetTable);
  37.             }
  38.             ...
  39.             PlanFragment sinkFragment = execPlan.getFragments().get(0);
  40.             ...
  41.             sinkFragment.setSink(dataSink);
  42.         }
  43.         ...
  44.     }
  45.     ...
  46. }
以上是FE端在执行INSERT语句时的整体流程,下面我们详细看一下其中INSERT INTO FILES()语句是如何被解析的。

1.2.1 词法语法分析

  1. insertStatement
  2.     : explainDesc? INSERT setVarHint* (INTO | OVERWRITE) (qualifiedName | (FILES propertyList)) partitionNames? // target: a table name, or FILES(...) for INSERT INTO FILES
  3.         (WITH LABEL label=identifier)? columnAliases?
  4.         (queryStatement | (VALUES expressionsWithDefault (',' expressionsWithDefault)*))
  5.     ;
INSERT INTO/OVERWRITE FILES 语法在 ANTLR 的 g4 语法文件中的定义如上所示，属于 insertStatement 规则。插入目标既可以是表名（qualifiedName），也可以是 FILES 加属性列表（FILES propertyList）。
  1. @Override
  2. public ParseNode visitInsertStatement(StarRocksParser.InsertStatementContext context) { // builds the InsertStmt AST node
  3.     ...
  4.     // INSERT INTO FILES(...)
  5.     Map<String, String> tableFunctionProperties = getPropertyList(context.propertyList()); // key/value pairs written inside FILES(...)
  6.     InsertStmt res = new InsertStmt(tableFunctionProperties, queryStatement, createPos(context)); // FILES() constructor of InsertStmt
  7.     res.setOptHints(visitVarHints(context.setVarHint()));
  8.     return res;
  9. }
  1. public class InsertStmt extends DmlStmt {
  2.     ...
  3.     // Ctor for INSERT INTO FILES(...)
  4.     public InsertStmt(Map<String, String> tableFunctionProperties, QueryStatement queryStatement, NodePosition pos) {
  5.         super(pos);
  6.         this.tblName = new TableName("table_function_catalog", "table_function_db", "table_function_table"); // placeholder name for the FILES() target
  7.         this.targetColumnNames = null;
  8.         this.targetPartitionNames = null;
  9.         this.queryStatement = queryStatement;
  10.         this.tableFunctionAsTargetTable = true; // presumably what useTableFunctionAsTargetTable() returns during analysis
  11.         this.tableFunctionProperties = tableFunctionProperties;
  12.     }
  13.     ...
  14. }
1.2.2 语义分析

  1. public class AnalyzerVisitor extends AstVisitor<Void, ConnectContext> { // dispatches each statement type to its analyzer
  2.     public void analyze(StatementBase statement, ConnectContext session) {
  3.         visit(statement, session);
  4.     }
  5.     ...
  6.     @Override
  7.     public Void visitInsertStatement(InsertStmt statement, ConnectContext session) {
  8.         InsertAnalyzer.analyze(statement, session); // every INSERT form, including INSERT INTO FILES
  9.         return null;
  10.     }
  11.     ....
  12. }
  1. public class InsertAnalyzer {
  2.     public static void analyze(InsertStmt insertStmt, ConnectContext session) { // analyze the source query, then resolve the target table
  3.         QueryRelation query = insertStmt.getQueryStatement().getQueryRelation();
  4.         new QueryAnalyzer(session).analyze(insertStmt.getQueryStatement());
  5.         List<Table> tables = new ArrayList<>();
  6.         AnalyzerUtils.collectSpecifyExternalTables(insertStmt.getQueryStatement(), tables, Table::isHiveTable);
  7.         tables.stream().map(table -> (HiveTable) table)
  8.                 .forEach(table -> table.useMetadataCache(false)); // source Hive tables: disable the metadata cache
  9.         /*
  10.          *  Target table
  11.          */
  12.         Table table = getTargetTable(insertStmt, session);
  13.         ...
  14.         insertStmt.setTargetTable(table);
  15.         insertStmt.setTargetColumns(targetColumns);
  16.         if (session.getDumpInfo() != null) {
  17.             session.getDumpInfo().addTable(insertStmt.getTableName().getDb(), table);
  18.         }
  19.     }
  20.     ...
  21.     private static Table getTargetTable(InsertStmt insertStmt, ConnectContext session) {
  22.         if (insertStmt.useTableFunctionAsTargetTable()) {
  23.             return insertStmt.makeTableFunctionTable(); // INSERT INTO FILES(...)
  24.         }
  25.         ...
  26.     }
  27.     ...
  28. }
getTargetTable 首先通过 insertStmt.useTableFunctionAsTargetTable() 判断插入目标是否为表函数。从 InsertStmt 的构造函数可以看出，INSERT INTO FILES 语句对应的 tableFunctionAsTargetTable 为 true，因此会继续调用 makeTableFunctionTable() 生成目标表。
  1. public class InsertStmt extends DmlStmt {
  2.     ...
  3.     public Table makeTableFunctionTable() {
  4.         ...
  5.         // parse table function properties
  6.         Map<String, String> props = getTableFunctionProperties();
  7.         String single = props.getOrDefault("single", "false");
  8.         if (!single.equalsIgnoreCase("true") && !single.equalsIgnoreCase("false")) {
  9.             throw new SemanticException("got invalid parameter \"single\" = \"%s\", expect a boolean value (true or false).",
  10.                     single);
  11.         }
  12.         boolean writeSingleFile = single.equalsIgnoreCase("true");
  13.         String path = props.get("path");
  14.         String format = props.get("format");
  15.         String partitionBy = props.get("partition_by");
  16.         String compressionType = props.get("compression");
  17.         ...
  18.         if (writeSingleFile) { // "single" = "true": no partition columns, single-file flag set
  19.             return new TableFunctionTable(path, format, compressionType, columns, null, true, props);
  20.         }
  21.         if (partitionBy == null) {
  22.             // prepend `data_` if path ends with forward slash
  23.             if (path.endsWith("/")) {
  24.                 path += "data_";
  25.             }
  26.             return new TableFunctionTable(path, format, compressionType, columns, null, false, props);
  27.         }
  28.         // partition_by is set: resolve the partition columns
  29.         ...
  30.         return new TableFunctionTable(path, format, compressionType, columns, partitionColumnIDs, false, props);
  31.     }
  32. }
语义分析结束后，InsertStmt 中的 targetTable 就被赋值了，它表示 INSERT INTO 要写入数据的目标表。对于 INSERT INTO FILES() 而言，它就是 makeTableFunctionTable() 生成的 TableFunctionTable。
1.2.3 生成执行计划

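根据上文 InsertPlanner.plan() 中的步骤，语义分析完成后就已经得到了 targetTable；后续的逻辑计划转换和优化只针对 INSERT INTO FILES() SELECT ... 中的 SELECT 查询部分。targetTable 直到最后才被转换成 TableFunctionTableSink，并设置为执行计划第一个 fragment 的 sink。之后 PlanFragment 序列化为 Thrift 时，会调用 sink 的 toThrift()，生成类型为 TABLE_FUNCTION_TABLE_SINK 的 TDataSink，随执行计划一起下发给 BE。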
  1. public class PlanFragment extends TreeNode<PlanFragment> {
  2.     ...
  3.     public TPlanFragment toThrift() { // Thrift form of this fragment
  4.         TPlanFragment result = new TPlanFragment();
  5.         ...
  6.         if (sink != null) {
  7.             result.setOutput_sink(sink.toThrift()); // for INSERT INTO FILES this is TableFunctionTableSink.toThrift()
  8.         }
  9.         ...
  10.         return result;
  11.     }
  12.     ...
  13. }
  1. public class TableFunctionTableSink extends DataSink {
  2.     ...
  3.     @Override
  4.     protected TDataSink toThrift() {
  5.         TTableFunctionTableSink tTableFunctionTableSink = new TTableFunctionTableSink();
  6.         tTableFunctionTableSink.setTarget_table(table.toTTableFunctionTable()); // target description (path, file_format, compression_type, ...) read by BE
  7.         TCloudConfiguration tCloudConfiguration = new TCloudConfiguration();
  8.         cloudConfiguration.toThrift(tCloudConfiguration);
  9.         tTableFunctionTableSink.setCloud_configuration(tCloudConfiguration); // consumed on BE as table_function_table_sink.cloud_configuration
  10.         // Set the sink type to TABLE_FUNCTION_TABLE_SINK
  11.         TDataSink tDataSink = new TDataSink(TDataSinkType.TABLE_FUNCTION_TABLE_SINK);
  12.         tDataSink.setTable_function_table_sink(tTableFunctionTableSink);
  13.         return tDataSink;
  14.     }
  15.     ...
  16. }
1.2.4 调度器转发

  1. public interface PBackendService { // FE-side protobuf RPC interface to the BE service "PBackendService"
  2.     @ProtobufRPC(serviceName = "PBackendService", methodName = "exec_plan_fragment",
  3.             attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 60000)
  4.     Future<PExecPlanFragmentResult> execPlanFragmentAsync(PExecPlanFragmentRequest request); // async call to BE's exec_plan_fragment
  5.     ...
  6. }
1.3 BE执行TDataSink算子的过程

1.3.1 RPC入口

  1. template <typename T>
  2. class PInternalServiceImplBase : public T { // BE-side implementation of the internal RPC service
  3. public:
  4.     ...
  5.     void exec_plan_fragment(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request,
  6.                             PExecPlanFragmentResult* result, google::protobuf::Closure* done) override; // target of FE's execPlanFragmentAsync
  7.     ...
  8. };
  1. template <typename T>
  2. Status PInternalServiceImplBase<T>::_exec_plan_fragment_by_pipeline(const TExecPlanFragmentParams& t_common_param,
  3.                                                                     const TExecPlanFragmentParams& t_unique_request) {
  4.     pipeline::FragmentExecutor fragment_executor; // prepare the fragment, then execute it on the pipeline engine
  5.     auto status = fragment_executor.prepare(_exec_env, t_common_param, t_unique_request);
  6.     if (status.ok()) {
  7.         return fragment_executor.execute(_exec_env);
  8.     } else {
  9.         return status.is_duplicate_rpc_invocation() ? Status::OK() : status; // a duplicate RPC invocation is reported as success
  10.     }
  11. }
1.3.2 准备BE执行计划

  1. Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParams& common_request,
  2.                                  const TExecPlanFragmentParams& unique_request) {
  3.     ...
  4.     {
  5.         SCOPED_RAW_TIMER(&profiler.prepare_runtime_state_time);
  6.         RETURN_IF_ERROR(_prepare_workgroup(request));
  7.         RETURN_IF_ERROR(_prepare_runtime_state(exec_env, request));
  8.         // convert the Thrift plan into the BE execution plan tree
  9.         RETURN_IF_ERROR(_prepare_exec_plan(exec_env, request));
  10.         RETURN_IF_ERROR(_prepare_global_dict(request));
  11.     }
  12.     {
  13.         SCOPED_RAW_TIMER(&profiler.prepare_pipeline_driver_time);
  14.         // build the pipeline drivers; the output sink is processed here
  15.         RETURN_IF_ERROR(_prepare_pipeline_driver(exec_env, request));
  16.         RETURN_IF_ERROR(_prepare_stream_load_pipe(exec_env, request));
  17.     }
  18.     ...
  19. }
  20. Status FragmentExecutor::_prepare_pipeline_driver(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request) {
  21.     ...
  22.     std::unique_ptr<DataSink> datasink;
  23.     if (request.isset_output_sink()) {
  24.         const auto& tsink = request.output_sink();
  25.         ...
  26.         RETURN_IF_ERROR(DataSink::create_data_sink(runtime_state, tsink, fragment.output_exprs, params, // build the BE DataSink from the Thrift sink
  27.                                                    request.sender_id(), plan->row_desc(), &datasink));
  28.         // turn the FE sink into a BE TableFunctionTableSinkOperatorFactory
  29.         RETURN_IF_ERROR(_decompose_data_sink_to_operator(runtime_state, &context, request, datasink, tsink,
  30.                                                          fragment.output_exprs));
  31.     }
  32.     ...
  33.     // invoke every factory in the pipelines to create the real BE operators; TableFunctionTableSinkOperator is created here
  34.     if (!unready_pipeline_groups.empty()) {
  35.         RETURN_IF_ERROR(create_lazy_instantiate_drivers_pipeline(
  36.                 runtime_state, &context, _query_ctx, _fragment_ctx.get(), std::move(unready_pipeline_groups), drivers));
  37.     }
  38.     ...
  39. }
  40. Status FragmentExecutor::_decompose_data_sink_to_operator(RuntimeState* runtime_state, PipelineBuilderContext* context,
  41.                                                           const UnifiedExecPlanFragmentParams& request,
  42.                                                           std::unique_ptr<starrocks::DataSink>& datasink,
  43.                                                           const TDataSink& thrift_sink,
  44.                                                           const std::vector<TExpr>& output_exprs) { // maps a DataSink to its pipeline sink operator factory
  45.     ...
  46.     if (typeid(*datasink) == ...) {
  47.         ...
  48.     } else if (typeid(*datasink) == typeid(starrocks::TableFunctionTableSink)) { // INSERT INTO FILES()
  49.         ...
  50.         auto op = std::make_shared<TableFunctionTableSinkOperatorFactory>( // forwards path/format/compression and cloud config to the sink operator
  51.                 context->next_operator_id(), target_table.path, target_table.file_format, target_table.compression_type,
  52.                 output_expr_ctxs, partition_expr_ctxs, column_names, partition_column_names,
  53.                 target_table.write_single_file, thrift_sink.table_function_table_sink.cloud_configuration,
  54.                 fragment_ctx);
  55.         ...
  56.     }
  57. }
  1. Status TableFunctionTableSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) {
  2.     if (_partition_exprs.empty()) { // no partition expressions: every chunk goes to one writer
  3.         if (_partition_writers.empty()) { // create that writer lazily on the first chunk
  4.             auto writer = std::make_unique<RollingAsyncParquetWriter>(_make_table_info(_path), _output_exprs,
  5.                                                                       _common_metrics.get(), add_commit_info, state,
  6.                                                                       _driver_sequence);
  7.             RETURN_IF_ERROR(writer->init());
  8.             _partition_writers.insert({"default writer", std::move(writer)});
  9.         }
  10.         return _partition_writers["default writer"]->append_chunk(chunk.get(), state);
  11.     }
  12.     ...
  13.     return _partition_writers[partition_location]->append_chunk(chunk.get(), state); // partitioned: the writer keyed by this chunk's partition location
  14. }
1.3.3 ParquetWriter初始化

  1. Status RollingAsyncParquetWriter::init() {
  2.     ASSIGN_OR_RETURN( // choose the FileSystem implementation from the output location's URI
  3.             _fs, FileSystem::CreateUniqueFromString(_table_info.partition_location, FSOptions(&_table_info.cloud_conf)))
  4.     _schema = _table_info.schema;
  5.     _partition_location = _table_info.partition_location;
  6.     ::parquet::WriterProperties::Builder builder; // Parquet writer properties: dictionary, compression, format version
  7.     _table_info.enable_dictionary ? builder.enable_dictionary() : builder.disable_dictionary();
  8.     ASSIGN_OR_RETURN(auto compression_codec,
  9.                      parquet::ParquetBuildHelper::convert_compression_type(_table_info.compress_type));
  10.     builder.compression(compression_codec);
  11.     builder.version(::parquet::ParquetVersion::PARQUET_2_0);
  12.     _properties = builder.build();
  13.     return Status::OK();
  14. }
  1. StatusOr<std::unique_ptr<FileSystem>> FileSystem::CreateUniqueFromString(std::string_view uri, FSOptions options) { // factory: pick a FileSystem implementation from the URI
  2.     if (fs::is_posix_uri(uri)) {
  3.         return new_fs_posix();
  4.     }
  5.     if (fs::is_s3_uri(uri)) {
  6.         return new_fs_s3(options);
  7.     }
  8.     if (fs::is_azure_uri(uri) || fs::is_gcs_uri(uri)) {
  9.         // TODO(SmithCruise):
  10.         // Now Azure storage and Google Cloud Storage both are using LibHdfs, we can use cpp sdk instead in the future.
  11.         return new_fs_hdfs(options);
  12.     }
  13. #ifdef USE_STAROS
  14.     if (is_starlet_uri(uri)) {
  15.         return new_fs_starlet();
  16.     }
  17. #endif
  18.     // Since almost all famous storage are compatible with Hadoop FileSystem, it's always a choice to fallback using
  19.     // Hadoop FileSystem to access storage.
  20.     return new_fs_hdfs(options); // fallback for every URI not matched above; a new backend would add its own branch before this
  21. }
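初始化文件系统时，会根据 FILES() 传入的 path 参数（即该函数的 uri 参数）选择具体实现：
- 本地 POSIX 路径返回 POSIX 文件系统；
- S3 路径返回 S3 文件系统；
- Azure 和 GCS 路径目前也通过 LibHdfs 访问，因此返回 HDFS 文件系统；
- 编译时启用 USE_STAROS 时，starlet 路径返回 starlet 文件系统；
- 其余情况一律回退到 HDFS 文件系统。

因此，若要扩展新的文件系统，可以在这里增加对应的 uri 判断，并返回一个实现了下文 FileSystem 接口的新类。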
  1. class FileSystem {
  2. public:
  3.     enum Type { POSIX, S3, HDFS, BROKER, MEMORY, STARLET };
  4.     // Governs if/how the file is created.
  5.     //
  6.     // enum value                   | file exists       | file does not exist
  7.     // -----------------------------+-------------------+--------------------
  8.     // CREATE_OR_OPEN_WITH_TRUNCATE | opens + truncates | creates
  9.     // CREATE_OR_OPEN               | opens             | creates
  10.     // MUST_CREATE                  | fails             | creates
  11.     // MUST_EXIST                   | opens             | fails
  13.     ...
  14.     // Create a brand new sequentially-readable file with the specified name.
  15.     //  If the file does not exist, returns a non-OK status.
  16.     //
  17.     // The returned file will only be accessed by one thread at a time.
  18.     StatusOr<std::unique_ptr<SequentialFile>> new_sequential_file(const std::string& fname) {
  19.         return new_sequential_file(SequentialFileOptions(), fname);
  20.     }
  21.     virtual StatusOr<std::unique_ptr<SequentialFile>> new_sequential_file(const SequentialFileOptions& opts,
  22.                                                                           const std::string& fname) = 0;
  23.     // Create a brand new random access read-only file with the
  24.     // specified name.
  25.     //
  26.     // The returned file will only be accessed by one thread at a time.
  27.     StatusOr<std::unique_ptr<RandomAccessFile>> new_random_access_file(const std::string& fname) {
  28.         return new_random_access_file(RandomAccessFileOptions(), fname);
  29.     }
  30.     virtual StatusOr<std::unique_ptr<RandomAccessFile>> new_random_access_file(const RandomAccessFileOptions& opts,
  31.                                                                                const std::string& fname) = 0;
  32.     // Create an object that writes to a new file with the specified
  33.     // name.  Deletes any existing file with the same name and creates a
  34.     // new file.
  35.     //
  36.     // The returned file will only be accessed by one thread at a time.
  37.     virtual StatusOr<std::unique_ptr<WritableFile>> new_writable_file(const std::string& fname) = 0;
  38.     // Like the previous new_writable_file, but allows options to be
  39.     // specified.
  40.     virtual StatusOr<std::unique_ptr<WritableFile>> new_writable_file(const WritableFileOptions& opts,
  41.                                                                       const std::string& fname) = 0;
  42.     // Returns OK if the path exists.
  43.     //         NotFound if the named file does not exist,
  44.     //                  the calling process does not have permission to determine
  45.     //                  whether this file exists, or if the path is invalid.
  46.     //         IOError if an IO Error was encountered
  47.     virtual Status path_exists(const std::string& fname) = 0;
  48.     // Store in *result the names of the children of the specified directory.
  49.     // The names are relative to "dir".
  50.     // Original contents of *results are dropped.
  51.     // Returns OK if "dir" exists and "*result" contains its children.
  52.     //         NotFound if "dir" does not exist, the calling process does not have
  53.     //                  permission to access "dir", or if "dir" is invalid.
  54.     //         IOError if an IO Error was encountered
  55.     virtual Status get_children(const std::string& dir, std::vector<std::string>* result) = 0;
  56.     // Iterate the specified directory and call given callback function with child's
  57.     // name. This function continues execution until all children have been iterated
  58.     // or callback function return false.
  59.     // The names are relative to "dir".
  60.     //
  61.     // The function call extra cost is acceptable. Compared with returning all children
  62.     // into a given vector, the performance of this method is 5% worse. However this
  63.     // approach is more flexiable and efficient in fulfilling other requirements.
  64.     //
  65.     // Returns OK if "dir" exists.
  66.     //         NotFound if "dir" does not exist, the calling process does not have
  67.     //                  permission to access "dir", or if "dir" is invalid.
  68.     //         IOError if an IO Error was encountered
  69.     virtual Status iterate_dir(const std::string& dir, const std::function<bool(std::string_view)>& cb) = 0;
  70.     // `iterate_dir2` is similar to `iterate_dir` but in addition to returning the directory entry name, it
  71.     // also returns some file statistics.
  72.     virtual Status iterate_dir2(const std::string& dir, const std::function<bool(DirEntry)>& cb) = 0;
  73.     // Delete the named file.
  74.     // FIXME: If the named file does not exist, OK or NOT_FOUND is returned, depend on the implementation.
  75.     virtual Status delete_file(const std::string& fname) = 0;
  76.     // Create the specified directory.
  77.     // NOTE: It will return error if the path already exist(not necessarily as a directory)
  78.     virtual Status create_dir(const std::string& dirname) = 0;
  79.     // Creates directory if missing.
  80.     // Return OK if it exists, or successful in Creating.
  81.     virtual Status create_dir_if_missing(const std::string& dirname, bool* created = nullptr) = 0;
  82.     // Create directory for every element of 'dirname' that does not already exist.
  83.     // If 'dirname' already exists, the function does nothing (this condition is not treated as an error).
  84.     virtual Status create_dir_recursive(const std::string& dirname) = 0;
  85.     // Delete the specified directory.
  86.     // NOTE: The dir must be empty.
  87.     virtual Status delete_dir(const std::string& dirname) = 0;
  88.     // Deletes the contents of 'dirname' (if it is a directory) and the contents of all its subdirectories,
  89.     // recursively, then deletes 'dirname' itself. Symlinks are not followed (symlink is removed, not its target).
  90.     virtual Status delete_dir_recursive(const std::string& dirname) = 0;
  91.     // Synchronize the entry for a specific directory.
  92.     virtual Status sync_dir(const std::string& dirname) = 0;
  93.     // Checks if the file is a directory. Returns an error if it doesn't
  94.     // exist, otherwise return true or false.
  95.     virtual StatusOr<bool> is_directory(const std::string& path) = 0;
  96.     // Canonicalize 'path' by applying the following conversions:
  97.     // - Converts a relative path into an absolute one using the cwd.
  98.     // - Converts '.' and '..' references.
  99.     // - Resolves all symbolic links.
  100.     //
  101.     // All directory entries in 'path' must exist on the filesystem.
  102.     virtual Status canonicalize(const std::string& path, std::string* result) = 0;
  103.     virtual StatusOr<uint64_t> get_file_size(const std::string& fname) = 0;
  104.     // Get the last modification time by given 'fname'.
  105.     virtual StatusOr<uint64_t> get_file_modified_time(const std::string& fname) = 0;
  106.     // Rename file src to target.
  107.     virtual Status rename_file(const std::string& src, const std::string& target) = 0;
  108.     // create a hard-link
  109.     virtual Status link_file(const std::string& /*old_path*/, const std::string& /*new_path*/) = 0;
  110.     // Determines the information about the filesystem on which the pathname 'path' is located.
  111.     virtual StatusOr<SpaceInfo> space(const std::string& path) { return Status::NotSupported("FileSystem::space()"); }
  112.     // Given the path to a remote file, delete the file's cache on the local file system, if any.
  113.     // On success, Status::OK is returned. If there is no cache, Status::NotFound is returned.
  114.     virtual Status drop_local_cache(const std::string& path) { return Status::NotFound(path); }
  115.     // Batch delete the given files.
  116.     // return ok if all success (not found error ignored), error if any failed and the message indicates the fail message
  117.     // possibly stop at the first error if is simulating batch deletes.
  118.     virtual Status delete_files(const std::vector<std::string>& paths) {
  119.         for (auto&& path : paths) {
  120.             auto st = delete_file(path);
  121.             if (!st.ok() && !st.is_not_found()) {
  122.                 return st;
  123.             }
  124.         }
  125.         return Status::OK();
  126.     }
  127. };

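FileSystem 接口主要提供以下几类能力：
  • 创建顺序读的文件（new_sequential_file）
  • 创建随机读的文件（new_random_access_file）
  • 创建可写的文件（new_writable_file）
  • 一系列路径与目录操作（如 path_exists、get_children、iterate_dir、create_dir、rename_file、canonicalize 等）
  • 获取文件信息（如 get_file_size、get_file_modified_time、is_directory）
  • 删除文件（delete_file、delete_files）及目录（delete_dir、delete_dir_recursive）
扩展一个新的文件系统，主要就是实现其中的纯虚函数。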
1.3.4 ParquetWriter写入数据

  1. Status RollingAsyncParquetWriter::append_chunk(Chunk* chunk, RuntimeState* state) { // write a chunk, rolling to a new file once the size limit is exceeded
  2.     RETURN_IF_ERROR(get_io_status()); // propagate any previously recorded I/O error
  3.     if (_writer == nullptr) {
  4.         RETURN_IF_ERROR(_new_file_writer());
  5.     }
  6.     // exceed file size
  7.     if (_max_file_size != -1 && _writer->file_size() > _max_file_size) {
  8.         RETURN_IF_ERROR(close_current_writer(state));
  9.         RETURN_IF_ERROR(_new_file_writer());
  10.     }
  11.     return _writer->write(chunk);
  12. }
  13. Status RollingAsyncParquetWriter::_new_file_writer() { // open the next output file through the selected FileSystem
  14.     std::string new_file_location = _new_file_location();
  15.     WritableFileOptions options{.sync_on_close = false, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE}; // truncate if the file already exists
  16.     ASSIGN_OR_RETURN(auto writable_file, _fs->new_writable_file(options, new_file_location)) // the FileSystem call a new backend must implement
  17.     _writer = std::make_shared<starrocks::parquet::AsyncFileWriter>(
  18.             std::move(writable_file), new_file_location, _partition_location, _properties, _schema, _output_expr_ctxs,
  19.             ExecEnv::GetInstance()->pipeline_sink_io_pool(), _parent_profile, _max_file_size);
  20.     auto st = _writer->init();
  21.     return st;
  22. }
  1. Status FileWriterBase::init() { // wrap the output stream in a Parquet file writer
  2.     _writer = ::parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
  3.     if (_writer == nullptr) { // NOTE(review): Open() may report failures by throwing rather than returning null - confirm
  4.         return Status::InternalError("Failed to create file writer");
  5.     }
  6.     return Status::OK();
  7. }
  8. void FileWriterBase::_generate_chunk_writer() { // lazily start a new buffered row group
  9.     DCHECK(_writer != nullptr);
  10.     if (_chunk_writer == nullptr) {
  11.         auto rg_writer = _writer->AppendBufferedRowGroup();
  12.         _chunk_writer = std::make_unique<ChunkWriter>(rg_writer, _type_descs, _schema, _eval_func);
  13.     }
  14. }
  15. Status FileWriterBase::write(Chunk* chunk) {
  16.     if (!chunk->has_rows()) {
  17.         return Status::OK();
  18.     }
  19.     _generate_chunk_writer();
  20.     RETURN_IF_ERROR(_chunk_writer->write(chunk));
  21.     if (_chunk_writer->estimated_buffered_bytes() > _max_row_group_size && !is_last_row_group()) { // flush once the buffered row group exceeds the limit
  22.         RETURN_IF_ERROR(_flush_row_group());
  23.     }
  24.     return Status::OK();
  25. }


