ToB企服应用市场:ToB评测及商务社交产业平台

标题: 拈花云科基于 Apache DolphinScheduler 在文旅业态下的实践 [打印本页]

作者: 用户国营    时间: 2023-6-30 01:18
标题: 拈花云科基于 Apache DolphinScheduler 在文旅业态下的实践

作者|云科NearFar X Lab团队 左益、周志银、洪守伟、陈超、武超
一、导读

无锡拈花云科技服务有限公司(以下简称:拈花云科)是由拈花湾文旅和北京滴普科技共同孵化的文旅目的地数智化服务商。2022年底,拈花云科NearFar X Lab团队开始测试DolphinScheduler作为交付型项目和产品项目的任务调度工具。本文主要分享了拈花云科在任务调度工具的选择、迭代和实践过程中的经验,希望对大家有所启发。
二、业务背景

我们的服务对象主要是国内各个景区、景点,业务范围涵盖文旅行业的多个板块,如票务、交通、零售、住宿、餐饮、演绎、游乐、影院、KTV、租赁、服务、会务、康乐、康养、电商、客服、营销、分销、安防等。由于业务系统来源较多,多系统下的数据源类型差异化较大,所以在实施数据项目时我们需要能够支持多种数据来源(Mysql、Oracle、SqlServer、Hive、Excel……)的数据集成任务。同时根据大部分景区为国有化的特点,我们也需要具备能够提供私有化交付部署及SAAS化数据中台产品解决方案的双重服务支撑能力。
三、DolphinScheduler 调度系统选型过程

在团队成立之初为了快速构建MVP业务版本,我们沿用了团队同事之前用过的Kettle调度方案。该方案下通过Kettle完成可视化调度的配置及对于异构数据的集成任务,通过Python 调用HQL脚本完成基于Hive的传参数据计算。

基于MVP的构建,我们也开始思考,在我们的整体中台架构下该需要一个什么样的调度系统,以及除了调度这件事本身我们还需要哪些功能和能力。带着这些问题我们开始整理自己的需求,以及每个需求下有什么样的产品可以适配。
调度系统需要支撑的应用场景

文旅业态下的数据使用场景与其它业态下的使用场景大体相同,主要分为以下四类:

调度系统需要支撑的项目类型

我们选择的调度系统需要同时具备实施类项目、SAAS产品两种需求下的数据中台支撑能力

基于以上需求我们进行了调度系统的选型对比。网上有非常多关于Oozie、Azkaban、Airflow、DolphinScheduler、Xxl-job、Kettle等调度选型的文章及介绍,在此不过多的展开他们的优缺点。我们觉得每个产品的设计都有它自身的考量,都有适用与不适用的场景。结合我们自身的使用需求最终我们选择了使用DolphinScheduler作为数据中台的调度平台。
主要原因如下:
四、基于DolphinScheduler的项目实践

1、DolphinScheduler ON Kubernetes

DolphinScheduler支持多种部署方式:单机部署(Standalone)、伪集群部署(Pseudo-Cluster)、集群部署(Cluster)、Kubernetes部署(Kubernetes)。在项目实施的场景下由于客户提供的部署环境千变万化,我们需要一种稳定、快速、不挑环境的部署方式。Apache DolphinScheduler on K8S的部署方式很好的满足了我们的需求,此部署方式能极大的提高整体项目的部署效率及动态扩展性。
在部署Apache DolphinScheduler on K8S 的过程中我们也曾遇到过一些问题,下面是我们总结的一些Kubernetes部署要点:
自定义镜像
  1. FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-alert-server:版本号
  2. # 如果你想支持 MySQL 数据源
  3. COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs
复制代码
dolphinscheduler-api
  1. FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-api:版本号
  2. # 如果你想支持 MySQL 数据源
  3. COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs
  4. # 如果你想支持 Oracle 数据源
  5. COPY ./ojdbc8-19.9.0.0.jar /opt/dolphinscheduler/libs
复制代码
dolphinscheduler-master
  1. FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-master:版本号
  2. # 如果你想支持 MySQL 数据源
  3. COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs
复制代码
dolphinscheduler-tools
  1. FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-tools:版本号
  2. # 如果你想支持 MySQL 数据源
  3. COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs
