ToB企服应用市场:ToB评测及商务社交产业平台

标题: 教程 | 使用 Apache SeaTunnel 同步本地文件到阿里云 OSS [打印本页]

作者: 美丽的神话    时间: 2023-10-6 06:15
标题: 教程 | 使用 Apache SeaTunnel 同步本地文件到阿里云 OSS

一直以来,大数据量一直是爆炸性增长,每天几十 TB 的数据增量已经非常常见,但云存储相对来说还是不便宜的。众多云上的大数据用户特别希望可以非常简单快速的将文件移动到更实惠的 S3、OSS 上进行保存,这篇文章就来介绍如何使用 SeaTunnel 来进行到 OSS 的数据同步。
首先简要介绍一下 Apache SeaTunnel,SeaTunnel 专注于数据集成和数据同步,主要解决以下问题:
SeaTunnel 支持海量数据的高效离线/实时同步, 每天可稳定高效同步数百亿级数据,已经有 B 站,腾讯云,微博,360,Shopee 等数百家公司生产使用。
下面步入今天的正题,今天具体来说是讲 Apache SeaTunnel 产品与阿里云 OSS 的集成。
在阿里云 OSS 产品界面,开通 Bucket:

下面是 SeaTunnel 的部署, SeaTunnel 支持多种部署方式: 单机,集群,K8s 等方式。由于 SeaTunnel 不依赖 Zookeeper 等第三方组件,所以整体部署非常简单,具体请参考其官网:https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/deployment
接下来是 SeaTunnel 使用过程,使用命令:
  1. ./bin/seatunnel.sh -m local -c ./config/localfile-oss.config
复制代码
在 SeaTunnel 中,用户可以通过 config 文件定制自己的数据同步需求,最大限度地发挥 SeaTunnel 的潜力。那么接下来就给大家介绍一下如何配置 Config 文件
可以看到,config 文件包含几个部分:env、source、transform、sink。不同的模块有不同的功能。了解这些模块后,您将了解 SeaTunnel 的工作原理。
用于添加一些引擎可选参数,无论是哪个引擎(Spark或Flink),这里都要填写相应的可选参数。
source 用于定义 SeaTunnel 需要从哪里获取数据,并将获取的数据用于下一步。可以同时定义多个源。现在支持的来源检查 SeaTunnel 的来源。每个 Source 都有自己特定的参数来定义如何取数据,SeaTunnel 也提取了每个 source 会用到的参数,比如parameter,用来指定 result_table_name 当前 source 产生的数据的名称,方便供其他模块后续使用。
本例中的 localfile-oss.config 配置文件内容介绍:
  1. env {                                                                                                                                                                          
  2.   # You can set SeaTunnel environment configuration here                                                                                                                     
  3.   execution.parallelism = 10                                                                                                                                                  
  4.   job.mode = "BATCH"                                                                                                                                                           
  5.   checkpoint.interval = 10000                                                                                                                                                  
  6.   #execution.checkpoint.interval = 10000                                                                                                                                      
  7.   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"                                                                                                         
  8. }                                                                                                                                                                              
  9.                                                                                                                                                                               
  10. source {                                                                                                                                                                       
  11. LocalFile {                                                                                                                                                                  
  12.   #本地待同步的数据文件夹, 本例子中只有一个 test0.csv 文件,具体内容参考下图
  13.   path = "/data/seatunnel-2.3.1/testfile/source"                                                                                                                              
  14.   type = "csv"                                                                                                                                                               
  15.                                                                                                                                                                   
  16.   delimiter = "#"                                                                                                                                                               
  17.   schema {                                                                                                                                                                     
  18.     fields {                                                                                                                                                                  
  19.         name = string                                                                                                                                                         
  20.         age = int                                                                                                                                                            
  21.         gender = string                                                                                                                                                        
  22.     }                                                                                                                                                                          
  23.   }                                                                                                                                                                           
  24. }                                                                                                                                                                             
  25.                                                                                                         
  26. }                                                                                                                                                                              
  27.                                                                                                                                                                               
  28. sink {                                                                                                                                                                                                                                                                                                                                         
  29.   OssJindoFile {                                                                                                                                                              
  30.                                                                                                                                                                                                                                    path="/seatunnel/oss03"                                                        
  31.     bucket = "oss://bucket123456654321234.cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                      
  32.     access_key = "I5t7VZyZSmMNwKsNv1LTADxW"                                                                                                                                   
  33.     access_secret = "BinZ9J0zYxRbvG9wQUi6LiUjZElLTA"                                                                                                                                                                                                                                                           
  34.     endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                                             
  35.   }
  36.                                                                                                                                                                                                                                                                                  
  37. }
复制代码
注:下图本地待同步的数据文件夹, 本例子中只有一个 test0.csv 文件,具体内容

特别注意:如果是开通了 HDFS 的 OSS,有 2 个地方是不一样的:1 是 bucket,1 是 endpoint 。如下红色部分是开通了 HDFS 后的,被 “#” 注释掉的是未开通 HDFS 的情况。

SeaTunnel 对这 2 种情况都是支持的,只是大家要注意一下配置 bucket 和 endpoint 时的不同!
执行运行命令后,我们可以从 SeaTunnel 控制台看下以下 SeaTunnel 本次同步情况的数据:
  1.        Job Statistic Information                                                                                                                                          
复制代码
Start Time                : 2023-02-22 17:12:19
End Time                  : 2023-02-22 17:12:37
Total Time(s)             :                  18
Total Read Count          :            10000000
Total Write Count         :            10000000
Total Failed Count        :                   0
从阿里云界面上可以看到 OSS 端的监控数据:



可以看出来 SeaTunnel 快速高效地同步了 1000万数据量的本地文件!
最后,Apache SeaTunnel 目前已经支持了过百种数据源,并发布了 SeaTunnel Zeta 同步引擎,性能巨佳,还有群进行技术支持,欢迎对比,欢迎一试!感兴趣的伙伴欢迎联系社区志愿者微信: seatunnel1
参考:
1、https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/deployment
2、https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/quick-start-seatunnel-engine
3、https://seatunnel.apache.org
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4