海豚²来了丨DolphinDB 集成 DolphinScheduler,任务调度更轻松 ...

打印 上一主题 下一主题

主题 672|帖子 672|积分 2016

DolphinDB 是一款高性能时序数据库。DolphinDB 集成了功能强大的编程语言和高容量高速度的批流一体数据分析系统,为海量数据(特别是时间序列数据)的快速存储、检索、计算及分析提供一站式解决方案。在实际生产环境中,经常存在数据导入、转换、查询计算,更新等一系列流程任务,各个部分之间存在依赖,如何将这些 DolphinDB 任务按照需求准确、有效率地调度,可以借用 DolphinScheduler 任务调度器。
本文将从生产环境中的一个 ETL 场景出发,将 DolphinScheduler 引入到 DolphinDB 的高可用集群中,通过使用 DolphinScheduler 提供的功能来调度 DolphinDB 的数据 ETL 作业。
1. Apache DolphinScheduler

Apache DolphinScheduler 是一个分布式易扩展的可视化 DAG 工作流任务调度开源系统。该系统适用于企业级场景,提供了一个支持可视化操作任务、工作流和全生命周期数据处理的解决方案,解决了数据研发 ETL 依赖错综复杂,无法监控任务健康状态的问题。 DolphinScheduler 以 DAG(Directed Acyclic Graph,DAG)流式方式组装任务,可以及时监控任务的执行状态,支持重试、指定节点恢复失败、暂停、恢复、终止任务等操作。
1.1 特性


  • 执行定时任务:在生产环境中,一个普遍的需求是周期性地从数据源中提取、转换、加载数据到 DolphinDB,如每一天、每个小时等,DolphinScheduler 可以方便地进行定时管理,满足需求。
  • 执行历史任务:有时候我们由于业务变动,需要将历史数据进行重新计算和加载入 DolphinDB。在这种情况下,我们仅需在 DolphinScheduler Web 界面上定义相应任务的工作流,并定义和传入开始时间和结束时间的参数。通过这个步骤,我们可以处理任意时间段的数据。
  • 并行执行任务:在业务中,我们可能需要同时处理多项数据ETL任务,如同时导入数据到 DolphinDB 的不同表或同一个表的不同分区。DolphinScheduler 允许我们并行执行多项工作流任务,提高执行效率。
  • 高效处理编排:在生产环境中,大多数情况下存在任务之间、工作流之间有条件地执行,比如在 DolphinDB 的数据ETL中,下游任务依赖于上游任务的执行状态做不同的操作,有的任务之间关系错综复杂。可以在 DolphinScheduler 中按照逻辑对工作流进行定义,轻松编排任务之间的关系。
1.2 安装部署

DolphinScheduler 可在单机、单服务器集群、多服务器集群、K8S环境下部署,本节内容将以单机部署流程作为演示内容,仅供参考。
前置条件

  • JDK:安装JDK(1.8+)并配置JAVA_HOME环境变量,DolphinScheduler的启动依赖于该环境变量,同时将其下的bin目录追加到PATH环境变量中。
  • 二进制包:已配置DolphinDB数据源的DolphinScheduler版本,下载链接在https://cdn.dolphindb.cn/downloads/apache-dolphinscheduler-3.1.7-bin.tar.gz
  • 本教程将MySQL作为 DolphinScheduler 持久化的元数据库,因此要保证服务器已安装好MySQL,若没有,需要下载安装,所有平台的mysql下载地址为:MySQL 下载地址
元数据持久化配置
单机服务使用H2数据库来存储元数据,而H2数据库是一种内存级别的数据库,因此当DolphinScheduler程序重启时,会导致之前定义的工作流等内容全部丢失,需要重新定义,造成效率低下和不必要的麻烦。因此,将元数据持久化是非常有必要的,DolphinScheduler支持MySQLPostgreSQL作为元数据的存储数据库,本文以配置MySQL为例,主要有以下流程:

  • 解压DolphinScheduler程序包
  1. tar -xvzf apache-dolphinscheduler-3.1.7-bin.tar.gz
  2. cd apache-dolphinscheduler-3.1.7-bin