复制代码
dolphinscheduler-worker
  1. FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:版本号
  2. # 如果你想支持 MySQL 数据源
  3. COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs
  4. # 如果你想支持 Oracle 数据源
  5. COPY ./ojdbc8-19.9.0.0.jar /opt/dolphinscheduler/libs
  6. # 如果你想支持 hadoop 数据源
  7. COPY ./hadoop-common-2.7.3.jar /opt/dolphinscheduler/libs
  8. COPY ./hadoop-core-1.2.1.jar /opt/dolphinscheduler/libs
  9. # 如果你想支持 hive 数据源
  10. COPY ./hive-common.jar /opt/dolphinscheduler/libs
  11. COPY ./hive-jdbc.jar /opt/dolphinscheduler/libs
  12. COPY ./hive-metastore.jar /opt/dolphinscheduler/libs
  13. COPY ./hive-serde.jar /opt/dolphinscheduler/libs
  14. COPY ./hive-service.jar /opt/dolphinscheduler/libs
  15. # 安装python3环境
  16. RUN apt-get update && \
  17.     apt-get install -y --no-install-recommenApache DolphinScheduler curl && \
  18.     rm -rf /var/lib/apt/lists/*
  19. RUN apt-get update && \
  20.     apt-get install -y --no-install-recommenApache DolphinScheduler libcurl4-openssl-dev libssl-dev && \
  21.     rm -rf /var/lib/apt/lists/*
  22. RUN apt-get update && \
  23.     apt-get install -y --no-install-recommenApache DolphinScheduler python3 && \
  24.     rm -rf /var/lib/apt/lists/*
  25. RUN apt-get update && \
  26.     apt-get install -y --no-install-recommenApache DolphinScheduler python3-pip && \
  27.     rm -rf /var/lib/apt/lists/*
  28.    
  29. # 安装dataX 并且解压缩
  30. COPY ./datax.tar.gz /home
  31. RUN tar -zxvf /home/datax.tar.gz -C /opt
复制代码
配置文件修改
官方教程中的helm进行安装,在安装过程中需要修改源码中 "/deploy/kubernetes/dolphinscheduler/" 路径下的 "values.yaml,Chart.yaml" 里的相关镜像和版本(注:原配置没有指定持久化储存类,会使用默认的存储类,建议是修改并使用可多节点读写并且可以弹性扩展的,多节点读写便于worker节点共用同一套lib)
生产配置
k8s部署总结
采用k8s部署后,最大感受就是可排除环境干扰,快速扩展,迁移,部署,帮助我司实现了数据中台私有化中的调度标准化,该方案已在多个景区中进行快速落地并应用。
2、基于SQL脚本血缘的DolphinScheduler工作流自动化实现

项目背景

基于景区中各个业务系统(票务、营销、安防、酒店、商业、能耗、停车等)在景区机房下建设数据中台,完成以下应用需求:
技术选型

数据仓库:Doris
调度工具:DolphinScheduler 使用版本:3.0.0
版本管理:Gitlab
容器编排:Kubernete
处理流程

下面介绍下我们根据SQL血缘识别自动生成Apache DolphinScheduler调度任务的实现过程:
在DolphinScheduler平台上开发数据调度工作流的过程中我们遇到一个问题:如果一个工作流下的任务量太多,在原有的UI界面中想通过界面方式完成配置更改以及血缘关系的建立等操作会非常不便捷。即便是通过任务定义去配置,当上百个任务同时需要配置依赖关系时也是一个不小的工作量开销而且还容易出错,后期的更新迭代也较为不便。
我们结合工作流下SQL任务本身的特点(数仓SQL任务是整体按照Apache DolphinScheduler、DWD、DWS、Apache DolphinScheduler 的计算流程进行计算,每个表对应一个Apache DolphinScheduler的Task既每个Task都会包含SourceTable及TargetTabe。通过这层关系我们就可以构建起完整的数仓任务血缘依赖)。基于以上想法我们实现了从数据脚本自动生成对应的工作流和任务的自动化方案:
以下是该实现的核心代码:
sql解析
  1. SqlParse是使用阿里的druid中的组件MySqlStatementParser
  2. /**
  3. * sql解析
  4. * @param sqlStr
  5. * @return
  6. */
  7. public static Map<String, Set<String>> sqlParser(String sqlStr) {
  8.     List<String> sqlList = StrUtil.split(sqlStr, ";");
  9.     Map<String, Set<String>> map = new HashMap<>();
  10.     for (String sql : sqlList) {
  11.         if (StrUtil.isBlank(sql)) {
  12.             continue;
  13.         }
  14.         // 这里使用的时 mysql 解析
  15.         MySqlStatementParser parser = new MySqlStatementParser(sql);
  16.         SQLStatement sqlStatement = parser.parseStatement();
  17.         MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor();
  18.         sqlStatement.accept(visitor);
  19.         Map<TableStat.Name, TableStat> tableStatMap = visitor.getTables();
  20.         for (Map.Entry<TableStat.Name, TableStat> tableStatEntry : tableStatMap.entrySet()) {
  21.             String name = tableStatEntry.getKey().getName();
  22.             // 这里的 value 有两种 Select(父级)、Insert(子级)
  23.             String value = tableStatEntry.getValue().toString();
  24.             if (map.containsKey(value)) {
  25.                 map.get(value).add(name);
  26.             } else {
  27.                 Set<String> list = new HashSet<>();
  28.                 list.add(name);
  29.                 map.put(value, list);
  30.             }
  31.         }
  32.     }
  33.     return map;
  34. }
