SeaTunnel Engine 的Master服务和Worker服务分离,每个服务单独一个进程。
- Master节点只负责作业调度,RESTful API,使命提交等,Imap数据只存储在Master节点中。
- Worker节点只负责使命的实行,不到场推举成为Master,也不存储Imap数据。
在所有Master节点中,同一时间只有一个Master节点工作,其他Master节点处于standby状态。
当Master节点宕机或心跳超时,会从其它节点中推举出一个新的Master Active节点。
这是最推荐的一种使用方式,在该模式下Master的负载会很小,Master有更多的资源用来举行作业的调度,使命的容错指标监控以及提供Rest API服务等,会有更高的稳定性。
同时Worker节点不存储Imap的数据,所有的Imap数据都存储在Master节点中,即使Worker节点负载高大概挂掉,也不会导致Imap数据重新分布。
下载安装包
在开始下载SeaTunnel之前,您必要确保您已经安装了SeaTunnel所必要的以下软件:
安装Java (Java 8 或 11, 其他高于Java 8的版本理论上也可以工作) 以及设置 JAVA_HOME。
进入SeaTunnel下载页面(https://seatunnel.apache.org/download)下载最新版本的发布版安装包seatunnel--bin.tar.gz
大概您也可以通过终端下载- export version="2.3.8"
- wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
- tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"
复制代码 配置 SEATUNNEL_HOME
您可以通过添加 /etc/profile.d/seatunnel.sh 文件来配置 SEATUNNEL_HOME 。
/etc/profile.d/seatunnel.sh 的内容如下:- export SEATUNNEL_HOME=${seatunnel install path}
- export PATH=$PATH:$SEATUNNEL_HOME/bin
复制代码 配置 Master 节点 JVM 选项
Master节点的JVM参数在$SEATUNNEL_HOME/config/jvm_master_options文件中配置。- # JVM Heap
- -Xms2g
- -Xmx2g
- # JVM Dump
- -XX:+HeapDumpOnOutOfMemoryError
- -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
- # Metaspace
- -XX:MaxMetaspaceSize=2g
- # G1GC
- -XX:+UseG1GC
复制代码 Worker节点的JVM参数在$SEATUNNEL_HOME/config/jvm_worker_options文件中配置。- # JVM Heap
- -Xms2g
- -Xmx2g
- # JVM Dump
- -XX:+HeapDumpOnOutOfMemoryError
- -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
- # Metaspace
- -XX:MaxMetaspaceSize=2g
- # G1GC
- -XX:+UseG1GC
复制代码 配置 SeaTunnel Engine
SeaTunnel Engine 提供很多功能,必要在 seatunnel.yaml 中举行配置。
Imap中数据的备份数设置(该参数在Worker节点无效)
SeaTunnel Engine 基于 Hazelcast IMDG 实现集群管理。集群的状态数据(作业运行状态、资源状态)存储在 Hazelcast IMap。
存储在 Hazelcast IMap 中的数据将在集群的所有节点上分布和存储。
Hazelcast 会分区存储在 Imap 中的数据。每个分区可以指定备份数量。因此,SeaTunnel Engine 可以实现集群 HA,无需使用其他服务(比方 zookeeper)。
backup count 是定义同步备份数量的参数。比方,如果设置为 1,则分区的备份将放置在一个其他成员上。如果设置为 2,则将放置在两个其他成员上。
我们建议 backup-count 的值为 min(1, max(5, N/2))。N 是集群节点的数量。- seatunnel:
- engine:
- backup-count: 1
- # 其他配置
复制代码由于在分离集群模式下,Worker节点不存储Imap数据,因此Worker节点的backup-count配置无效。
如果Master和Worker进程在同一个机器上启动,Master和Worker会共用seatunnel.yaml配置文件,此时Worker节点服务会忽略backup-count配置。
Slot配置(该参数在Master节点无效)
Slot数量决定了集群节点可以并行运行的使命组数量。一个使命必要的Slot的个数公式为 N = 2 + P(使命配置的并行度)。
默认情况下SeaTunnel Engine的slot个数为动态,即不限定个数。我们建议slot的个数设置为节点CPU核心数的2倍。
动态slot个数(默认)配置如下:- seatunnel:
- engine:
- slot-service:
- dynamic-slot: true
- # 其他配置
复制代码 静态slot个数配置如下:- seatunnel:
- engine:
- slot-service:
- dynamic-slot: false
- slot-num: 20
复制代码 由于在分离集群模式下,Master节点不运行使命,所以Master服务不会启动Slot服务,因此Master节点的slot-service配置无效。
如果Master和Worker进程在同一个机器上启动,Master和Worker会共用seatunnel.yaml配置文件,此时Master节点服务会忽略slot-service配置。
查抄点管理器(该参数在Worker节点无效)
与 Flink 一样,SeaTunnel Engine 支持 Chandy–Lamport 算法。因此,可以实现无数据丢失和重复的数据同步。
interval
两个查抄点之间的间隔,单位是毫秒。如果在作业配置文件的 env 中配置了 checkpoint.interval 参数,将以作业配置文件中设置的为准。
timeout
查抄点的超时时间。如果在超时时间内无法完成查抄点,则会触发查抄点失败,作业失败。如果在作业的配置文件的env中配置了checkpoint.timeout参数,将以作业配置文件中设置的为准。
示例- seatunnel:
- engine:
- backup-count: 1
- print-execution-info-interval: 10
- slot-service:
- dynamic-slot: true
- checkpoint:
- interval: 300000
- timeout: 10000
复制代码 checkpoint storage
查抄点是一种容错恢复机制。这种机制确保程序在运行时,即使突然碰到非常,也能自行恢复。
查抄点定时触发,每次查抄点举行时每个Task都会被要求将自身的状态信息(比如读取kafka时读取到了哪个offset)上报给查抄点线程,由该线程写入一个分布式存储(或共享存储)。
当使命失败然后自动容错恢复时,大概通过seatunnel.sh -r 指令恢复之前被停息的使命时,会从查抄点存储中加载对应作业的状态信息,并基于这些状态信息举行作业的恢复。
如果集群的节点大于1,查抄点存储必须是一个分布式存储,大概共享存储,如许才能包管任意节点挂掉后依然可以在另一个节点加载到存储中的使命状态信息。
查抄点配置只有Master服务才会读取,Worker服务不会读取查抄点配置。如果Master和Worker进程在同一个机器上启动,Master和Worker会共用seatunnel.yaml配置文件,此时Worker节点服务会忽略checkpoint配置。
历史作业逾期配置
每个完成的作业的信息,如状态、计数器和错误日志,都存储在 IMap 对象中。
随着运行作业数量的增加,内存会增加,终极内存将溢出。因此,您可以调整 history-job-expire-minutes 参数来解决这个问题。
此参数的时间单位是分钟。默认值是 1440 分钟,即一天。
示例- seatunnel:
- engine:
- history-job-expire-minutes: 1440
复制代码 类加载器缓存模式
此配置主要解决不断创建和尝试销毁类加载器所导致的资源走漏问题。如果您碰到与metaspace空间溢出干系的非常,您可以尝试启用此配置。
为了减少创建类加载器的频率,在启用此配置后,SeaTunnel 在作业完成时不会尝试开释相应的类加载器,以便它可以被后续作业使用,也就是说,当运行作业中使用的 Source/Sink 连接器类型不是太多时,它更有效。默认值是 false。
示例- seatunnel:
- engine:
- classloader-cache-mode: true
复制代码 IMap持久化配置(该参数在Worker节点无效)
由于在分离集群模式下,只有Master节点存储Imap数据,Worker节点不存储Imap数据,所以Worker服务不会读取该参数项。
在SeaTunnel中,我们使用IMap(一种分布式的Map,可以实现数据跨节点跨进程的写入的读取 有关详细信息,请参阅 Hazelcast Map) 来存储每个使命及其task的状态,以便在使命所在节点宕机后,可以在其他节点上获取到使命之前的状态信息,从而恢复使命实现使命的容错。
默认情况下Imap的信息只是存储在内存中,我们可以设置Imap数据的复本数,具体可参考(Imap中数据的备份数设置),如果复本数是2,代表每个数据会同时存储在2个不同的节点中。
一旦节点宕机,Imap中的数据会重新在其它节点上自动补充到设置的复本数。但是当所有节点都被停止后,Imap中的数据会丢失。当集群节点再次启动后,所有之前正在运行的使命都会被标记为失败,必要用户手工通过seatunnel.sh -r指令恢复运行。
为了解决这个问题,我们可以将Imap中的数据持久化到外部存储中,如HDFS、OSS等。如许即使所有节点都被停止,Imap中的数据也不会丢失,当集群节点再次启动后,所有之前正在运行的使命都会被自动恢复。
下面先容如何使用 MapStore 持久化配置。
type
imap 持久化的类型,目前仅支持 hdfs。
namespace
它用于区分不同业务的数据存储位置,如 OSS 存储桶名称。
clusterName
此参数主要用于集群隔离, 我们可以使用它来区分不同的集群,如 cluster1、cluster2,这也用于区分不同的业务。
fs.defaultFS
我们使用 HDFS API 读写文件,因此使用此存储必要提供 HDFS 配置。
如果您使用 HDFS,可以像如许配置:- map:
- engine*:
- map-store:
- enabled: true
- initial-mode: EAGER
- factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
- properties:
- type: hdfs
- namespace: /tmp/seatunnel/imap
- clusterName: seatunnel-cluster
- storage.type: hdfs
- fs.defaultFS: hdfs://localhost:9000
复制代码 如果没有 HDFS,而且您的集群只有一个节点,您可以像如许配置使用当地文件:- map:
- engine*:
- map-store:
- enabled: true
- initial-mode: EAGER
- factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
- properties:
- type: hdfs
- namespace: /tmp/seatunnel/imap
- clusterName: seatunnel-cluster
- storage.type: hdfs
- fs.defaultFS: file:///
复制代码 如果您使用 OSS,可以像如许配置:- map:
- engine*:
- map-store:
- enabled: true
- initial-mode: EAGER
- factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
- properties:
- type: hdfs
- namespace: /tmp/seatunnel/imap
- clusterName: seatunnel-cluster
- storage.type: oss
- block.size: block size(bytes)
- oss.bucket: oss://bucket name/
- fs.oss.accessKeyId: OSS access key id
- fs.oss.accessKeySecret: OSS access key secret
- fs.oss.endpoint: OSS endpoint
复制代码 注意:使用OSS 时,确保 lib目录下有这几个jar.- aliyun-sdk-oss-3.13.2.jar
- hadoop-aliyun-3.3.6.jar
- jdom2-2.0.6.jar
- netty-buffer-4.1.89.Final.jar
- netty-common-4.1.89.Final.jar
- seatunnel-hadoop3-3.1.4-uber.jar
复制代码 作业调度策略
当资源不足时,作业调度策略可以配置为以下两种模式:
WAIT:等待资源可用。
REJECT:拒绝作业,默认值。
示例- seatunnel:
- engine:
- job-schedule-strategy: WAIT
复制代码 当dynamic-slot: ture时,job-schedule-strategy: WAIT 配置会失效,将被强制修改为job-schedule-strategy: REJECT,由于动态Slot时该参数没有意义,可以直接提交。
配置网络服务
所有 SeaTunnel Engine 网络干系的配置都在 hazelcast-Master.yaml和hazelcast-worker.yaml 文件中.
集群名称
SeaTunnel Engine 节点使用 cluster-name 来确定另一个节点是否与自己在同一集群中。如果两个节点之间的集群名称不同,SeaTunnel 引擎将拒绝服务请求。
网络
基于 Hazelcast , 一个 SeaTunnel Engine 集群是由运行 SeaTunnel Engine 服务器的集群成员组成的网络。集群成员自动参加一起形成集群。这种自动参加是通过集群成员使用的各种发现机制来相互发现的。
请注意,集群形成后,集群成员之间的通信始终通过 TCP/IP 举行,无论使用的发现机制如何。
SeaTunnel Engine 使用以下发现机制。
TCP
您可以将 SeaTunnel Engine 配置为完整的 TCP/IP 集群。有关配置详细信息,请参阅 Discovering Members by TCP section。
在分离集群模式下,Master和Worker服务使用不同的端口。
Master节点网络配置 hazelcast-Master.yaml- hazelcast:
- cluster-name: seatunnel
- network:
- rest-api:
- enabled: true
- endpoint-groups:
- CLUSTER_WRITE:
- enabled: true
- DATA:
- enabled: true
- join:
- tcp-ip:
- enabled: true
- member-list:
- - master-node-1:5801
- - master-node-2:5801
- - worker-node-1:5802
- - worker-node-2:5802
- port:
- auto-increment: false
- port: 5801
- properties:
- hazelcast.heartbeat.failuredetector.type: phi-accrual
- hazelcast.heartbeat.interval.seconds: 2
- hazelcast.max.no.heartbeat.seconds: 180
- hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
- hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
- hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
复制代码 Worker节点网络配置 hazelcast-worker.yaml- hazelcast:
- cluster-name: seatunnel
- network:
- join:
- tcp-ip:
- enabled: true
- member-list:
- - master-node-1:5801
- - master-node-2:5801
- - worker-node-1:5802
- - worker-node-2:5802
- port:
- auto-increment: false
- port: 5802
- properties:
- hazelcast.heartbeat.failuredetector.type: phi-accrual
- hazelcast.heartbeat.interval.seconds: 2
- hazelcast.max.no.heartbeat.seconds: 180
- hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
- hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
- hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
复制代码 TCP 是我们建议在独立 SeaTunnel Engine 集群中使用的方式。
启动 SeaTunnel Engine Master 节点
可以通过守护进程使用 -d 参数启动。- mkdir -p $SEATUNNEL_HOME/logs
- ./bin/seatunnel-cluster.sh -d -r master
复制代码 日志将写入 $SEATUNNEL_HOME/logs/seatunnel-engine-Master.log
启动 SeaTunnel Engine Worker 节点
可以通过守护进程使用 -d 参数启动。- mkdir -p $SEATUNNEL_HOME/logs
- ./bin/seatunnel-cluster.sh -d -r worker
复制代码 日志将写入 $SEATUNNEL_HOME/logs/seatunnel-engine-worker.log
安装 SeaTunnel Engine 客户端
您可以通过添加 /etc/profile.d/seatunnel.sh 文件来配置 SEATUNNEL_HOME 。/etc/profile.d/seatunnel.sh 的内容如下:- export SEATUNNEL_HOME=${seatunnel install path}
- export PATH=$PATH:$SEATUNNEL_HOME/bin
复制代码 提交作业和管理作业
使用 SeaTunnel Engine 客户端提交作业
安装 SeaTunnel Engine 客户端
设置和服务器一样的SEATUNNEL_HOME
您可以通过添加 /etc/profile.d/seatunnel.sh 文件来配置 SEATUNNEL_HOME 。
/etc/profile.d/seatunnel.sh 的内容如下:- export SEATUNNEL_HOME=${seatunnel install path}
- export PATH=$PATH:$SEATUNNEL_HOME/bin
复制代码 配置 SeaTunnel Engine 客户端
所有 SeaTunnel Engine 客户端的配置都在 hazelcast-client.yaml 里。
cluster-name
客户端必须与 SeaTunnel Engine 具有相同的 cluster-name。
否则,SeaTunnel Engine 将拒绝客户端的请求。
network
必要将所有 SeaTunnel Engine Master节点的地址添加到这里。- hazelcast-client:
- cluster-name: seatunnel
- properties:
- hazelcast.logging.type: log4j2
- network:
- cluster-members:
- - master-node-1:5801
- - master-node-2:5801
复制代码 提交作业和管理作业
现在集群部署完成了,您可以通过以下教程完成作业的提交和管理:- bin/seatunnel.sh --config $SEATUNNEL_HOME/config/v2.batch.config.template
复制代码 --async参数可以让作业在后台运行,当作业提交后,客户端会退出。- ./bin/seatunnel.sh --config $SEATUNNEL_HOME/config/v2.batch.config.template --async
复制代码 -n或--name参数可以指定作业的名称- ./bin/seatunnel.sh --config $SEATUNNEL_HOME/config/v2.batch.config.template --async -n myjob
复制代码 查看作业列表该命令会输出所有当前集群中的作业列表(包含运行完成的历史作业和正在运行的作业)
查看作业状态- ./bin/seatunnel.sh -j <jobId>
复制代码 该命令会输出指定作业的状态信息
获取正在运行的作业监控信息- ./bin/seatunnel.sh --get_running_job_metrics
复制代码 该命令会输出正在运行的作业的监控信息
获取指定作业监控信息
--metrics 参数可以获取指定作业的监控信息- ./bin/seatunnel.sh --metrics <jobId>
复制代码 停息作业- ./bin/seatunnel.sh -s <jobId>
复制代码 该命令会停息指定作业,注意,只有开启了checkpoint的作业才支持停息作业(实时同步作业默认开启checkpoint,批处置惩罚作业默认不开启checkpoint必要通过在 env 中配置checkpoint.interval来开启checkpoint)。
停息作业是以split为最小单位的,即停息作业后,会等待当前正在运行的split运行完成后再停息。
使命恢复后,会从停息的split继续运行。
恢复作业- ./bin/seatunnel.sh -r <jobId> -c $SEATUNNEL_HOME/config/v2.batch.config.template
复制代码 该命令会恢复指定作业,注意,只有开启了checkpoint的作业才支持恢复作业(实时同步作业默认开启checkpoint,批处置惩罚作业默认不开启checkpoint必要通过在 env 中配置checkpoint.interval来开启checkpoint)。
恢复作业必要指定jobId和作业的配置文件。
运行失败的作业和通过seatunnel.sh -s停息的作业都可以通过该命令恢复。
取消作业- ./bin/seatunnel.sh -can <jobId1> [<jobId2> <jobId3> ...]
复制代码 该命令会取消指定作业,取消作业后,作业会被停止,作业的状态会变为CANCELED。
支持批量取消作业,可以一次取消多个作业。
被cancel的作业的所有断点信息都将被删除,无法通过seatunnel.sh -r恢复。
使用 REST API 提交作业
SeaTunnel Engine 提供了 REST API 用于提交作业。
v2版本的API使用jetty支持,与v1版本的接口规范相同 ,可以通过修改seatunnel.yaml中的配置项来指定端口和context-path,同时可以配置 enable-dynamic-port 开启动态端口(默认从 port 开始累加),默以为关闭, 如果enable-dynamic-port为true,将使用port和port+port-range范围内未使用的端口,默认范围是100。- seatunnel:
- engine:
- http:
- enable-http: true
- port: 8080
- enable-dynamic-port: false
- port-range: 100
复制代码 同时也可以配置context-path,配置如下:- seatunnel:
- engine:
- http:
- enable-http: true
- port: 8080
- context-path: /seatunnel
复制代码 其他API我已上传Github请查看自取
【 Github地址:https://github.com/Mrkuhuo/data-warehouse-learning 】
【 Gitee 地址:https://gitee.com/wzylzjtn/data-warehouse-learning 】
本文由 白鲸开源 提供发布支持!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |