宁睿 发表于 2024-12-5 10:30:27

离线数据同步变迁

第一代-基于Hadoop体系的离线数据同步

一、背景

随着业务的发展,体系举行了微服务的差分,导致数据越来越分散,很难举行一个完整的生命周期的数据查询,对于某些业务的需求支持变得越来越难,越来越复杂,也越来越难以举行职责划分。对着业务的发展,数据量越来越大之后,为了良好的业务支持,举行了分库分表,分库分表规则五花八门,一旦脱离了业务逻辑,很难确定某一条数据在哪个库哪个表。
基于这样的问题和情况,为了满意业务需求,很自然的就想到了使用大数据服务,将业务数据归集到一起,创建完整的数据仓库,便于数据的查询。
二、数据同步架构

为了追求简朴和通用,由于自身的认识现在,选择了最标准的大数据架构,即基于Hadoop的大数据体现。整个集群采用三节点,通过CDH举行集群的部署和维护。
https://img2024.cnblogs.com/blog/779703/202412/779703-20241205100849598-1576079178.png
整个数据链路为:
通过Azkaban调用Spark应用,将数据从RDS同步到Hive,运营平台和报表体系采用Presto加速访问Hive的数据。
三、数据同步具体过程

数据同步采用Spark任务来举行,将任务打包之后,上传到Azkaban调度平台,使用Azkaban举行定时调度,完成T+1级别的数据同步工作。
数据同步代码示例:
object MarketMysqlToHiveEtl extends SparkHivePartitionOverwriteApplication{


/**
   * 删除已存在的分区
   *
   * @param spark SparkSessions实例
   * @param date 日期
   * @param properties 数据库配置
   */
def delete_partition(spark: SparkSession, properties:Properties, date: String):Unit={
    val odsDatabaseName = properties.getProperty("hive.datasource.ods")
    DropPartitionTools
   .dropPartitionIfExists(spark,odsDatabaseName,"ods_t_money_record","ds",date)
    DropPartitionTools
   .dropPartitionIfExists(spark,odsDatabaseName,"ods_t_account","ds",date)
}



/**
   * 抽取数据
   * @param spark SparkSession实例
   * @param properties 数据库配置
   * @param date 日期
   */
def loadData(spark: SparkSession, properties:Properties, date: String): Unit ={
    // 删除历史数据,解决重复同步问题
    delete_partition(spark,properties,date)

    // 获取数据源配置
    val odsDatabaseName = properties.get("hive.datasource.ods")
    val dataSource = DataSourceUtils.getDataSourceProperties(FinalCode.MARKET_MYSQL_FILENAME,properties)

    var sql = s"select id,account_id,type,original_id,original_code,money,reason,user_type,user_id,organization_id," +
    s"create_time,update_time,detail,deleted,parent_id,counts,'${date}' AS ds from TABLENAME where date(update_time) ='${date}'"

    // 同步数据
    MysqlToHiveTools.readFromMysqlIncrement(spark,dataSource,sql.replace("TABLENAME","t_money_record"),
                                          s"${odsDatabaseName}.ods_t_money_record",SaveMode.Append,"ds")


    sql = s"select id,code,customer_code,name,mobile,type,organization_id,organization_name,create_time,update_time,deleted,status,customer_name," +
    s"customer_id,channel_type,nike_name,version,register_Time,'${date}' AS ds from TABLENAME where date(update_time) ='${date}'"
    MysqlToHiveTools.readFromMysqlIncrement(spark,dataSource,sql.replace("TABLENAME","t_account"),
                                          s"${odsDatabaseName}.ods_t_account",SaveMode.Append,"ds")
}



/**
   * 数据etl
   * @param spark SparkSession实例
   * @param SparkSession 数据库配置
   */
def etl(spark: SparkSession, properties:Properties): Unit = {
    val sparkConf = spark.sparkContext.getConf
    // 获取同步的日期
    var lastDate = sparkConf.get("spark.etl.last.day", DateUtils.getLastDayString)
    val dateList = newListBuffer()
    if(lastDate.isEmpty){
      // 未配置,设置为前一天
      lastDate = DateUtils.getLastDayString
    }
    if(lastDate.contains("~")){
      // 如果是时间段,获取时间段中的每一天,解析为时间list
      val dateArray = lastDate.split("~")
      DateUtils.findBetweenDates(dateArray(0), dateArray(1)).foreach(it => dateList.append(it))
    }else if(lastDate.contains(",")){
      // 如果是使用,分隔的多个日期,解析为时间list
      lastDate.split(",").foreach(it => dateList.append(it))
    }else{
      // 添加进时间列表
      dateList.append(lastDate)
    }
    // 循环同步每天的数据
    dateList.foreach(it =>loadData(spark, properties, it))
}


def main(args: Array): Unit = {
    job() {
      val sparkAndProperties = SparkUtils.get()
      val spark = sparkAndProperties.spark
      val properties = sparkAndProperties.properties
      // 调度任务
      etl(spark,properties)
    }
}
}删除Partition的代码示例:
object DropPartitionTools {


/**
   * 删除指定的Partition
   * @param SparkSession实例
   * @param database数据库名称
   * @param table表名称
   * @param partitionKey 分区字段的名称
   * @param partitionValue 具体的分区值
   */
def dropPartitionIfExists(spark: SparkSession, database: String, table: String, partitionKey: String, partitionValue:String): Unit ={

   val df = spark.sql(
       s"""
         | show tables in ${database} like '${table}'
         |""".stripMargin)

    if(df.count() > 0 ){
      // 表存在,删除分区
      spark.sql(
      s"""
         |ALTER TABLE${database}.${table} DROPIF EXISTSPARTITION (${partitionKey}='${partitionValue}')
         |""".stripMargin)
    }
}


/**
   * 删除Partition
   * @param SparkSession实例
   * @param database数据库名称
   * @param table表名称
   * @param partitionKey 分区字段的名称
   */
def dropHistoryPartitionIfExists(spark: SparkSession, database: String, table: String, partitionKey: String): Unit ={

    val df = spark.sql(
      s"""
         | show tables in ${database} like '${table}'
         |""".stripMargin)

    if(df.count() > 0 ){
      // 表存在,删除历史分区,获取8天前的日期
      val sevenDay = DateUtils.getSomeLastDayString(8);
      spark.sql(
      s"""
         |ALTER TABLE${database}.${table} DROPIF EXISTSPARTITION (${partitionKey} ='${sevenDay}')
         |""".stripMargin)
    }
}

}从RDS同步数据到HIVE的代码示例:
object MysqlToHiveTools {


/**
   * 从mysql抽取数据到hive -- 全量
   * @param spark spark实例
   * @param dataSource 数据库配置信息
   * @param tableName 抽取的数据库表名
   * @param destTableName 目标表名
   * @param mode 抽取的模式
   */
def mysqlToHiveTotal(spark: SparkSession, dataSource: JSONObject,tableName: String, destTableName:String,mode: SaveMode, partition: String): Unit = {
   val sql = "(select * from " + tableName + ") as t"
   mysqlToHive(spark, dataSource, sql, destTableName, mode, partition)
}


/**
   * 从mysql抽取数据到hive -- 增量量
   * @param spark spark实例
   * @param dataSource 数据库配置信息
   * @param sql 抽取数据的SQL
   * @param destTableName 目标表名
   * @param mode 抽取的模式
   */
def readFromMysqlIncrement(spark: SparkSession, dataSource: JSONObject,sql: String, destTableName:String,mode: SaveMode, partition: String): Unit = {
    mysqlToHive(spark, dataSource, sql, destTableName, mode, partition)
}


/**
   * 真正的抽取数据
   * @param spark spark实例
   * @param properties 数据库配置信息
   * @param sql 抽取数据的SQL
   * @param destTableName 目标表名
   * @param mode 抽取的模式
   */
def mysqlToHive(spark: SparkSession, dataSource: JSONObject,sql: String, destTableName:String, mode: SaveMode, partition: String):Unit={
    val df = spark.read.format("jdbc")
      .option("url",dataSource.getString("url"))
      .option("driver",dataSource.getString("driver"))
      .option("fetchSize", 10000)
      .option("numPartitions",2)
      .option("dbtable",s"(${sql}) AS t")
      .option("user",dataSource.getString("user"))
      .option("password",dataSource.getString("password"))
      .load()
    if(partition == null || partition.isEmpty){
      df.write.format("parquet").mode(mode).saveAsTable(destTableName)
    }else{
      df.write.format("parquet").mode(mode).partitionBy("ds").saveAsTable(destTableName)
    }
}
}Spark Application代码示例
trait SparkHivePartitionOverwriteApplication extends Logging{


def getProperties(): Properties ={
    val prop:Properties = new Properties()
    val inputStream = this.getClass.getClassLoader.getResourceAsStream("config.properties")
    prop.load(inputStream);
    prop
}

def job(appName: String = null,
          master: String = null)(biz: => Unit): Unit = {
    var spark: SparkSession = null
    System.setProperty("HADOOP_USER_NAME", "mapred")
    val prop:Properties = getProperties()
    if (null == appName) {
      spark = SparkSession.builder
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .config("spark.sql.sources.partitionOverwriteMode","dynamic")
      .config("hive.exec.dynamic.partition.mode","nonstrict")
      .config("spark.sql.hive.convertMetastoreParquet",false)
      .enableHiveSupport
      .getOrCreate
      var sparkAndProperties = SparkAndProperties(spark, prop)
      SparkUtils.set(sparkAndProperties)
    } else {
      spark = SparkSession.builder.master(master).appName(appName)
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .config("spark.sql.sources.partitionOverwriteMode","dynamic")
      .config("hive.exec.dynamic.partition.mode","nonstrict")
      .config("spark.sql.hive.convertMetastoreParquet",false)
      .config("spark.testing.memory","2147480000")
      .config("spark.driver.memory","2147480000")
      .enableHiveSupport.getOrCreate
      var sparkAndProperties = SparkAndProperties(spark, prop)
      SparkUtils.set(sparkAndProperties)
      SparkUtils.set(sparkAndProperties)
    }
    biz
    spark.stop()
    SparkUtils.remove()
}

}