复制代码

  • 进入MySQL,创建数据库和用户
  1. // 创建数据库
  2. CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
  3. // 创建用户,并设置密码
  4. CREATE USER 'dolphinscheduler'@'%' IDENTIFIED BY '密码';
  5. // 给用户赋予库的权限
  6. GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%';
  7. flush privileges;
复制代码

  • 修改apache-dolphinscheduler-3.1.7-bin/bin/env/dolphinscheduler_env.sh文件设定环境变量,将{user}和{password}改为上一步创建的用户名和密码
  1. export DATABASE=mysql
  2. export SPRING_PROFILES_ACTIVE=${DATABASE}
  3. export SPRING_DATASOURCE_URL="jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
  4. export SPRING_DATASOURCE_USERNAME={user}
  5. export SPRING_DATASOURCE_PASSWORD={password}
复制代码

  • 修改apache-dolphinscheduler-3.1.7-bin/standalone-server/conf/application.yaml文件中的配置(在文件尾部),上半部分由于这里用的不是postgresql,直接注释掉就好。将{user}和{password}改为上面创建的用户名和密码
  1. ---
  2. #spring:
  3. #  config:
  4. #    activate:
  5. #      on-profile: postgresql
  6. #  quartz:
  7. #    properties:
  8. #      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
  9. #  datasource:
  10. #    driver-class-name: org.postgresql.Driver
  11. #    url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
  12. #    username: root
  13. #    password: root
  14. ---
  15. spring:
  16.   config:
  17.     activate:
  18.       on-profile: mysql
  19.   sql:
  20.      init:
  21.        schema-locations: classpath:sql/dolphinscheduler_mysql.sql
  22.   datasource:
  23.     driver-class-name: com.mysql.cj.jdbc.Driver
  24.     url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
  25.     username: {user}
  26.     password: {password}
复制代码

  • 初始化数据库,如果上述步骤没有问题,这里就不会报错:
  1. bash apache-dolphinscheduler-3.1.7-bin/tools/bin/upgrade-schema.sh
复制代码

  • 执行完成之后进入 MySQL 查询会发现名称为dolphinscheduler的数据库已经生成了很多表格

启动DolphinScheduler单机服务器
注意:启动DolphinScheduler需要依赖多个端口号,分别是:12345、50052、25333、25334、1234、5678
使用lsof -i:检查以上端口号是否被占用,如果有端口被别的进程占用,修改apache-dolphinscheduler-3.1.7-bin/standalone-server/conf/application.yaml中对应的端口

  • 当配置好以上内容之后,进入执行apache-dolphinscheduler-3.1.7-bin目录并执行以下命令启动:
  1. bash ./bin/dolphinscheduler-daemon.sh start standalone-server
复制代码
2. 输入以下命令查看是否执行成功
​    a. 运行jps查看相应实例是否已在进程中

​    b. 运行bash ./bin/dolphinscheduler-daemon.sh status standalone-server查看 standalone-server的运行状态

3. 停止运行
  1. bash ./bin/dolphinscheduler-daemon.sh stop standalone-server
复制代码
4. 启动成功后,在浏览器中输入服务器IP:12345/dolphinscheduler/ui/login进行登录
    a. 默认用户名:admin
​    b. 默认密码:dolphinscheduler123
登陆成功后将会看到如下页面:


  • 重启dolphinscheduler服务,测试已经连接好数据库

1.3 DolphinDB 与 DolphinScheduler 结合

DolphinDB 作为强大的高性能时序数据库,能够高效存储和处理 PB 级的海量数据集,可以通过编写脚本实现数据的处理、存储、因子计算、建模、回测等任务。在实际生产中,源数据下载、数据处理、数据入库、数据校验、指标计算等任务之间存在先后关系和条件关系,如果在 DolphinDB 脚本中编写相关的逻辑关系代码,一来会造成与实际任务不相关的脚本冗余,二来如果实际业务变动,需要增加或删减部分任务,相互依赖的任务之间的关系代码也需要变动,造成更新迭代效率低下。
考虑到这种情况,如果能够将 DolphinDB 与 DolphinScheduler 结合起来,在 DolphinDB 中编写相关任务代码模块,在 DolphinScheduler 上将这些任务按照逻辑编排调度,这样,就能够将任务代码和任务之间逻辑关系分开,每个部分专注于发挥自己的作用,实现更高效地运行维护。
1.3.1 如何创建 DolphinDB 数据源


  • 在安装部署好DolphinScheduler之后,登录其Web界面,点击数据源中心并点击创建数据源

