【共建开源】手把手教你贡献一个 SeaTunnel PR,超级详细教程! ...

打印 上一主题 下一主题

主题 566|帖子 566|积分 1698


Apache SeaTunnel是一个非常易于使用的、超高性能的分布式数据集成平台,支持海量数据的实时同步。每天可稳定高效同步数百亿数据,已被近百家企业投入生产使用。
现在的版本不支持通过jtds的方式链接sqlserver,我们来自己写代码来实现它,并把代码提交给apache seatunnel。
1. 下载源代码

1.首先从远端仓库 https://github.com/apache/seatunnel fork一份代码到自己的仓库中


2.远端仓库中目前有超过30个分支:

  • dev :日常开发分支
  • 其他分支 :发布版本分支

3.把自己仓库clone到本地
git clone git@github.com:yougithubID/seatunnel.git


  • 添加远端仓库地址,命名为upstream
这一步是为了让本地代码知道他的上游是apache/seatunnel
git remote add upstream git@github.com:apache/seatunnel.git

5.查看仓库:
git remote -v

此时会有两个仓库:origin(自己的仓库)和upstream(远端仓库)
6.获取/更新远端仓库代码(已经是最新代码,就跳过)
git fetch upstream

2. 编写代码

1.加载拉取到本地的代码到IDEA中

这里我们需要注意两个module:seatunnel-connectors-v2和seatunnel-examples,其中seatunnel-connectors-v2是我们来写代码的module,seatunnel-examples是我们用来测试代码的module。
2.编写代码
目前代码中已经实现了基于JDBC的方式取链接SqlServer。我们只需要在它的基础之上去做一定的修改即可,经过debug来阅读源码,并了解了需要改的地方如下图:

代码实现如下:
SqlServerDialectFactory.java
  1. return (url.startsWith("jdbc:jtds:sqlserver:") || url.startsWith("jdbc:sqlserver:"));
复制代码
SqlserverTypeMapper.java
  1. private static final String SQLSERVER_SYSNAME = "SYSNAME";
  2. case SQLSERVER_SYSNAME:
  3.                 return BasicType.STRING_TYPE;
复制代码
SimpleJdbcConnectionProvider.java
  1. public boolean isConnectionValid() throws SQLException {
  2.         if (connection != null && connection.toString().startsWith("net.sourceforge.jtds")){
  3.             return connection != null
  4.                     && !connection.isClosed();
  5.         }else {
  6.             return connection != null
  7.                     && connection.isValid(jdbcConfig.getConnectionCheckTimeoutSeconds());
  8.         }
  9.     }
复制代码
pom.xml
  1. <jtds.version>1.3.1</jtds.version>
  2. <dependency>
  3.                 <groupId>net.sourceforge.jtds</groupId>
  4.                 <artifactId>jtds</artifactId>
  5.                 <version>${jtds.version}</version>
  6.                 <scope>provided</scope>
  7.             </dependency>
  8. <dependency>
  9.             <groupId>net.sourceforge.jtds</groupId>
  10.             <artifactId>jtds</artifactId>
  11.             <version>${jtds.version}</version>
  12.         </dependency>
复制代码
3. 测试代码

1.编写config文件,我们测试通过net.sourceforge.jtds.jdbc.Driver 从sqlserver中读出数据再写入sqlserver中
  1. env {
  2.   # You can set flink configuration here
  3.   execution.parallelism = 1
  4.   job.mode = "BATCH"
  5.   #execution.checkpoint.interval = 10000
  6.   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
  7. }
  8. source {
  9.   # This is a example source plugin **only for test and demonstrate the feature source plugin**
  10.   Jdbc {
  11.     driver = net.sourceforge.jtds.jdbc.Driver
  12.     url = "jdbc:jtds:sqlserver://localhost:1433/dbname"
  13.     user = SA
  14.     password = "A_Str0ng_Required_Password"
  15.     query = "select age, name from source"
  16.   }
  17.   # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
  18.   # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
  19. }
  20. transform {
  21.   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  22.   # please go to https://seatunnel.apache.org/docs/transform-v2/sql
  23. }
  24. sink {
  25.   Jdbc {
  26.     driver = net.sourceforge.jtds.jdbc.Driver
  27.     url = "jdbc:jtds:sqlserver://localhost:1433/dbname"
  28.     user = SA
  29.     password = "A_Str0ng_Required_Password"
  30.     query = "insert into sink(age, name) values(?,?)"
  31.   }
  32.   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
  33.   # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
  34. }
复制代码

2.修改seatunnel-flink-connector-v2-example中的SeaTunnelApiExample,写入我们写好的config文件

添加seatunnel-flink-connector-v2-example pom文件中添加jdbc依赖

3.运行代码
运行SeaTunnelApiExample,右键->run
4. 提交issue

issue的作用就是告诉社区我们打算做什么事情,后续的PR就是来提交代码解决这个issue。除此以外issue也是我们来提出bug或者其他想法的地方。不一定自己来实现它。你提出来,别人能解决,他们就会提交PR来解决这个问题。



我这里提交了一个叫[Feature][Connector-V2][SqlServer] Support driver jtds for SqlServer #5307 的issue,其中Feature可以按具体的内容换成BUG/DOCS等等 Connector-V2可以换成其他的具体模块,这里大家可以参照别人已经提的issue来命名。
注:带"*"的都是必填项
5. 提交代码
  1. git commit -m 'commit content'
  2. git push
复制代码
提交后在github上查看提交代码详情

6. 提交PR(Pull Request)

提交完成后,因为我们的仓库的代码的上游是apache/seatunnel,在conribute中我们就可以去提交一个PR。

点击Open pull request,就会跳转到上游apache/seatunnel下面的Pull Request列表,并自动开发创建PR页面。填写相关内容,PR的名称就跟需要解决的这个issue一样就可以了,需要注意的时候,最后要带上issue的ID ,我这里是#5307 ,之后点击提交就可以了。


之后就等社区的管理员审核就可以了,这个过程中可能会在PR中进行留言交流,必要的话,再修改代码,重新提交代码,openPR...
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

郭卫东

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表