case class SparkAndProperties(spark: SparkSession,
                              properties: Properties)四、配套生态


[*]自定义UDF函数
在使用的过程中,需要将表中的IP地址,解析为地点地的名称,这需要调用第三方的一个服务接口来完成,为了完成这个任务,定义了一个自定义UDF函数,举行解析。
a. 自定义UDF函数
object ParseIp{
    def evaluate(ip: String):String= {
      // 具体的IP解析服务
      SplitAddress.getPlaceFromIp(ip)
   }
}b. 使用自定义UDF函数
object TraceTmpEtl extends SparkHivePartitionOverwriteApplication{

/**
   * 数据同步任务
   * @param spark sparkSession实例
   * @param properties 数据库配置
   * @param date 日期
   */
def tmp_t_trace_user_visit_real_time_statistic(spark: SparkSession,properties:Properties,date: String):Unit ={
    // 获取数据库配置的数据库名称
    val odsDatabaseName = properties.get("hive.datasource.ods")
    val tmpDatabaseName = properties.get("hive.datasource.tmp")

    // 注册自定义的UDF函数
    spark.udf.register("parseIP", (ip: String) => SplitAddress.getPlaceFromIp(ip))
    // 在Spark SQL中使用UDF函数
    spark.sql(
      s"""
         |INSERT OVERWRITE TABLE ${tmpDatabaseName}.tmp_t_statistic partition(ds='${date}')
         |select
         |          `id` ,
         |          `create_time` ,
         |          `update_time` ,
         |          `ip` ,
         |      replace( replace( replace(replace( case when parseIP(ip) rlike '^中国' then replace(parseIP(ip),'中国','')
         |          when parseIP(ip) rlike '^内蒙古' then replace(parseIP(ip),'内蒙古','内蒙古自治区')
         |          when parseIP(ip) rlike '^广西' then replace(parseIP(ip),'广西','广西壮族自治区')
         |          when parseIP(ip) rlike '^西藏' then replace(parseIP(ip),'西藏','西藏自治区')
         |          when parseIP(ip) rlike '^宁夏' then replace(parseIP(ip),'宁夏','宁夏回族自治区')
         |          when parseIP(ip) rlike '^新疆' then replace(parseIP(ip),'新疆','新疆维吾尔自治区')
         |          when parseIP(ip) rlike '^香港' then replace(parseIP(ip),'香港','香港特别行政区')
         |          when parseIP(ip) rlike '^澳门' then replace(parseIP(ip),'澳门','澳门特别行政区')
         |   else parseIP(ip) end, "省", "省."),"市", "市."),"县", "县."),"区", "区.") as ip_place,
         |          `page_view`
         |from ${odsDatabaseName}.ods_t_statistic where ds ='${date}'
         |""".stripMargin)
}

/**
   * 数据etl
   * @param spark SparkSession实例
   * @param properties 数据库配置
   */
def etl(spark: SparkSession, properties:Properties): Unit = {
    val lastDate = DateUtils.getLastDayString
    tmp_t_trace_user_visit_real_time_statistic(spark,properties, lastDate)
}



def main(args: Array): Unit = {
    job() {
      val sparkAndProperties = SparkUtils.get()
      val spark = sparkAndProperties.spark
      val properties = sparkAndProperties.properties
      etl(spark,properties)
    }
}
}
[*]数据库的设置安全性问题
刚开始数据库设置同步设置文件直接写死,但是后续发现这样存在一些安全性的问题,后来采用将数据库相关的设置组合为一个JSON字符串,将其加密之后生存到MongoDB中,在使用时举行查询解密。
public class DataSourceUtils {

    privatestatic Logger logger = LoggerFactory.getLogger(DataSourceUtils.class);

