锦通 发表于 昨天 09:29

使用Apache SeaTunnel高效集成和管理SftpFile数据源

http://openwrite-whaleops.oss-cn-zhangjiakou.aliyuncs.com/31504_A6F9820BA5D940C38CC396017B9EC960
本文为Apache SeaTunnel已经支持的SftpFile Source Connector使用文档,旨在帮助读者理解怎样高效地使用SFTP文件源连接器,以便轻松地使用Apache SeaTunnel集成和管理您的SftpFil数据源。
SftpFile 是指通过 SFTP(Secure File Transfer Protocol)协议进行文件操作的对象或组件。在网络编程和数据集成中,SFTPFile 通常用来表示和操作存储在远程 SFTP 服务器上的文件。SFTP 是一种安全的文件传输协议,基于 SSH(Secure Shell)协议,提供了加密的文件传输和远程文件操作功能。
支持的引擎

Spark

Flink

SeaTunnel Zeta

主要特性


[*] 批处理
[*] 列投影
[*] 并行处理
[*] 文件格式类型

[*] 文本
[*] CSV
[*] JSON
[*] Excel

描述

从 SFTP 文件服务器读取数据。
支持的数据源信息

使用 SftpFile 连接器,需要以下依赖项。可以通过 install-plugin.sh 下载,也可以从 Maven 中央堆栈获取。
数据源支持的版本依赖项SftpFile通用下载

[*]提示
如果你使用的是 Spark/Flink,请确保 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。
如果使用 SeaTunnel 引擎,安装 SeaTunnel 引擎时会主动集成 Hadoop JAR 包。可以在 ${SEATUNNEL_HOME}/lib 目次下检查这个 JAR 包是否存在。
为了支持更多的文件类型,我们做了一些妥协,所以在内部访问 Sftp 时我们使用了 HDFS 协议,这个连接器需要一些 Hadoop 依赖项,且仅支持 Hadoop 版2.9.X+ 版本。
数据类型映射

文件没有特定的类型列表,我们可以通过在设置中指定模式来指示要将哪个 SeaTunnel 数据类型转换为相应的数据。
SeaTunnel 数据类型STRINGSHORTINTBIGINTBOOLEANDOUBLEDECIMALFLOATDATETIMETIMESTAMPBYTESARRAYMAPSource选项

名称类型必填默认值描述host字符串是-目的 SFTP 主机所在port整数是-目的 SFTP 端标语user字符串是-目的 SFTP 用户名password字符串是-目的 SFTP 暗码path字符串是-源文件路径file_format_type字符串是-请查看下文的 #file_format_typefile_filter_pattern字符串否-用于文件过滤的过滤器模式。delimiter字符串否\001字段分隔符,用于告诉连接器怎样在读取文本文件时切割字段。默认 \001,与 Hive 的默认分隔符类似。parse_partition_from_path布尔型否true控制是否从文件路径中解析分区键和值。
例如,如果从路径中读取文件 oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26
那么文件中的每条记录将添加这两个字段:
name       age
tyrantlucifer26
提示:不要在模式选项中界说分区字段。date_format字符串否yyyy-MM-dd日期类型的格式,用于告诉连接器怎样将字符串转换为日期,支持以下格式:
yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd,默认 yyyy-MM-dd。datetime_format字符串否yyyy-MM-dd HH:mm:ss日期时间类型的格式,用于告诉连接器怎样将字符串转换为日期时间,支持以下格式:
yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss,默认 yyyy-MM-dd HH:mm:ss。time_format字符串否HH:mm:ss时间类型的格式,用于告诉连接器怎样将字符串转换为时间,支持以下格式:
HH:mm:ss HH:mm:ss.SSS,默认 HH:mm:ss。skip_header_row_number长整型否0跳过前几行,仅对 txt 和 csv 文件有效。
例如,设置如下:
skip_header_row_number = 2
那么 SeaTunnel 将跳过源文件的前两行。sheet_name字符串否-读取工作簿的工作表名称,仅在文件格式为 Excel 时使用。schema设置项否-请查看下文的 #schema通用选项否-Source 插件通用参数,请参考 Source通用选项 获取详细信息。file_format_type [字符串]

支持以下文件类型:
text csv parquet orc json excel
如果将文件类型指定为 json,需要设置 Schema 模式选项,向连接器说明怎样解析数据为你需要所需的 Row。
例如:上游数据如下:
{"code":200, "data":"get success", "success":true}也可以将多个数据保存在一个文件中,并通过换行符进行分隔:
{"code":200, "data":"get success", "success":true}
{"code":300, "data":"get failed", "success":false}需要按照以下方式设置 Schema:
schema {
    fields {
      code = int
      data = string
      success = boolean
    }
}连接器将生成以下数据:
codedatasuccess200获取成功true如果将文件类型指定为 parquet 或 orc,则无需指定模式选项,连接器可以主动查找上游数据的模式。如果将文件类型指定为 text 或 csv,则可以选择是否指定模式信息或不指定。例如,上游数据如下:
tyrantlucifer#26#male如果不设置 Schema,Connector 将这样处理上游数据:
如果分配数据模式,则除了 CSV 文件类型外,还应分配选项 delimiter。
内容tyrantlucifer#26#male如果设置了数据 Schema,除了CSV文件类型,还需要设置选项分隔符。需要设置 Schema 和分隔符如下:
delimiter = "#"
schema {
    fields {
      name = string
      age = int
      gender = string
    }
}连接器将生成以下数据:
姓名年事性别tyrantlucifer26男Schema [设置项]

fields [设置项]
上游数据的 Schema。
怎样创建 Sftp 数据同步任务

以下示例演示了怎样创建一个数据同步任务,从 Sftp 读取数据并在当地客户端上打印出来:
# 设置要执行的任务的基本配置
env {
execution.parallelism = 1
job.mode = "BATCH"
}

# 创建连接到 Sftp 的源
source {
SftpFile {
    host = "sftp"
    port = 22
    user = seatunnel
    password = pass
    path = "tmp/seatunnel/read/json"
    file_format_type = "json"
    result_table_name = "sftp"
    schema = {
      fields {
      c_map = "map<string, string>"
      c_array = "array<int>"
      c_string = string
      c_boolean = boolean
      c_tinyint = tinyint
      c_smallint = smallint
      c_int = int
      c_bigint = bigint
      c_float = float
      c_double = double
      c_bytes = bytes
      c_date = date
      c_decimal = "decimal(38, 18)"
      c_timestamp = timestamp
      c_row = {
          C_MAP = "map<string, string>"
          C_ARRAY = "array<int>"
          C_STRING = string
          C_BOOLEAN = boolean
          C_TINYINT = tinyint
          C_SMALLINT = smallint
          C_INT = int
          C_BIGINT = bigint
          C_FLOAT = float
          C_DOUBLE = double
          C_BYTES = bytes
          C_DATE = date
          C_DECIMAL = "decimal(38, 18)"
          C_TIMESTAMP = timestamp
      }
      }
    }
}
}

# 控制台打印读取的 Sftp 数据
sink {
Console {
    parallelism = 1
}
}本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 使用Apache SeaTunnel高效集成和管理SftpFile数据源