必看!S3File Sink Connector 使用文档

打印 上一主题 下一主题

主题 785|帖子 785|积分 2355


S3File 是一个用于管理 Amazon S3(Simple Storage Service)的 Python 模块。当前,Apache SeaTunnel 已经支持 S3File Sink Connector,为了更好地使用这个 Connector,有必要看一下这篇使用文档指南。
描述

将数据输出到 AWS S3 文件系统。
提示:
如果您使用的是 Spark/Flink,在使用此连接器之前,必须确保您的 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。
如果您使用的是 SeaTunnel Engine,它会在您下载和安装 SeaTunnel Engine 时自动集成 Hadoop JAR 包。您可以在 ${SEATUNNEL_HOME}/lib 目录下确认这个 JAR 包是否存在。
主要特性

默认情况下,我们使用 2PC 提交来确保 "仅一次语义"。

  • 文件格式类型

    • 文本 (text)
    • CSV
    • Parquet
    • ORC
    • JSON
    • Excel

选项

名称类型必需默认值备注pathstring是-bucketstring是-fs.s3a.endpointstring是-fs.s3a.aws.credentials.providerstring是com.amazonaws.auth.InstanceProfileCredentialsProvideraccess_keystring否-仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用access_secretstring否-仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用custom_filenameboolean否false是否需要自定义文件名file_name_expressionstring否"${transactionId}"仅在 custom_filename 为 true 时使用filename_time_formatstring否"yyyy.MM.dd"仅在 custom_filename 为 true 时使用file_format_typestring否"csv"field_delimiterstring否'\001'仅在 file_format 为 text 时使用row_delimiterstring否"\n"仅在 file_format 为 text 时使用have_partitionboolean否false是否需要处理分区partition_byarray否-仅在 have_partition 为 true 时使用partition_dir_expressionstring否"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"仅在 have_partition 为 true 时使用is_partition_field_write_in_fileboolean否false仅在 have_partition 为 true 时使用sink_columnsarray否当此参数为空时,将写入所有从 "Transform" 或 "Source" 获取的字段is_enable_transactionboolean否truebatch_sizeint否1000000compress_codecstring否nonecommon-optionsobject否-max_rows_in_memoryint否-仅在 file_format 为 Excel 时使用sheet_namestring否Sheet$仅在 file_format 为 Excel 时使用path [string]

目标目录路径是必需的。
bucket [string]

S3 文件系统的bucket地址,例如:s3n://seatunnel-test,如果您使用的是 s3a 协议,此参数应为 s3a://seatunnel-test。
fs.s3a.endpoint [string]

fs s3a 端点
fs.s3a.aws.credentials.provider [string]

认证 s3a 的方式。目前我们仅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 和 com.amazonaws.auth.InstanceProfileCredentialsProvider。
关于凭证提供程序的更多信息,您可以参考 Hadoop AWS 文档
access_key [string]

S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws
access_secret [string]

S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws
hadoop_s3_properties [map]

如果需要添加其他选项,可以在这里添加并参考此 链接
  1. hadoop_s3_properties {
  2.       "fs.s3a.buffer.dir" = "/data/st_test/s3a"
  3.       "fs.s3a.fast.upload.buffer" = "disk"
  4.    }
复制代码
custom_filename [boolean]

是否自定义文件名。
file_name_expression [string]

仅在 custom_filename 为 true 时使用
file_name_expression 描述了将创建到 path 中的文件表达式。我们可以在 file_name_expression 中添加变量 ${now} 或 ${uuid},例如 test_${uuid}_${now},
${now} 代表当前时间,其格式可以通过指定选项 filename_time_format 来定义。
请注意,如果 is_enable_transaction 为 true,我们将在文件名的开头自动添加${transactionId}_。
filename_time_format [string]

仅在 custom_filename 为 true 时使用
当 file_name_expression 参数中的格式为 xxxx-${now} 时,filename_time_format 可以指定路径的时间格式,默认值为 yyyy.MM.dd。常用的时间格式列于下表中:
符号描述y年M月d月中的天数H一天中的小时 (0-23)m小时中的分钟s分钟中的秒数file_format_type [string]

我们支持以下文件类型:

  • 文本 (text)
  • JSON
  • CSV
  • ORC
  • Parquet
  • Excel
请注意,最终文件名将以文件格式的后缀结尾,文本文件的后缀是 txt。
field_delimiter [string]

数据行中列之间的分隔符。仅在 file_format 为 text 时需要。
row_delimiter [string]

文件中行之间的分隔符。仅在 file_format 为 text 时需要。
have_partition [boolean]

是否需要处理分区。
partition_by [array]

仅在 have_partition 为 true 时使用。
基于选定字段对分区数据进行分区。
partition_dir_expression [string]