    public static JSONObject getDataSourceProperties(String dataSourceKey,Properties properties){
      List<ServerAddress> adds = new ArrayList<>();
      try {
            String filePath = properties.getProperty("spark.mongo.properties.file.url");
            properties = new Properties();
            File file = new File(filePath);
            FileInputStream inputStream = null;
             inputStream = new FileInputStream(file);
            properties.load(inputStream);
      }catch (Exception e){
            logger.info("not load file, reason:" + e.getMessage());
            e.printStackTrace();
      }
      String mongoUrl = properties.getProperty("mongo_url");
      String mongoPort = properties.getProperty("mongo_port");
      String mongoDbName = properties.getProperty("mongo_dbName");
      String mongoCollect = properties.getProperty("mongo_collect");
      String mongoUser = properties.getProperty("mongo_user");
      String mongoPassword = properties.getProperty("mongo_password");
      String desKey = properties.getProperty("data_des_key");
      ServerAddress serverAddress = new ServerAddress(mongoUrl, Integer.parseInt(mongoPort));
      adds.add(serverAddress);
      List<MongoCredential> credentials = new ArrayList<>();
      MongoCredential mongoCredential = MongoCredential.createScramSha1Credential(mongoUser, mongoDbName, mongoPassword.toCharArray());
      credentials.add(mongoCredential);
      MongoClient mongoClient = new MongoClient(adds, credentials);
      MongoDatabase mongoDatabase = mongoClient.getDatabase(mongoDbName);
      MongoCollection<Document> collection = mongoDatabase.getCollection(mongoCollect);
      //指定查询过滤器
      Bson filter = Filters.eq("key", dataSourceKey);
      //指定查询过滤器查询
      FindIterable findIterable = collection.find(filter);
      //取出查询到的第一个文档
      Document document = (Document) findIterable.first();
      //打印输出
      String content = DESUtil.decrypt(desKey, document.getString("content"));
      return JSON.parseObject(content);
    }


    public staticProperties json2Properties(JSONObject jsonObject){
      String tmpKey = "";
      String tmpKeyPre = "";
      Properties properties = new Properties();
      j2p(jsonObject, tmpKey, tmpKeyPre, properties);
      return properties;
    }



    private static void j2p(JSONObject jsonObject, String tmpKey, String tmpKeyPre, Properties properties){
      for (String key : jsonObject.keySet()) {
            // 获得key
            String value = jsonObject.getString(key);
            try {
                JSONObject jsonStr = JSONObject.parseObject(value);
                tmpKeyPre = tmpKey;
                tmpKey += key + ".";
                j2p(jsonStr, tmpKey, tmpKeyPre, properties);
                tmpKey = tmpKeyPre;
            } catch (Exception e) {
                properties.put(tmpKey + key, value);
                System.out.println(tmpKey + key + "=" + value);
            }
      }
    }
    public static void main(String[] args) {

    }
}
[*]Spark任务脚本示例
#!/bin/sh

##### env ###########
export JAVA_HOME=/usr/java/jdk1.8.0_151
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
export PATH=${JAVA_HOME}/bin:${SPARK_HOME}/bin:${PATH}
export SPARK_USER=hadoop
export HADOOP_USER_NAME=hadoop
LAST_DAY="$1"
echo LAST_DAY