2. 输入相关参数定义,创建DolphinDB数据源

注意:数据库名和jdbc连接参数不用填,不然会报错 JDBC connect failed。
1.3.2 如何调度 DolphinDB 任务

创建DolphinDB数据源后,需要创建租户、项目、工作流,具体流程可参考:DolphinScheduler 部署流程
定义好工作流之后,点击该工作流进入操作界面,接着在左端拖拽SQL 节点进行DolphinDB任务定义
注意:SQL类型分为查询类型和非查询类型,这两种类型分别适用于不同的使用场景,在本文 2.5.2 小节中会详细介绍。


由于在SQL任务节点中,每次只能执行一行DolphinDB代码。因此,调度DolphinDB任务主要有以下两种途径:

  • run 函数
假设DolphinDB脚本在服务器上的路径是:/data/script.dos,那么,在SQL语句上可以输入:
  1. run("/data/script.dos");
复制代码
2. 函数视图
​    a. 在DolphinDB编写函数:
  1. // 在DolphinDB中定义一个函数,用于创建数据库表
  2. def createTable(dbName, tbName){
  3.         login("admin", "123456")
  4.         if(!existsDatabase(dbName)){
  5.                 db1 = database(, VALUE, 2020.01.01..2021.01.01)
  6.                 db2 = database(, HASH, [SYMBOL, 10])
  7.                 db = database(dbName, COMPO, [db1, db2], , "TSDB")
  8.         }else{
  9.                 db = database(dbName)
  10.         }
  11.     if(!existsTable(dbName,tbName)){
  12.         name =`SecurityID`ChannelNo`ApplSeqNum`MDStreamID`SecurityIDSource`Price
  13.               `OrderQty`Side`TradeTIme`OrderType`OrderIndex`LocalTime`SeqNo
  14.               `Market`DataStatus`BizIndex
  15.         type = [SYMBOL, INT, LONG, INT, INT, DOUBLE, INT,
  16.                 SYMBOL, TIMESTAMP, SYMBOL, INT, TIME, LONG, SYMBOL,INT,LONG]
  17.         schemaTable = table(1:0, name, type)
  18.         db.createPartitionedTable(table=schemaTable, tableName=tbName,
  19.                 partitionColumns=`TradeTime`SecurityID,
  20.                 compressMethods={TradeTime:"delta"},
  21.                 sortColumns=`Market`SecurityID`TradeTime, keepDuplicates=ALL)
  22.     }
  23.     else{
  24.         writeRunLog("数据库:" + dbName + " 数据表:" + tbName + " 已存在...")
  25.     }
  26. }
复制代码
​    b. 将函数添加为函数视图,作用是使得该函数视图全局可用
  1. // 添加为函数视图
  2. addFunctionView(createTable)
复制代码
​    c. 在dolphinScheduler上调用该函数视图
  1. // 在SQL语句内输入以下内容:
  2. createTable("dfs://testDb", "testTb");
复制代码
上述两种方法的区别是:

  • 使用run执行脚本不能传递参数,灵活性较差
  • 使用函数视图可以在DolphinScheduler上面传入参数,以上面为例,可以在SQL语句中输入:
  1. createTable(${dbName}, ${tbName});
复制代码
传入参数的方法有局部参数和全局参数两种方法,它们的区别是全局参数可以在创建工作流实例时直接进行修改,而局部参数需要进入到工作流中的具体任务节点进行修改:

  • 局部参数在定义任务节点时定义

2. 全局参数在保存工作流时定义

2. 调度 DolphinDB 数据 ETL 任务

2.1 任务流程结构


  • 文件结构

2.1.1 DolphinDB 功能模块部分


  • stockData: 数据导入模块

    • createStockTable.dos: 定义创建相应的股票数据库表的函数。
    • stockDataProcess.dos: 定义将源数据进行清洗、处理与格式转换的函数。
    • stockDataLoad.dos: 定义将数据导入相应的DolphinDB库表的函数。

  • minFactor: 分钟线因子指标相关模块

    • createMinFactorTable.dos: 定义创建分钟线因子指标库表的函数。
    • computeMinFactor.dos: 定义计算分钟线因子指标的函数。

  • dataCheck: 数据校验模块

    • stockCheck.dos: 定义校验导入的股票逐笔委托、快照行情、逐笔成交数据的函数。
    • minFactorCheck.dos: 定义校验分钟线因子指标数据的函数。

2.1.2 DolphinDB 脚本部分


  • initTable.dos: 执行该脚本可以创建相应的股票数据库表和分钟因子指标库表。
  • createFuncView.dos: 执行该脚本可以定义需要在dolphinScheduler上用到的函数视图。
2.2 数据介绍

本文选取了 20230201 上交所某股票 level 2 委托数据、快照数据、成交数据作为演示。以下是逐笔委托表在DolphinDB的结构。快照数据和成交数据结构可在附件中查看:
字段名字段含义数据类型(DolphinDB)ChannelNo通道代码INTApplSeqNum消息记录号LONGMDStreamID行情类别INTSecurityID证券代码SYMBOLSecurityIDSource证券代码源INTPrice委托价格DOUBLEOrderQty委托数量INTSide委托买卖方向SYMBOLTradeTime委托时间TIMESTAMPOrderType委托类型SYMBOLOrderIndex委托序号INTLocalTime本地接受时间戳TIMESeqNo消息序列号LONGMarket交易市场SYMBOLDataStatus数据状态INTBizIndex业务序列号LONG2.3 数据导入、指标计算与校验任务

注意:以下各部分内容均将相关函数定义在模块中,以方便进行工程化管理,关于DolphinDB模块的创建、加载、调用方法,请参照:DolphinDB 模块复用教程
2.3.1 数据清洗、处理、入表

由于源数据的字段结构有时候不符合我们的业务需求,因此需要增删、处理一些字段后再导入数据库,下面以逐笔委托数据为例,介绍源数据经清洗、处理后再导入库表的过程,快照和成交数据的处理逻辑与委托数据相同,详细内容可以在附件中查看。

  • 创建数据库表
逐笔委托、快照、逐笔成交数据都保存在同一个库中,本文采用了组合分区作为分区方案,第一层按天分区,第二层对股票代码分25个哈希分区。如何确定数据分区请参照:DolphinDB数据库分区教程
  1. module createStockTable
  2. // 创建逐笔委托数据存储库表
  3. def createEntrust(dbName, tbName,userName = "admin",password = "123456")
  4. {
  5.     login(userName, password)
  6.     if(!existsDatabase(dbName))
  7.     {
  8.         db1 = database(, VALUE, 2020.01.01..2021.01.01)
  9.         db2 = database(, HASH, [SYMBOL, 25])
  10.         // 按天和股票组合分区
  11.         db = database(dbName, COMPO, [db1, db2], , "TSDB")
  12.     }
  13.     else
  14.     {
  15.         db = database(dbName)
  16.     }
  17.     name=`ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TradeTIme`OrderType`OrderIndex`LocalTime`SeqNo`Market`DataStatus`BizIndex
  18.     type = [INT, LONG, INT, SYMBOL, INT, DOUBLE, INT, SYMBOL, TIMESTAMP, SYMBOL, INT, TIME, LONG, SYMBOL,INT,LONG]
  19.     schemaTable = table(1:0, name, type)
  20.     // 创建分区表
  21.     db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns=`Market`SecurityID`TradeTime, keepDuplicates=ALL)
  22. }
复制代码

  • csv数据清洗与处理
  1. module stockData::stockDataProcess
  2. // 定义逐笔委托csv数据文件中各个字段的名称和字段类型
  3. def schemaEntrust()
  4. {
  5.     name = `DataStatus`OrderIndex`ChannelNo`SecurityID`TradeTime`OrderType`ApplSeqNum`Price`OrderQty`Side`BizIndex`LocalTime`SeqNo
  6.     typeString = `INT`LONG`INT`SYMBOL`TIME`SYMBOL`INT`DOUBLE`INT`SYMBOL`INT`TIME`INT
  7.     return table(name, typeString)
  8. }
  9. // 数据处理函数,包括字段增加,数据去重等操作
  10. def processEntrust(loadDate, mutable t)
  11. {
  12.     // 字段名替换
  13.     t.replaceColumn!(`TradeTime, concatDateTime(day, t.TradeTime))
  14.     n1 = t.size()
  15.     // 数据去重
  16.     t = select * from t where isDuplicated([DataStatus, OrderIndex, ChannelNo, SecurityID, TradeTime, OrderType, ApplSeqNum, Price, OrderQty, Side, BizIndex],FIRST)=false
  17.     n2 = t.size()
  18.     // 增加字段
  19.     update t set Market = `sh
  20.     update t set MDStreamID = int(NULL)
  21.     update t set SecurityIDSource = int(NULL)
  22.     reorderColumns!(t, `ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TradeTime`OrderType`OrderIndex`LocalTime`SeqNo`Market`DataStatus`BizIndex)
  23.     return t,n1,n2
  24. }
复制代码

  • 将处理后的数据导入数据库表
  1. module stockData::stockDataLoad
  2. use stockData::stockDataProcess
  3. def loadEntrust(userName, userPassword, startDate, endDate, dbName, tbName, filePath, loadType,mutable infoTb)
  4. {
  5.         for(loadDate in startDate..endDate)
  6.         {
  7.                 // 删除已有数据
  8.                 dateString = temporalFormat(loadDate,"yyyyMMdd")
  9.                 dataCount = exec count(*) from loadTable(dbName, tbName) where date(tradeTime)=loadDate
  10.                 // 如果表里面已经存在当天要处理的数据,删除库里面已有数据
  11.                 if(dataCount != 0){
  12.                         msg = "Start to delete the entrust data, the delete date is: " + dateString
  13.                         print(msg)
  14.                         infoTb.tableInsert(msg)
  15.                         dropPartition(database(dbName), loadDate, tbName)
  16.                         msg = "Successfully deleted the entrust data, the delete date is: " + dateString
  17.                         print(msg)
  18.                         infoTb.tableInsert(msg)
  19.                 }
  20.                 // 数据导入
  21.                 // 判断数据csv文件是否存在
  22.                 fileName = filePath + "/" + dateString + "/" + "entrust.csv"
  23.                 if(!exists(fileName))
  24.                 {
  25.                         throw fileName + "不存在!请检查数据源!"
  26.                 }
  27.                 // 如果是全市场数据,数据量较大,因此分批导入
  28.                 schemaTB = schemaEntrust()
  29.                 tmpData1 = loadText(filename=fileName, schema=schemaTB)
  30.                 tmpData1,n1,n2 = processEntrust(loadDate,tmpData1)
  31.                 pt = loadTable(dbName,tbName)
  32.                 msg = "the data size in csv file is :" + n2 + ", the duplicated count is " + (n1 - n2)
  33.                 print(msg)
  34.                 infoTb.tableInsert(msg)
  35.                 for(i in 0..23)
  36.                 {
  37.                         startTime = 08:00:00.000 + 1200 * 1000 * i
  38.                         tmpData2 = select * from tmpData1 where time(TradeTime)>=startTime and time(TradeTime)<(startTime+ 1200 * 1000)
  39.                         if(size(tmpData2) < 1)
  40.                         {
  41.                                 continue
  42.                         }
  43.                         //数据入库
  44.                         pt.append!(tmpData2)
  45.                 }
  46.                 msg = "successfully loaded!"
  47.                 print(msg)
  48.                 infoTb.tableInsert(msg)
  49.         }
  50. }
复制代码
2.3.3 数据校验与 K 分钟线指标校验

由于K分钟线因子指标计算依赖于上游导入数据的正确性,而业务中又依赖于K分钟线指标数据的正确性,因此,对这两部分数据进行校验是有必要的,下面介绍部分校验步骤,详细校验内容请参照附件

  • 股票数据校验
  1. module minFactor::createMinFactorTable
  2. def createMinuteFactor(dbName, tbName)
  3. {
  4.         if(existsDatabase(dbName)){
  5.                 dropDatabase(dbName)
  6.         }
  7.         //按天分区
  8.         db = database(dbName, VALUE, 2021.01.01..2021.01.03,engine = `TSDB)
  9.         colName = `TradeDate`TradeTime`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
  10.         colType =[DATE, MINUTE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
  11.         tbSchema = table(1:0, colName, colType)
  12.           db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeTime,keepDuplicates=ALL)
  13. }
复制代码

  • K分钟线指标校验
  1. module minFactor::computeMinFactor
  2. def calFactorOneMinute(dbName, startDate, endDate, mutable factorTb,mutable infoTb)
  3. {
  4.         pt = loadTable(dbName, "trade")
  5.         dayList = startDate..endDate
  6.         if(dayList.size()>12) dayList = dayList.cut(12)
  7.         for(days in dayList){
  8.                 //计算分钟 K 线
  9.                 res = select first(TradePrice) as open, max(TradePrice) as high, min(TradePrice) as low, last(TradePrice) as close, sum(tradeQty) as volume,sum(TradePrice*TradeQty) as amount,sum(TradePrice*TradeQty)\sum(TradeQty) as vwap from pt where date(tradeTime) in days group by date(tradeTime) as TradeDate,minute(tradeTime) as TradeTime, SecurityID
  10.                 msg = "Start to append minute factor result , the days is: [" + concat(days, ",")+"]"
  11.                 print(msg)
  12.                 infoTb.tableInsert(msg)
  13.                 //分钟 K 线入库
  14.                 factorTb.append!(res)
  15.                 msg = "Successfully append the minute factor result to databse, the days is: [" + concat(days, ",")+"]"
  16.                 print(msg)
  17.                 infoTb.tableInsert(msg)
  18.         }
  19. }
复制代码
2.4 实现 DolphinDB 任务调度

在定义好数据ETL模块之后,就可以通过以下步骤实现DolphinDB ETL任务的调度:

  • 创建相关库表
第一次执行时,需要创建相关数据库表
  1. module dataCheck::stockCheck
  2. def checkStockCounts(idate,dbName)
  3. {
  4.         // 校验逐笔委托、快照行情、逐笔成交表的股票个数是否一致
  5.         getCodes = def (dbName,tbName,idate) {
  6.                 tb = loadTable(dbName,tbName)
  7.                 return exec distinct(SecurityID) from tb where date(tradetime)=idate and ((Market=`sh and SecurityID like "6%")or(Market=`sz and (SecurityID like "0%" or SecurityID like "3%" ) ))
  8.         }
  9.         entrustCodes = getCodes(dbName,"entrust",idate)
  10.         tradeCodes = getCodes(dbName,"trade",idate)
  11.     snapshotCodes = exec distinct(SecurityID) from loadTable(dbName,"snapshot") where date(tradetime)=idate and ((Market=`sh and SecurityID like "6%")or(Market=`sz and (SecurityID like "0%" or SecurityID like "3%" ))) and  HighPrice != 0
  12.         if(entrustCodes.size() != snapshotCodes.size() or entrustCodes.size() != tradeCodes.size() or snapshotCodes.size() != tradeCodes.size())
  13.         {
  14.                 throw "逐笔委托股票数量:" + size(entrustCodes) + " 快照行情股票数量:" + size(snapshotCodes) + " 逐笔成交股票数量:" + size(tradeCodes) + ", 它们数量不一致!"
  15.         }
  16. }
复制代码

  • 定义函数视图并执行
由于每日定时ETL任务与历史批量ETL任务的整体处理逻辑相同,因此,在定义函数视图时通过传入参数来区分不同类型的任务,以定义逐笔委托数据导入任务函数视图为例,具体如下(全部函数视图定义请参照附件):
  1. module dataCheck::minFactorCheck
  2. def checkHighLowPrice(idate,dbName,tbName)
  3. {
  4.         // 分钟线最高价指标与最低价指标校验
  5.         tb= loadTable(dbName,tbName)
  6.         temp=select * from tb where tradedate=idate and High < Low
  7.         if(size(temp)>0)
  8.         {
  9.                 throw "分钟线计算错误!分钟线最高价小于最低价!"
  10.         }
  11. }
  12. def checkVolumeAmount(idate,dbName,tbName)
  13. {
  14.         // 分钟线交易量与交易额指标校验
  15.         tb = loadTable(dbName,tbName)
  16.         temp = select * from loadTable(dbName,tbName) where tradedate=idate and ((Volume == 0 and Amount != 0) or (Volume != 0 and Amount == 0))
  17.         if(size(temp)>0)
  18.         {
  19.                 throw "分钟线计算错误!交易量和交易额不同时为0!"
  20.         }
  21. }
复制代码
注意:在定义函数视图时,一些默认参数如数据存放路径:filePath 需要根据实际情况进行更改


  • 创建DolphinDB任务节点
创建好每个任务的函数视图之后,每个函数视图对应于DolphinScheduler上的一个任务节点,以逐笔委托数据导入任务为例,我们分以下两种情况:

  • 对于每日定时任务,由于我们在定义函数视图时已经针对每日定时任务做了默认参数处理,因此仅需在DolphinScheduler任务节点的SQL语句中输入:
  1. use stockData::createStockTable
  2. use minFactor::createMinFactorTable
  3. // 创建数据库表,库表名可以根据实际需要修改
  4. createEntrust("dfs://stockData", "entrust")
  5. createSnapshot("dfs://stockData", "snapshot")
  6. createTrade("dfs://stockData", "trade")
  7. createMinuteFactor("dfs://factorData", "stockMinFactor")
复制代码
2. 对于历史批量任务,我们需要传入三个参数:开始时间_startDate_,结束时间_endDate_以及任务类型_loadType_,在DolphinScheduler任务节点的SQL语句中需要输入:
  1. use stockData::stockDataLoad
  2. // 定义函数
  3. def loadEntrustFV(userName="admin" , userPassword="123456", startDate = 2023.02.01, endDate = 2023.02.01, dbName = "dfs://stockData", tbName = "entrust", filePath = "/hdd/hdd8/ymchen", loadType = "daily")
  4. {
  5.         infoTb = table(1:0,["info"] ,[STRING])
  6.         if(loadType == "daily")
  7.         {
  8.                 sDate = today()
  9.                 eDate = today()
  10.                 loadEntrust(userName, userPassword, sDate, eDate, dbName, tbName, filePath, loadType,infoTb)
  11.         }
  12.         else if(loadType == "batch")
  13.         {
  14.                 loadEntrust(userName, userPassword, date(startDate), date(endDate), dbName, tbName, filePath, loadType,infoTb)
  15.         }
  16.         return infoTb
  17. }
复制代码
注意:需要在DolphinScheduler上面定义局部参数或全局参数_startDate_和_endDate_,如何定义请参照本文 1.3.2 小节


  • 创建DolphinDB任务工作流
我们需要在DolphinScheduler上创建两个工作流,一个是定时任务工作流,一个是历史批量任务工作流。在每个工作流中需要根据ETL流程编排具有逻辑关系的任务节点。以历史批量任务为例,创建如下工作流:

创建任务工作流之后,点击运行按钮就可以开始执行,点击定时按钮就可以进行定时管理

运行任务后,工作流实例为绿色代表整个工作流运行成功;黑色则表示存在失败任务,可以通过双击失败的工作流实例查看具体是哪个任务执行失败。
在DolphinScheduler中,可以导入工作流导出工作流,以上介绍的DolphinDB每日任务与批量任务,可以直接通过附件中对应的json文件直接导入。


  • 调度DolphinDB ETL 工作流
导入工作流之后,点击执行按钮就可以开始执行
2.5 获取 DolphinDB 任务调度结果

2.5.1 查看 DolphinDB 任务执行情况

在DolphinScheduler上运行DolphinDB工作流任务之后,可以通过以下步骤查看工作流任务运行情况:

  • 进入工作流界面,可以看到所有工作流实例的状态,在状态栏,齿轮形状代表正在运行,绿色打勾代表工作流任务成功运行,黑色打叉代表工作流任务运行失败。


  • 在工作流实例名称下,点击想要查看的工作流实例,进入该工作流详情界面:


  • 如上图所示,我们可以看到股票委托、快照、成交数据导入任务成功了,但是股票数据校验任务失败了,导致整个工作流任务执行失败。在该任务节点上点击鼠标右键,然后点击查看日志,就可以查看该任务节点具体的报错信息:

2.5.2 获取 DolphinDB 任务运行过程中的信息

在任务执行过程中,在 DolphinScheduler 日志中并不能显示 DolphinDB 脚本中通过print函数输出的信息,但是在实践中大多数情况下存在保存任务运行信息以查看任务具体执行情况的需求。以下内容首先介绍DolphinDB SQL任务节点查询类型非查询类型的特点,然后讲述如何在DolphinScheduler 任务节点的日志信息中显示DolphinDB任务的运行信息。
SQL 任务节点非查询类型
非查询类型主要用于无结果返回的函数,它的特点主要有:

  • 可以以分段执行符号为界,执行多段代码。
  • 在任务节点的日志只能在报错时查看报错信息,在任务执行成功时并没有详细的运行信息。
SQL 任务节点查询类型
查询类型主要用于有结果集返回的函数,它的特点主要有:

  • 在任务节点的日志不仅能在报错时查看报错信息,而且当任务执行成功时,能够在节点日志中查看在DolphinDB脚本中设定的运行信息。
  • 只能执行一行语句,不能执行多段代码。
如何获取 DolphinDB 任务运行过程中的信息
下面,以股票委托数据导入任务为例,介绍如何在DolphinDB脚本中在不同运行阶段设定运行日志信息,以及在DolphinScheduler上当任务执行完毕之后在日志中显示完整的运行信息。

  • 在股票委托数据导入模块的函数中引入一个参数,该参数为内存表_infoTb_,当任务执行过程中,将需要记录的运行信息写入该表。
[code]// 模块 stockData::stockDataLoad 的 loadEntrust 函数定义如下:module stockData::stockDataLoaduse stockData::stockDataProcessdef loadEntrust(userName, userPassword, startDate, endDate, dbName, tbName, filePath, loadType,mutable infoTb){    for(loadDate in startDate..endDate)    {        // 删除已有数据        dateString = temporalFormat(loadDate,"yyyyMMdd")        dataCount = exec count(*) from loadTable(dbName, tbName) where date(tradeTime)=loadDate        // 如果表里面已经存在当天要处理的数据,删除库里面已有数据        if(dataCount != 0){            msg = "Start to delete the entrust data, the delete date is: " + dateString            print(msg)            // 将运行信息添加到表中            infoTb.tableInsert(msg)            dropPartition(database(dbName), loadDate, tbName)            msg = "Successfully deleted the entrust data, the delete date is: " + dateString            print(msg)            infoTb.tableInsert(msg)        }        // 数据导入        // 判断数据csv文件是否存在        fileName = filePath + "/" + dateString + "/" + "entrust.csv"        if(!exists(fileName))        {            throw fileName + "不存在!请检查数据源!"        }        // 如果是全市场数据,数据量较大,因此分批导入        schemaTB = schemaEntrust()        tmpData1 = loadText(filename=fileName, schema=schemaTB)        tmpData1,n1,n2 = processEntrust(loadDate,tmpData1)        pt = loadTable(dbName,tbName)        msg = "the data size in csv file is :" + n2 + ", the duplicated count is " + (n1 - n2)        print(msg)        infoTb.tableInsert(msg)        for(i in 0..23)        {            startTime = 08:00:00.000 + 1200 * 1000 * i            tmpData2 = select * from tmpData1 where time(TradeTime)>=startTime and time(TradeTime)

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

飞不高

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表