仅在 have_partition 为 true 时使用。
如果指定了 partition_by,我们将根据分区信息生成相应的分区目录,并将最终文件放在分区目录中。
默认的 partition_dir_expression 是 ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/。k0 是第一个分区字段,v0 是第一个分区字段的值。
is_partition_field_write_in_file [boolean]

仅在 have_partition 为 true 时使用。
如果 is_partition_field_write_in_file 为 true,分区字段及其值将写入数据文件中。
例如,如果您想要写入 Hive 数据文件,其值应为 false。
sink_columns [array]

需要写入文件的哪些列,默认值为从 "Transform" 或 "Source" 获取的所有列。
字段的顺序决定了实际写入文件的顺序。
is_enable_transaction [boolean]

如果 is_enable_transaction 为 true,我们将确保在写入目标目录时数据不会丢失或重复。
请注意,如果 is_enable_transaction 为 true,我们将在文件头部自动添加 ${transactionId}_。
目前仅支持 true。
batch_size [int]

文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_size 和 checkpoint.interval 共同决定。如果 checkpoint.interval 的值足够大,当文件中的行数大于 batch_size 时,写入器将写入文件。如果 checkpoint.interval 较小,则在新的检查点触发时,写入器将创建一个新文件。
compress_codec [string]

文件的压缩编解码器及其支持的详细信息如下:

  • txt: lzo none
  • JSON: lzo none
  • CSV: lzo none
  • ORC: lzo snappy lz4 zlib none
  • Parquet: lzo snappy lz4 gzip brotli zstd none
提示:Excel 类型不支持任何压缩格式。
常见选项

请参考 Sink Common Options 获取 Sink 插件的常见参数详细信息。
max_rows_in_memory [int]

当文件格式为 Excel 时,可以缓存在内存中的数据项的最大数量。
sheet_name [string]

工作簿的工作表名称。
示例

对于文本文件格式,具有 have_partition、custom_filename、sink_columns 和 com.amazonaws.auth.InstanceProfileCredentialsProvider 的配置示例:
  1.   S3File {
  2.     bucket = "s3a://seatunnel-test"
  3.     tmp_path = "/tmp/seatunnel"
  4.     path="/seatunnel/text"
  5.     fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
  6.     fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
  7.     file_format_type = "text"
  8.     field_delimiter = "\t"
  9.     row_delimiter = "\n"
  10.     have_partition = true
  11.     partition_by = ["age"]
  12.     partition_dir_expression = "${k0}=${v0}"
  13.     is_partition_field_write_in_file = true
  14.     custom_filename = true
  15.     file_name_expression = "${transactionId}_${now}"
  16.     filename_time_format = "yyyy.MM.dd"
  17.     sink_columns = ["name","age"]
  18.     is_enable_transaction=true
  19.     hadoop_s3_properties {
  20.       "fs.s3a.buffer.dir" = "/data/st_test/s3a"
  21.       "fs.s3a.fast.upload.buffer" = "disk"
  22.     }
  23.   }
复制代码
对于 Parquet 文件格式,仅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider进行配置:
  1.   S3File {
  2.     bucket = "s3a://seatunnel-test"
  3.     tmp_path = "/tmp/seatunnel"
  4.     path="/seatunnel/parquet"
  5.     fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
  6.     fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
  7.     access_key = "xxxxxxxxxxxxxxxxx"
  8.     secret_key = "xxxxxxxxxxxxxxxxx"
  9.     file_format_type = "parquet"
  10.     hadoop_s3_properties {
  11.       "fs.s3a.buffer.dir" = "/data/st_test/s3a"
  12.       "fs.s3a.fast.upload.buffer" = "disk"
  13.     }
  14.   }
复制代码
对于 orc 文件仅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider:
  1.   S3File {
  2.     bucket = "s3a://seatunnel-test"
  3.     tmp_path = "/tmp/seatunnel"
  4.     path="/seatunnel/orc"
  5.     fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
  6.     fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
  7.     access_key = "xxxxxxxxxxxxxxxxx"
  8.     secret_key = "xxxxxxxxxxxxxxxxx"
  9.     file_format_type = "orc"
  10.   }
复制代码
更新日志

2.3.0-beta 2022-10-20


  • 添加 S3File Sink 连接器
2.3.0 2022-12-30


  • Bug修复

    • 修复了以下导致数据写入文件失败的错误:

      • 当上游字段为空时会抛出 NullPointerException
      • Sink 列映射失败
      • 从状态中恢复写入器时直接获取事务失败 (3258)


  • 功能

    • 支持 S3A 协议 (3632)

      • 允许用户添加额外的 Hadoop-S3 参数
      • 允许使用 S3A 协议
      • 解耦 Hadoop-AWS 依赖

    • 支持设置每个文件的批处理大小 (3625)
    • 设置 S3 AK 为可选项 (3688)

下一版本

本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户国营

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表