spark-submit \
--class net.app315.bigdata.operatereport.ods.MarketMysqlToHiveEtl \
--conf spark.sql.hive.metastore.version=2.1.1 \
--conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/* \
--jars /opt/cloudera/parcels/CDH/lib/spark/jars/mysql-connector-java-5.1.48.jar,/opt/cloudera/parcels/CDH/lib/spark/jars/druid-1.1.10.jar \
--master yarn \
--deploy-mode cluster \
--executor-memory 4G \
--driver-memory 2G \
--num-executors 4 \
--executor-cores 2 \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=8 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=128 \
--conf spark.yarn.executor.failuresValidityInterval=1h \
--conf spark.task.maxFailures=4 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.scheduler.mode=FIFO \
--conf spark.network.timeout=420000 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.executor.heartbeatInterval=360000 \
--conf spark.sql.crossJoin.enabled=true \
--conf spark.mongo.properties.file.url=/opt/conf/mongo.properties \
--conf spark.etl.last.day="${LAST_DAY}" \
./target/spark-operate-report-project-1.0.jar
[*]Job任务脚本实例
nodes:

- name: bigdata_market_ods_etl
    type: command
    config:
      command: sh -x ./script/bigdata_market_ods_etl.sh "${spark.etl.last.day}"
      failure.emails: mxx@xxx.com

- name: bigdata_market_dim_etl
    type: command
    config:
      command: sh -x ./script/bigdata_market_dim_etl.sh "${spark.etl.last.day}"
      failure.emails: mxx@xxx.com
    dependsOn:
          - bigdata_market_ods_etl
         
- name: bigdata_market_dw_etl
    type: command
    config:
      command: sh -x ./script/bigdata_market_dw_etl.sh "${spark.etl.last.day}"
      failure.emails: mxx@xxx.com
    dependsOn:
          - bigdata_market_dim_etl
          - bigdata_user_dw_etl五、备注


[*]Davinci报表   一个开源的报表平台
第二代-基于DolphinScheduler的离线数据同步

一、背景

自从上次开始使用基于Hadoop的大数据体现方案之后,业务安稳发展,但是随着时间的推移,新的问题开始出现,重要出现的问题为两个:

[*]数据的变更越来越频繁,基于之前SparkSQL任务的方式,只要需要对表结构举行变更,就需要重新修改Scala代码,然后重新举行任务的打包,这对于一些不认识代码的人来说,不太友好,而且本钱也很高。
[*]虽然使用了Presto对HIVE的数据查询举行了加速,但是地点数据量越来越大,分析要求越来越复杂,即席查询越来越多,由于集群本身资源有限,查询能力出现了显著瓶颈。
二、数据同步架构

随着技术的发展已经对大数据的认识,接触到了更多的大数据相关的知识与组件,基于此,通过认真分析与思考之后,对数据的同步方案举行了如下的重新设计。

[*]数据存储与查询放弃了HDFS+HIVE+Presto的组合,转而采用现代化的MPP数据库StarRocks,StarRocks在数据查询的效率层面非常良好,在相同资源的情况下,可以解决现在遇到的数据查询瓶颈。
[*]数据同步放弃了SparkSQL,转而采用更加轻量级的DATAX来举行,其只需要通过简朴的设置,即可完成数据的同步,同时其也支持StarRocks Writer,开辟职员只需要具备简朴的SQL知识,就可以完成整个数据同步任务的设置,难度大大降低,效率大大提升,友好度大大提升。
[*]定时任务调度放弃Azkaban,采用现代化的任务调度工作Apache DolphinScheduler,通过可视化的页面举行调度任务工作流的设置,更加友好。
https://img2024.cnblogs.com/blog/779703/202412/779703-20241205100909138-797225882.png
三、数据同步的具体流程

数据同步在这种方式下变动非常简朴,只需要可视化的设置DataX任务,即可自动调度。下面的一个任务的设置示例
{
"job": {
    "setting": {
      "speed": {
      "channel":1
      }
    },
    "content": [
      {
      "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "",
            "password": "",
            "connection": [
            {
                "querySql": [
                  "SELECT CustomerId AS customer_id FROM base_info.base_customer where date(UpdateTime) > '${sdt}' and date(UpdateTime) < '${edt}'"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://IP:3306/base_info?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
                ]
            }
            ]
          }
      },
      "writer": {
          "name": "starrockswriter",
          "parameter": {
            "username": "xxx",
            "password": "xxx",
            "database": "ods_cjm_test",
            "table": "ods_base_customer",
            "column": ["id"],
            "preSql": [],
            "postSql": [],
            "jdbcUrl": "jdbc:mysql://IP:9030/",
            "loadUrl": ["IP:8050", "IP:8050", "IP:8050"],
            "loadProps": {
            "format": "json",
            "strip_outer_array": true
            }
          }
      }
      }
      ]
    }
}数据同步过程中,遇到了另外一个问题,即业务存在大量的分库分表的,这些分库分表的逻辑五花八门,60张左右的逻辑板,经太过库分表之后达到了惊人的5000多张,为每张表设置任务很显然不太正常,这就需要能够在举行数据同步的时候动态生成需要的表列表,把表列表设置到DataX的设置文件中去。
颠末技术的调用,Apache DolphinScheduler的Python任务类型很适合做这个事情,由于公司本身使用了Apache DolphinScheduler3.0的版本,其Python任务还不支持返回数据到鄙俚节点,但是社区最新版本已经支持该能力,因为按照已实现版本对其举行改造。
改造之后,Python节点能够将数据通报给他的鄙俚节点,因此使用Python脚本查询获取需要举行同步的表列表,将其通报给DataX节点,完成动态表的数据同步
import pymysql
import datetime


def select_all_table(date: str):
    result_list = []
    sql = """
    SELECT concat('"', table_name, '"')
    FROM information_schema.`TABLES`
    WHERE table_schema='hydra_production_flow'
      and table_name like 't_package_flow_log_%'
      and table_name like '%_{}'
    """.format(date)
    conn = pymysql.connect(host='', port=3306, user='', passwd='',
                           db='information_schema')
    cur = conn.cursor()
    cur.execute(query=sql)
    while 1:
      res = cur.fetchone()
      if res is None:
            break
      result_list.append(res)
    cur.close()
    conn.close()
    return result_list


if __name__ == '__main__':
    # 获取当前年月
    # 获取当前日期
    today = datetime.date.today()
    # 计算前一天的日期
    yesterday = today - datetime.timedelta(days=1)
    current_date = yesterday.strftime("%Y_%m")
    table_list = select_all_table(current_date)
    table_str = ",".join(table_list)
    # 设置变量,传递给下游节点
    print('${setValue(table_list=%s)}' % table_str){
"job": {
    "setting": {
      "speed": {
      "channel":1
      }
    },
    "content": [
      {
      "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "xxx",
            "password": "xxxx",
            "column": [
            "id",
            "concat('t_package_flow_log_',DATE_FORMAT(create_time,'%Y_%m'))",
            "operation_type"
            ],
            "where": "date(create_time) ${operator_symbol} '${dt}'",
            "connection": [
            {
                "table": [
                  ${table_list}
                ],
                "jdbcUrl": [
                  "jdbc:mysql://xx:3306/hydra_production_flow?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
                ]
            }
            ]
          }
      },
      "writer": {
                  "name": "starrockswriter",
                  "parameter": {
                        "username": "xxxxxx",
                        "password": "xxxxxxx",
                        "database": "ods_cjm",
                        "table": "ods_t_package_flow_log",
                        "column": ["id", "table_name","operation_type"],
                        "preSql": [],
                        "postSql": [],
                        "jdbcUrl": "jdbc:mysql://IP:9030/",
                        "loadUrl": ["IP:8050", "IP:8050", "IP:8050"],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": true
                        }
                  }
                }
            }
      ]
    }
}四、踩坑记录


[*]DATAX只支持python2.x
下载支持python3.x的相关文件,替换DataX中的相同文件,即可支持python3.x使用
五、备注


[*]StarRocks    高性能的MPP数据库
[*]DataX离线数据同步
[*]Apache DolphinScheduler任务调度工具
第三代-基于Python自定义的离线数据同步

一、背景

自从采用Apache DolphinScheduler + StarRocks数据方案以来,统统都很安稳发展;但是随着时间的推移,总会出现新的问题。
随着数据量的增多,使用方需求的增长,已经一些其他因素的影响,对现在的数据同步架构带来了一些不小的挑战,这些问题导致任务的维护和更新越来越贫苦,需要耗费大量的时间来举行,急需一种新的方式来处理。

[*]由于等保的要求,线上RDS数据库不再支持通过公网访问,又因为StarRocks也在内网,这就导致了之前的数据同步链路彻底断裂,需要新的方案。
[*]由于数据结构的频繁变更、服务器资源导致的任务调度非常等等原因,需要重跑数据的需求越来越多,这就导致需要不停的修改任务的调度参数(如日期),现在已经上线了10个业务的调度任务,也就是重新同步一次,就需要依次修改调度这10个任务,这期间还需要专人举行状态的跟踪,纵然修改调度,压力很大。
二、数据同步架构

鉴于数据链路变更,导致原本数据链路断裂的问题,通过调研之后,决定采用KAFKA举行数据的中转,在内网部署KAFKA集群,同时该集群提供公网访问地址;在RDS地点的内网呆板上使用DataX将RDS数据通过公网地址写入KAFKA,在内网中通过KafkaConnector消耗数据写入StarRocks。
鉴于新的资源有限,原本内网提供了4台8C32G的服务器,但是新的RDS地点内网只能提供一台最大4C8G的服务器。因此放弃了使用Apache DolphinScheduler来举行调度,直接使用crontab调用对应的Python脚本举行DataX任务调度。
https://img2024.cnblogs.com/blog/779703/202412/779703-20241205100921285-1407264702.png
三、具体的数据同步

新的方案,重要解决的问题有两个,一是DataX怎样将数据写入KAFKA,二是Python脚本怎么解决前面遇到的修改复杂的问题。

[*]DataX写KAFKA
DataX本身并没有kafkawriter实现,这就需要我们本身实现一个KafkaWriter来支持我们的需求,同时为了数据安全,希望能够对数据举行加密。
DataX的KafkaWriter实现
public class KafkaWriter extends Writer {

    public static class Job extends Writer.Job {

      private static final Logger logger = LoggerFactory.getLogger(Job.class);
      private Configuration conf = null;

      @Override
      public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
                configurations.add(conf);
            }
            return configurations;
      }

      private void validateParameter() {
            this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
            this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
      }

      @Override
      public void init() {
            this.conf = super.getPluginJobConf();
            logger.info("kafka writer params:{}", conf.toJSON());
            this.validateParameter();
      }


      @Override
      public void destroy() {

      }
    }

    public static class Task extends Writer.Task {
      private static final Logger logger = LoggerFactory.getLogger(Task.class);
      private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

      private Producer<String, String> producer;
      private String fieldDelimiter;
      private Configuration conf;
      private Properties props;
      private AesEncryption aesEncryption;
      private List<String> columns;

      @Override
      public void init() {
            this.conf = super.getPluginJobConf();
            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
            columns = conf.getList(Key.COLUMN_LIST, new ArrayList<>(), String.class);

            props = new Properties();
            props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
            props.put("acks", conf.getUnnecessaryValue(Key.ACK, "0", null));//这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
            props.put("retries", conf.getUnnecessaryValue(Key.RETRIES, "5", null));
            props.put("retry.backoff.ms", "1000");
            props.put("batch.size", conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
            props.put("linger.ms", 100);
            props.put("connections.max.idle.ms", 300000);
            props.put("max.in.flight.requests.per.connection", 5);
            props.put("socket.keepalive.enable", true);
            props.put("key.serializer", conf.getUnnecessaryValue(Key.KEYSERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
            props.put("value.serializer", conf.getUnnecessaryValue(Key.VALUESERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
            producer = new KafkaProducer<String, String>(props);
            String encryptKey = conf.getUnnecessaryValue(Key.ENCRYPT_KEY, null, null);
            if(encryptKey != null){
                aesEncryption = new AesEncryption(encryptKey);
            }
      }

      @Override
      public void prepare() {
            AdminClient adminClient = AdminClient.create(props);
            ListTopicsResult topicsResult = adminClient.listTopics();
            String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
            try {
                if (!topicsResult.names().get().contains(topic)) {
                  new NewTopic(
                            topic,
                            Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                            Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                  );
                  List<NewTopic> newTopics = new ArrayList<NewTopic>();
                  adminClient.createTopics(newTopics);
                }
                adminClient.close();
            } catch (Exception e) {
                throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE.getDescription());
            }
      }

      @Override
      public void startWrite(RecordReceiver lineReceiver) {
            logger.info("start to writer kafka");
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {
                if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null)
                        .equalsIgnoreCase(WriteType.TEXT.name())) {
                  producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                            Md5Encrypt.md5Hexdigest(recordToString(record)),
                            aesEncryption ==null ? recordToString(record): JSONObject.toJSONString(aesEncryption.encrypt(recordToString(record))))
                  );
                } else if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null)
                        .equalsIgnoreCase(WriteType.JSON.name())) {
                  producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                            Md5Encrypt.md5Hexdigest(recordToString(record)),
                            aesEncryption ==null ? recordToJsonString(record) : JSONObject.toJSONString(aesEncryption.encrypt(recordToJsonString(record))))
                  );
                }
                producer.flush();
            }
      }

      @Override
      public void destroy() {
            if (producer != null) {
                producer.close();
            }
      }

      /**
         * 数据格式化
         *
         * @param record
         * @return
         */
      private String recordToString(Record record) {
            int recordLength = record.getColumnNumber();
            if (0 == recordLength) {
                return NEWLINE_FLAG;
            }
            Column column;
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < recordLength; i++) {
                column = record.getColumn(i);
                sb.append(column.asString()).append(fieldDelimiter);
            }

            sb.setLength(sb.length() - 1);
            sb.append(NEWLINE_FLAG);
            return sb.toString();
      }

      /**
         * 数据格式化
         *
         * @param record 数据
         *
         */
      private String recordToJsonString(Record record) {
            int recordLength = record.getColumnNumber();
            if (0 == recordLength) {
                return "{}";
            }
            Map<String, Object> map = new HashMap<>();
            for (int i = 0; i < recordLength; i++) {
                String key = columns.get(i);
                Column column = record.getColumn(i);
                map.put(key, column.getRawData());
            }
            return JSONObject.toJSONString(map);
      }
    }
}举行数据加密的实现:
public class AesEncryption {