复制代码
节点对象定义
  1. /**
  2. * 任务节点定义
  3. */
  4. public class Apache DolphinSchedulerTaskNode implements Serializable {
  5.     private static final long serialVersionUID = 1L;
  6.     /**
  7.      * 源表
  8.      */
  9.     private List<String> sourceTableName = new ArrayList<>();
  10.     /**
  11.      * 目标表
  12.      */
  13.     private String targetTableName;
  14.     /**
  15.      * 源sql
  16.      */
  17.     private String sql;
  18.     /**
  19.      * 用sql做一个MD5签名
  20.      */
  21.     private String md5;
  22.     /**
  23.      * 用sql名称
  24.      */
  25.     private String name;
  26.     /**
  27.      * 任务code
  28.      */
  29.     private Long taskCode;
  30.     ...
  31. }
  32. /**
  33. * 树型节点定义
  34. */
  35. public class MyTreeNode extenApache DolphinScheduler Apache DolphinSchedulerTaskNode {
  36.     /**
  37.      * 添加一个子节点列表属性
  38.      */
  39.     private List<MyTreeNode> children;
  40.     ...
  41. }
复制代码
树型结构组装
  1. /**
  2. * 解析sql,并组装node
  3. *
  4. * @param files
  5. * @return
  6. */
  7. private static List<MyTreeNode> treeNodeProcess(List<File> files) {
  8.     List<MyTreeNode> sourceList = new ArrayList<>();
  9.     for (File sqlFile : files) {
  10.         // 1 取出里面的 sql 脚本内容
  11.         String sql = FileUtil.readUtf8String(sqlFile);
  12.         // 2 解析里面的脚本内容
  13.         Map<String, Set<String>> map = null;
  14.         try {
  15.             // 血缘解析
  16.             map = AutoCreateTask.sqlParser(sql);
  17.         } catch (Exception e) {
  18.             log.error(" table-parser error: {}", sqlFile.getPath());
  19.         }
  20.         // 3 将每一个sql的 source , target 解析出来
  21.         if (ObjectUtil.isNotNull(map)) {
  22.             MyTreeNode treeNode = new MyTreeNode();
  23.             treeNode.setName(sqlFile.getName());
  24.             if (map.containsKey("Select")) {
  25.                 Set<String> select = map.get("Select");
  26.                 treeNode.setSourceTableName(new ArrayList<>(select));
  27.             }
  28.             treeNode.setSql(sql);
  29.             if (map.containsKey("Insert")) {
  30.                 Set<String> insert = map.get("Insert");
  31.                 treeNode.setTargetTableName(new ArrayList<>(insert).get(0));
  32.             }
  33.             sourceList.add(treeNode);
  34.         }
  35.     }
  36.     // 将sql按照 source , target 组成 树状结构
  37.     return TreeUtil.getTree(sourceList);
  38. }
  39. /**
  40. * 组成树状结构
  41. * @param sourceList
  42. * @return
  43. */
  44. public static List<MyTreeNode> getTree(List<MyTreeNode> sourceList) {
  45.     Map<String, Set<MyTreeNode>> sourceMap = new HashMap<>();
  46.     Map<String, Set<MyTreeNode>> targetMap = new HashMap<>();
  47.     for (MyTreeNode node : sourceList) {
  48.         //源表Map
  49.         List<String> subSourceTableList = node.getSourceTableName();
  50.         if (IterUtil.isNotEmpty(subSourceTableList)) {
  51.             for (String subSourceTable : subSourceTableList) {
  52.                 if (sourceMap.containsKey(subSourceTable)) {
  53.                     sourceMap.get(subSourceTable).add(node);
  54.                 } else {
  55.                     Set<MyTreeNode> nodeSet = new HashSet<>();
  56.                     nodeSet.add(node);
  57.                     sourceMap.put(subSourceTable, nodeSet);
  58.                 }
  59.             }
  60.         }
  61.         //目标表Map
  62.         String targetTableName = node.getTargetTableName();
  63.         if (targetMap.containsKey(targetTableName)) {
  64.             targetMap.get(targetTableName).add(node);
  65.         } else {
  66.             Set<MyTreeNode> nodeSet = new HashSet<>();
  67.             nodeSet.add(node);
  68.             targetMap.put(targetTableName, nodeSet);
  69.         }
  70.     }
  71.     //创建一个列表,用于存储根节点
  72.     List<MyTreeNode> rootList = new ArrayList<>();
  73.     for (MyTreeNode node : sourceList) {
  74.         // 将子节点处理好
  75.         String targetTableName = node.getTargetTableName();
  76.         if (sourceMap.containsKey(targetTableName)) {
  77.             List<MyTreeNode> childrenList = node.getChildren();
  78.             if (IterUtil.isEmpty(childrenList)) {
  79.                 childrenList = new ArrayList<>();
  80.                 node.setChildren(childrenList);
  81.             }
  82.             childrenList.addAll(sourceMap.get(targetTableName));
  83.         }
  84.         List<String> subSourceTableList = node.getSourceTableName();
  85.         boolean isRoot = true;
  86.         for (String subSourceTable : subSourceTableList) {
  87.             if (targetMap.containsKey(subSourceTable)) {
  88.                 isRoot = false;
  89.                 break;
  90.             }
  91.         }
  92.         if (isRoot) {
  93.             rootList.add(node);
  94.         }
  95.     }
  96.     return rootList;
  97. }
复制代码
部分效果图展示:
自动化生成的任务定义

自动化生成的工作流定义图

增量运行结果

自动化实现总结
五、未来规划

六、总结与致谢

本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4