    private SecretKey secretKey;

    public AesEncryption(String secretKey) {
      byte[] keyBytes = Base64.getDecoder().decode(secretKey);
      this.secretKey = new SecretKeySpec(keyBytes, 0, keyBytes.length, "AES");
    }


    public String encrypt(String data) {
      try {
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.ENCRYPT_MODE, secretKey);
            byte[] encryptedBytes = cipher.doFinal(data.getBytes());
            return Base64.getEncoder().encodeToString(encryptedBytes);
      } catch (Exception e) {
            throw new RuntimeException(e);
      }
    }

    public String decrypt(String encryptedData) throws Exception {
      Cipher cipher = Cipher.getInstance("AES");
      cipher.init(Cipher.DECRYPT_MODE, secretKey);
      byte[] decodedBytes = Base64.getDecoder().decode(encryptedData);
      byte[] decryptedBytes = cipher.doFinal(decodedBytes);
      return new String(decryptedBytes);
    }
}Kafka的公网设置
Kafka的表里网设置,只需要修改kafka/config下面的server.properties文件中的如下设置即可。
# 配置kafka的监听端口,同时监听9093和9092
listeners=INTERNAL://kafka节点3内网IP:9093,EXTERNAL://kafka节点3内网IP:9092

# 配置kafka的对外广播地址, 同时配置内网的9093和外网的19092
advertised.listeners=INTERNAL://kafka节点3内网IP:9093,EXTERNAL://公网IP:19092

# 配置地址协议
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

# 指定broker内部通信的地址
inter.broker.listener.name=INTERNAL
[*]自定义的设置文件
Python脚本需要能够自动生成对应的DataX调度的设置文件和shell脚本,自动调度DataX举行任务的执行。因此颠末调研,采用自定义设置文件,通过读取设置文件,动态生成对应的DataX任务脚本和调度脚本,调度任务执行。
自定义的设置文件示例1:
{
"datasource": {
    "host": "xxxxxx",
    "port": "3306",
    "username": "xxxxx",
    "password": "xxxxxxx",
    "properties": {
      "characterEncoding": "utf-8",
      "useSSL": "false",
      "tinyInt1isBit": "false"
    }
},
"table": {
    "database": "app",
    "table": "device",
    "column": [
      "Id AS id",
      "CompanyName AS company_name",
      "CompanyId AS company_id",
      "SecretKey AS secret_key",
      "Brand AS brand",
      "ModelType AS model_type",
      "Enable AS enable",
      "CAST(CreateTime as CHAR) AS create_time",
      "CAST(UpdateTime as CHAR) AS update_time"
    ],
    "where": "date(UpdateTime) >= '$'",
    "searchTableSql": []
},
"kafka": {
    "topic": "mzt_ods_cjm.ods_device"
}
}支持分库分表的设置文件示例2
{
"datasource": {
    "host": "xxxxxxx",
    "port": "3306",
    "username": "xxxxxxx",
    "password": "xxxxxxxx",
    "properties": {
      "characterEncoding": "utf-8",
      "useSSL": "false",
      "tinyInt1isBit": "false"
    }
},
"table": {
    "database": "hydra_logistics_flow",
    "table": "",
    "column": [
      "id",
      "concat('t_logistics_sweep_out_code_flow_',DATE_FORMAT(create_time,'%Y')) AS table_name",
      "cus_org_id",
      "CAST(create_time as CHAR) AS create_time",
      "replace_product_id",
      "replace_product_name",
      "replace_product_code"
    ],
    "where": "date(create_time) >= '$'",
    "searchTableSql": [
      "SELECT concat('t_logistics_sweep_out_code_flow_',YEAR(SUBDATE(CURDATE(), 1))) AS TABLE_NAME",
      "SELECT concat('t_logistics_sweep_out_code_flow_',YEAR(DATE_SUB(DATE_SUB(CURDATE(), INTERVAL 1 DAY), INTERVAL 1 YEAR))) AS TABLE_NAME"
    ]
},
"kafka": {
    "topic": "mzt_ods_cjm.ods_t_logistics_sweep_out_code_flow"
}
}如上的设置文件,表明如下:
KEY说明datasourceRDS数据源datasource.hostRDS数据库的hostdatasource.port>RDS数据库的端口datasource.usernameRDS数据库的用户名datasource.passwordRDS数据库的密码datasource.propertiesjdbc连接的参数,连接时拼接为?key=value&key=valuetable要同步的表信息table.databaseRDS数据库名称table.tableRDS中表的名称,分库分表的可以为空table.columnRDS表中要同步的字段列表,支持取别名和使用函数table.where同步数据的过滤条件table.searchTableSql查询表名称的SQL语句,用于动态分库分表kafkakafka相关的设置kafka.topic数据要写入的kafka topic的名称
[*]Python调度脚本
import json
import os
import pymysql
import re
from datetime import datetime
from dateutil.relativedelta import relativedelta
import uuid
import subprocess
import logging
import hmac
import hashlib
import base64
import urllib.parse
import urllib
import requests
import time
from typing import List, Mapping


def list_files_in_directory(directory_path: str) -> List:
    """
    获取目录下的所有以.json结尾的文件
    :param directory_path: 目录
    :return: 文件列表
    """
    entries = os.listdir(directory_path)
    # 过滤出所有文件
    files = [entry for entry in entries if
             os.path.isfile(os.path.join(directory_path, entry)) and entry.endswith(".json")]
    logging.info(f"读取配置文件数量:{len(files)}")
    return files


def read_file_content(file_path: str) -> str:
    """
    读取文件内容
    :param file_path: 文件路径
    :return: 文件内容
    """
    with open(file_path, 'r', encoding='utf-8') as file:
      content = file.read()
    return content


def read_all_files_in_directory(directory_path: str) -> Mapping:
    """
    读取文件夹下面的所有文件的内容
    :param directory_path: 文件夹路径
    :return: 内容map
    """
    logging.info(f"开始读取所有的配置文件信息")
    files = list_files_in_directory(directory_path)
    file_contents = {}
    for file in files:
      file_path = os.path.join(directory_path, file)
      content = read_file_content(file_path)
      file_contents = content
    sorted_items = sorted(file_contents.items())
    sorted_dict = dict(sorted_items)
    return file_contents


def search_table_list(datasource: json, search_table_sql_list: List) -> List:
    """
    执行语句获取表信息
    :param datasource: 数据源信息
    :param search_table_sql_list: 查询表的SQL语句
    :return: 表列表
    """
    logging.info(f"开始查询需要同步的表")
    host = datasource['host']
    port = int(datasource['port'])
    username = datasource['username']
    password = datasource['password']
    conn = pymysql.connect(host=host,
                           port=port,
                           user=username,
                           passwd=password,
                           db='',
                           charset='utf8',
                           connect_timeout=200,
                           autocommit=True,
                           read_timeout=2000
                        )
    table_name_list = []
    for search_table_sql in search_table_sql_list:
      search_table_sql = parse_where_sql(search_table_sql)
      with conn.cursor() as cursor:
            cursor.execute(query=search_table_sql)
            while 1:
                res = cursor.fetchone()
                if res is None:
                  break
                table_name_list.append(res)
    return table_name_list


def general_default_job_config() -> json:
    """
    生成默认的datax配置
    :return: 默认的配置
    """
    default_job_json = """
    {
    "job": {
      "setting": {
            "speed": {
               "channel":1
            }
      },
      "content": [
            {
                "reader": {
                  "name": "mysqlreader",
                  "parameter": {
                        "username": "test",
                        "password": "test1234",
                        "connection": [
                            {
                              "querySql": [
                                    "SELECT id, code from test.t_open_api_classify"
                              ],
                              "jdbcUrl": [
                                    "jdbc:mysql://IP:3306/test?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
                              ]
                            }
                        ]
                  }
                },
               "writer": {
                  "name": "kafkawriter",
                  "parameter": {
                        "bootstrapServers": "IP:9092,IP:9092,IP:9092",
                        "topic": "test-m-t-k",
                        "ack": "all",
                        "batchSize": 1000,
                        "retries": 0,
                        "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
                        "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
                        "fieldDelimiter": ",",
                        "writeType": "json",
                        "topicNumPartition": 1,
                        "topicReplicationFactor": 1,
                        "encryptionKey": "5s8FGjerddfWkG/b64CGHHZYvQ=="
                  }
                }
            }
      ]
    }
}
    """
    return json.loads(default_job_json, encoding='utf-8')


def general_jdbc_url(json_config: json) -> str:
    """
    根据数据源信息生成jdbc url
    :param json_config: 配置
    :return: jdbc url
    """
    logging.info(f"开始解析jdbc url")
    host = json_config['datasource']['host']
    port = int(json_config['datasource']['port'])
    database = json_config['table']['database']
    url = "jdbc:mysql://{}:{}/{}".format(host, port, database)
    # 解下properties
    properties = json_config['datasource']['properties']
    properties_list = []
    if properties is not None and len(properties) > 0:
      for key, value in properties.items():
            properties_list.append(key + "=" + str(value))
      url = url + "?" + "&".join(properties_list)
    logging.info(f"jdbc url: {url}")
    return url


def parse_where_sql(where_sql: str) -> str:
    """
    解析where语句
    :param where_sql: 原始where语句
    :return: 转换之后的where语句
    """
    # 定义支持的类型 $$
    # 正则表达式模式
    logging.info(f"还是解析where语句:where_sql: {where_sql}")
    pattern = r"\$\[.*?\]"
    return re.sub(pattern, replacement_function, where_sql)


def replacement_function(match):
    """
    替换函数
    :param match: 匹配结果
    :return: 替换之后的结果
    """
    matched_text = match.group(0)
    return calc_datetime(matched_text)


def calc_datetime(expression: str) -> str:
    """
    计算时间表达式
    :param expression: 表达式
    :return: 计算之后的值
    """
    logging.info(f"开始计算时间参数:expression: {expression}")
    # 设置映射
    format_units = {
      "yyyy": "%Y",
      "MM": "%m",
      "dd": "%d",
      "HH": "%H",
      "mm": "%M",
      "ss": "%S"
    }

    unit_map = {
      "Y": "yyyy",
      "M": "MM",
      "d": "dd",
      "H": "HH",
      "m": "mm",
      "s": "ss"
    }
    # 解析参数
    expression = expression
    # 判断其开头,截取尾部
    min_unit = None
    for key, value in format_units.items():
      if key in expression:
            min_unit = key
            expression = expression.replace(key, value)

    # 替换完毕,确定是否有数字
    logging.info(f"转换为Python格式的表达式:expression: {expression}")
    # 定义正则表达式模式
    pattern = r'([^0-9]+)([-+]\d+(\*\d+)?)(?:_())?'
    matches = re.match(pattern, expression)
    # 输出拆分结果
    if matches:
      date_part = matches.group(1)
      remainder = matches.group(2)
      unit = matches.group(4)
      if unit is not None and unit in unit_map.keys():
            min_unit = unit_map
      return calculate_expression(min_unit, date_part, remainder)
    else:
      return expression


def calculate_expression(min_unit: str, date_part: str, remainder: str) -> str:
    """
    计算表达式
    :param min_unit: 最小单位
    :param date_part: 日期表达式部分
    :param remainder: 偏移量部分
    :return: 计算之后的结果
    """
    logging.info(f"开始计算表达式:min_unit: {min_unit}, date_part: {date_part}, remainder:{remainder}")
    # 获取当前日期和时间
    now = datetime.now()
    # 计算时间的偏移量
    if remainder is None:
      # 格式化的日期
      formatted_datetime = now.strftime(date_part)
      logging.info(f"日期偏移量为空,返回值:{formatted_datetime}")
      return formatted_datetime
    else:
      # 计算偏移量
      plus_or_sub = remainder
      offset = eval(remainder)
      logging.info(f"计算偏移量,plus_or_sub:{plus_or_sub}, offset:{offset}")
      if min_unit == 'yyyy':
            if plus_or_sub == '-':
                now = now - relativedelta(years=offset)
            else:
                now = now + relativedelta(years=offset)
      elif min_unit == 'MM':
            if plus_or_sub == '-':
                now = now - relativedelta(months=offset)
            else:
                now = now + relativedelta(months=offset)
      elif min_unit == 'dd':
            if plus_or_sub == '-':
                now = now - relativedelta(days=offset)
            else:
                now = now + relativedelta(days=offset)
      elif min_unit == 'HH':
            if plus_or_sub == '-':
                now = now - relativedelta(hours=offset)
            else:
                now = now + relativedelta(hours=offset)
      elif min_unit == 'mm':
            if plus_or_sub == '-':
                now = now - relativedelta(minutes=offset)
            else:
                now = now + relativedelta(minutes=offset)
      elif min_unit == 'ss':
            if plus_or_sub == '-':
                now = now - relativedelta(seconds=offset)
            else:
                now = now + relativedelta(seconds=offset)
      formatted_datetime = now.strftime(date_part)
      logging.info(f"日期偏移量为空,返回值:{formatted_datetime}")
      return formatted_datetime


def general_reader(json_config: json) -> json:
    """
    生成配置的reader部分
    :param json_config: 配置
    :return: JSON结果
    """
    logging.info(f"开始生成DataX的配置JSON文件的reader内容")
    reader_json = json.loads("{}", encoding='utf-8')
    reader_json['name'] = "mysqlreader"
    reader_json['parameter'] = {}
    reader_json['parameter']['username'] = json_config['datasource']['username']
    reader_json['parameter']['password'] = json_config['datasource']['password']
    reader_json['parameter']['column'] = json_config['table']['column']
    reader_json['parameter']['connection'] = [{}]
    reader_json['parameter']['connection']['table'] = json_config['table']['table']
    reader_json['parameter']['connection']['jdbcUrl'] =
    where_sql = json_config['table']['where']
    if where_sql is not None and where_sql != '':
      reader_json['parameter']['where'] = parse_where_sql(where_sql)
    return reader_json


def general_writer(json_config: json) -> json:
    """
    生成配置的Writer部分
    :param json_config: 配置
    :return: JSON结果
    """
    columns = json_config['table']['column']
    new_columns = []
    for column in columns:
      column = str(column).replace("`", "")
      if " AS " in str(column).upper():
            new_columns.append(str(column).split(" AS ").strip())
      else:
            new_columns.append(str(column).strip())
    logging.info(f"开始生成DataX的配置JSON文件的Writer内容")
    writer_json = json.loads("{}", encoding='utf-8')
    writer_json['name'] = "kafkawriter"
    writer_json['parameter'] = {}
    writer_json['parameter']['bootstrapServers'] = "IP:19092,IP:19093,IP:19094"
    writer_json['parameter']['topic'] = json_config['kafka']['topic']
    writer_json['parameter']['ack'] = "all"
    writer_json['parameter']['batchSize'] = 1000
    writer_json['parameter']['retries'] = 3
    writer_json['parameter']['keySerializer'] = "org.apache.kafka.common.serialization.StringSerializer"
    writer_json['parameter']['valueSerializer'] = "org.apache.kafka.common.serialization.StringSerializer"
    writer_json['parameter']['fieldDelimiter'] = ","
    writer_json['parameter']['writeType'] = "json"
    writer_json['parameter']['topicNumPartition'] = 1
    writer_json['parameter']['topicReplicationFactor'] = 1
    writer_json['parameter']['encryptionKey'] = "5s8FGjerddfWkG/b64CGHHZYvQ=="
    writer_json['parameter']['column'] = new_columns
    return writer_json


def general_datax_job_config(datax_config: str):
    """
    生成job的配置内容
    :param datax_config: 配置
    :return: 完整的JSON内容
    """
    logging.info(f"开始生成DataX的配置JSON文件内容, {datax_config}")
    json_config = json.loads(datax_config, encoding='utf-8')
    # 判定是否需要查询表
    datasource = json_config['datasource']
    table = json_config['table']['table']
    search_table_sql_list = json_config['table']['searchTableSql']
    if search_table_sql_list is not None and len(search_table_sql_list) > 0:
      # 查询表列表,覆盖原来的配置信息
      table_list = search_table_list(datasource, search_table_sql_list)
    else:
      table_list =
    json_config['table']['table'] = table_list

    # 开始生成配置文件
    job_json = general_default_job_config()
    job_json['job']['content']['reader'] = general_reader(json_config)
    job_json['job']['content']['writer'] = general_writer(json_config)
    return job_json


def write_job_file(base_path: str, job_config: json) -> str:
    """
    生成job的JSON配置文件
    :param base_path: 根路径
    :param job_config: 配置信息
    :return: 完整的JSON文件路径
    """
    # 生成一个脚本
    logging.info(f"开始创建DataX的配置JSON文件")
    date_day = datetime.now().strftime('%Y-%m-%d')
    timestamp_milliseconds = int(datetime.now().timestamp() * 1000)
    # 生成UUID
    file_name = str(uuid.uuid4()).replace("-", "") + "_" + str(timestamp_milliseconds) + ".json"
    # 完整文件路径
    # 创建文件夹
    mkdir_if_not_exist(base_path + "/task/datax/json/" + date_day)
    complex_file_path = base_path + "/task/datax/json/" + date_day + "/" + file_name
    logging.info(f"完整的DataX的配置JSON文件路径:{complex_file_path}")
    with open(complex_file_path, 'w+', encoding='utf-8') as f:
      f.write(json.dumps(job_config, ensure_ascii=False))
    return complex_file_path


def mkdir_if_not_exist(path):
    """
    创建目录
    :param path: 目录路径
    :return: None
    """
    os.makedirs(path, exist_ok=True)


def write_task_file(base_path: str, python_path: str, datax_path: str, job_file_path: str) -> str:
    """
    写shell脚本文件
    :param base_path: 跟路径
    :param python_path: python执行文件路径
    :param datax_path: datax执行文件路径
    :param job_file_path: JSON配置文件路径
    :return: shell脚本的完整路径
    """
    # 组合内容
    logging.info(f"开始创建Shell脚本文件")
    task_content = python_path + " " + datax_path + " " + job_file_path
    # 生成一个脚本
    date_day = datetime.now().strftime('%Y-%m-%d')
    timestamp_milliseconds = int(datetime.now().timestamp() * 1000)
    # 生成UUID
    task_file_name = str(uuid.uuid4()).replace("-", "") + "_" + str(timestamp_milliseconds) + ".sh"
    # 完整文件路径
    # 创建文件夹
    mkdir_if_not_exist(base_path + "/task/datax/shell/" + date_day)
    complex_file_path = base_path + "/task/datax/shell/" + date_day + "/" + task_file_name
    logging.info(f"完整的shell脚本路径: {complex_file_path}")
    with open(complex_file_path, 'w+', encoding='utf-8') as f:
      f.write(task_content)
    # 添加执行权限
    current_permissions = os.stat(complex_file_path).st_mode
    # 添加执行权限 (权限值 0o111 表示用户、组和其他人的执行权限)
    new_permissions = current_permissions | 0o111
    # 使用 os.chmod 设置新的权限
    os.chmod(complex_file_path, new_permissions)
    return complex_file_path


def signs(dd_secret: str, timestamp: str) -> str:
    """
    钉钉机器人签名
    :param dd_secret: 秘钥
    :param timestamp: 时间戳
    :return: 签名
    """
    secret_enc = dd_secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, dd_secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote(base64.b64encode(hmac_code))
    return sign


def real_send_msg(dd_secret: str, dd_access_token: str, text: json):
    """
    发送钉钉机器人消息
    :param dd_secret: 秘钥
    :param dd_access_token: token
    :param text: 内容
    :return: None
    """
    timestamp = str(round(time.time() * 1000))
    sign = signs(dd_secret, timestamp)
    headers = {'Content-Type': 'application/json'}
    web_hook = f'https://oapi.dingtalk.com/robot/send?access_token={dd_access_token}&timestamp={timestamp}&sign={sign}'
    # 定义要发送的数据
    requests.post(web_hook, data=json.dumps(text), headers=headers)


def send_msg(dd_secret: str, dd_access_token: str, job_start_time: str, total_count: int, success_count: int, fail_task_list: List):
    """
    组合钉钉消息
    :param dd_secret: 秘钥
    :param dd_access_token: token
    :param job_start_time: 任务开始时间
    :param total_count: 总任务数
    :param success_count: 成功任务数
    :return: NONE
    """
    title = '### <font color=#CCCC00>数据同步结果</font>'
    if success_count == total_count:
      title = '### <font color=#00FF00>数据同步结果</font>'
    elif success_count == 0:
      title = '### <font color=#FF0000>数据同步结果</font>'

    end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    result = {
      "msgtype": "markdown",
      "markdown": {
            "title": "数据同步结果",
            "text": title + ' \n\n\n\n- '
                  + "总同步任务数:" + str(total_count) + "\n\n- "
                  + "成功任务数:" + str(success_count) + "\n\n- "
                  + "失败任务数" + str(total_count - success_count) + "\n\n- "
                  + "开始时间:" + str(job_start_time) + "\n\n- "
                  + "结束时间:" + str(end_time) + "\n\n- "
                  + "失败列表:" + str(fail_task_list) + "\n\n "
      }
    }
    if success_count < total_count:
      result['markdown']['at'] = json.loads("{\"atMobiles\": [\"12345678997\"]}")
    real_send_msg(dd_secret, dd_access_token, result)


def run_job(dd_secret, dd_access_token, job_start_time, base_path: str, python_script_path: str, datax_json_path: str):
    """
    运行任务
    :param dd_secret: 秘钥
    :param dd_access_token: token
    :param job_start_time: 任务开始时间
    :param base_path: 根路径
    :param python_script_path: Python执行路径
    :param datax_json_path: datax执行路径
    :return: NONE
    """
    task_content_list = read_all_files_in_directory(base_path + "/task/config/")
    success_count = 0
    total_count = len(task_content_list)
    fail_task_list = []
    for task_content in task_content_list:
      try:
            logging.info(f"开始生成,配置文件名称:{task_content}")
            job_config = general_datax_job_config(task_content_list)
            job_file_path = write_job_file(base_path, job_config)
            shell_path = write_task_file(base_path, python_script_path, datax_json_path, job_file_path)
            logging.info(f"shell脚本创建成功,路径为:{base_path}")
            # 调用脚本
            call_shell(shell_path)
            success_count += 1
      except Exception as e:
            fail_task_list.append(task_content)
            logging.error(f"配置文件:{task_content} 执行失败", e)
    # 发送消息
    send_msg(dd_secret, dd_access_token, job_start_time, total_count, success_count, fail_task_list)


def call_shell(shell_path: str):
    """
    执行shell脚本
    :param shell_path: shell脚本路径
    :return: NONE
    """
    logging.info(f"调用shell脚本,路径为:{shell_path}")
    result = subprocess.run(shell_path,
                            check=True,
                            shell=True,
                            universal_newlines=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)

    # 输出标准输出
    logging.info(f"shell脚本{shell_path}标准输出:%s", result.stdout)
    # # 输出标准错误输出
    logging.info(f"shell脚本{shell_path}标准错误输出:%s", result.stderr)
    # # 输出返回码
    logging.info(f"shell脚本{shell_path}的返回码:%s", result.returncode)


if __name__ == '__main__':
    """
    码中台数据同步任务脚本
    使用前请修改如下配置信息:
      - secret钉钉机器人的秘钥
      - access_token钉钉机器人的token
      - python_path   Python的安装路径
      - datax_path   datax的执行文件路径
    """
    # 钉钉配置
    start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    secret = ''
    access_token = ''
    python_path = "/usr/bin/python3"
    datax_path = "/opt/datax-k/bin/datax.py"
    # 当前脚本文件的目录路径
    script_dir = '/opt/data-job'
    curr_date_day = datetime.now().strftime('%Y-%m-%d')
    # 创建文件夹
    mkdir_if_not_exist(script_dir + "/logs/" + curr_date_day)
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(lineno)d - %(message)s',
                        filename='logs/' + curr_date_day + '/app.log',
                        filemode='w')
    run_job(secret, access_token, start_time, script_dir, python_path, datax_path)
    logging.shutdown()
[*]同步日期的控制
我们在之前的任务同步中,遇到的问题便是日期的修改很贫苦,因此我们需要一个更加简朴的方式来举行日期的批量更新。在我们上面的调度脚本中,包含了对日期表达式的解析,我们自定义了一种时间的表达式$ 通过解析该表达式,我们可以生成需要的任意时间,该时间表达式的含义为:

[*]yyyy 表示年份
[*]MM 表示月份
[*]dd 表示日期
[*]HH表示24进制小时
[*]mm表示分钟
[*]ss表示秒
[*]

[*]表示当前时间加上N

[*]

[*]表示当前时间减去N

[*]_Y表示加减的单位,可以是YMdHms(年、月、日、时、分、秒)
通过对该表达式的解析,我们可以生成相对于当前之前或之后的任何格式的时间字符串,将其用于同步的where条件中,既可以完成针对时间的解析。

[*]怎样更新日期
日期现在可以盘算,但是我们需要能够批量修改设置文件中的WHERE条件中的时间表达式,如我们想同步8天前的数据,我们就需要将脚本中的表达式修改为$ ,即代表当前时间减去8天,这样我们就可以同步八天前那一天的数据,但是我们可能想同步从8天气到现在的所有数据,那么我们希望我们也能批量修改where表达式中的条件,如将=改为>=。
鉴于以上的需求,我们开辟了一个新的Python脚本,通过简朴的设置,即可一次修改所有脚本中的where条件中的表达式,这样,我们只需要执行两个脚本,就完成了统统,再也不需要依次修改执行10个工作流了。
import jsonimport osimport loggingfrom typing import List, Mappingimport refrom datetime import datetime, datedef list_files_in_directory(directory_path: str) -> List:    """    获取目录下的所有以.json结尾的文件    :param directory_path: 目录    :return: 文件列表    """    entries = os.listdir(directory_path)    # 过滤出所有文件    files =     logging.info(f"读取设置文件数量:{len(files)}")    return filesdef read_file_content(file_path: str) -> str:    """    读取文件内容    :param file_path: 文件路径    :return: 文件内容    """    with open(file_path, 'r', encoding='utf-8') as file:      content = file.read()    return contentdef read_all_files_in_directory(directory_path: str) -> Mapping:    """    读取文件夹下面的所有文件的内容    :param directory_path: 文件夹路径    :return: 内容map    """    logging.info(f"开始读取所有的设置文件信息")    files = list_files_in_directory(directory_path)    file_contents = {}    for file in files:      file_path = os.path.join(directory_path, file)      content = read_file_content(file_path)      file_contents = content    sorted_items = sorted(file_contents.items())    sorted_dict = dict(sorted_items)    return file_contentsdef parse_where_sql(where_sql: str, sub_day: int, comparator: str = None) -> str:    """    解析where语句    :param where_sql: 原始where语句    :param sub_day: 天数    :param comparator: 比较符包罗 = != > < >=
页: [1]
查看完整版本: 离线数据同步变迁