立聪堂德州十三局店 发表于 2024-9-12 01:39:37

不同数据库进行同步和增量数据(SQL server 与MySQL数据库为例)

场景

最近在做的一个项目需要将远程服务器的SQL server数据库中表的数据传输到本机的MySQL数据库中,而且远程的SQL server数据库表的数据会实时进行更新,而且差不多是一分钟内传输18条数据,例如现在是2023-12-4 15:09,在15:08这个时间内有18条数据需要首先辈SQL server数据库,再更新到MySQL数据库中,这种场景如果每分钟都能将18条数据放入SQL server数据库的话就非常简单了,但是在15:08的时候,这18条数据大概只来11条,剩下的7条大概在15:09或背面的时间陆续过来。我开始的想法是通过末了更新的时间的时间戳来查询新来的数据然后更新到MySQL中,但是由于在最终的时间内还会来前面时间的数据,这样会导致前面时间的数据丢失,所以我想了另外一方法。

[*]首先利用python写一个步调来同步SQL sever的历史数据到MySQL数据库中
[*]在SQL server中创建一个中间表。
[*]在SQL server中要传输的表中创建一个触发器,当这个表更新数据则触发将更新的数据放入到中间表中
[*]在python脚本中写一个循环来定期查抄中间表,我的SQL server表中由两个主键界说一条数据,所以中间表也是由两个字段界说一条数据,由于入库历史数据的数据量非常大,有几十万条,在这个入库历史数据的时间段内更新了许多条数据,所以大概中间表的数据与入库到MySQL中的字段有重复,所以我需要先验证中间表中的数据MySQL是否存在。
[*]存在则删除中间表中这条数据
[*]不存在则插入MySQL后删除这条数据

[*]末了完成了入库步调,经过验证没有数据丢失
1.历史数据入库

历史数据入库我利用的python写的,首先界说两个数据库的信息
# 使用示例
sql_server_conn_params = {
    'driver': '{SQL Server}',
    'server': 'ip',
    'database': '数据库名',
    'uid': 'jzyg',
    'pwd': ''
}

mysql_conn_params = {
    'host': 'localhost',
    'user': 'root',
    'password': '123456',
    'database': '数据库名',
    'charset': 'utf8mb4'
}
界说查询语句
querySolar = 'SELECT dtime,stationID,staionName,electric,tiltSolar,levelSolar,scatterSolar,directSolar,tiltSolar_day,levelSolar_day,scatterSolar_day,directSolar_day,sunShine_day FROM realData_Solar'
界说入库历史数据函数
    def transfer_wind_data(self):
      # 连接到 SQL Server
      with pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:
            with sql_server_conn.cursor() as sql_server_cursor:
                # 修改查询,仅选择上次同步后的数据
                modified_query = self.queryWind + " WHERE dtime >= ? ORDER BY dtime"
                sql_server_cursor.execute(modified_query, (self.wind_last_dtime,))
                rows = sql_server_cursor.fetchall()
      if not rows:
            return# 没有新数据
      # 连接到 MySQL
      with pymysql.connect(**self.mysql_conn_params) as mysql_conn:
            with mysql_conn.cursor() as mysql_cursor:
                data_list = []
                for row in rows:
                  observe_time = row.strftime("%Y-%m-%d %H:%M:00")
                  fsz_id = row.replace('"', "").strip()
                  station_name = row
                  farmName = station_name.split("-")
                  # 根据电站名查询电站号
                  sql = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"
                  mysql_cursor.execute(sql, ('%' + farmName + '%',))
                  result = mysql_cursor.fetchone()
                  farm_id = None
                  if result is not None:
                        farm_id = result
                  # 处理查询结果为空的情况
                  if farm_id is not None:
                        farm_id = farm_id
                  staion_name =row
                  wind_direction_instant = row
                  wind_speed_instant = row
                  wind_speed_two_min = row
                  wind_speed_ten_min = row
                  data = (observe_time,fsz_id,staion_name,farm_id,wind_direction_instant,
                            wind_speed_instant,wind_speed_two_min,wind_speed_ten_min)
                  data_list.append(data)
                  self.wind_last_dtime = row.strftime("%Y-%m-%d %H:%M:%S")
                if data_list:
                  result = mysql_cursor.executemany('INSERT INTO wind_monitor'
                                                      '(observe_time,fsz_id,station_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min) '
                                                      'VALUES (%s,%s,%s,%s,%s,%s,%s,%s)', data_list)# 根据你的表结构修改
                  mysql_conn.commit()
                  print(f'wind_monitor表插入了{result}行数据.' if result else '没有新数据插入。')
2.创建中间表和触发器

创建中间表
CREATE TABLE intermediateData_Wind AS SELECT * FROM realData_Wind WHERE 1=0;
创建触发器
CREATE TRIGGER CopyToIntermediateTable
ON realData_Wind
AFTER INSERT
AS
BEGIN
    -- 插入操作
    INSERT INTO intermediateData_Wind (dtime, stationID, staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10min)
    SELECT dtime, stationID, staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10min
    FROM inserted;
END;
3.创建轮询中间表代码

def transfer_insert_intermediateData_Wind(self):
    # 连接到 SQL Server
    with pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:
      with sql_server_conn.cursor() as sql_server_cursor:
            # 查询中间表中所有数据
            sql_server_cursor.execute(self.queryIntermediateData)
            intermediate_rows = sql_server_cursor.fetchall()

    # 用于跟踪删除和插入的数量
    deleted_count = 0
    inserted_count = 0

    # 连接到 MySQL
    with pymysql.connect(**self.mysql_conn_params) as mysql_conn:
      with mysql_conn.cursor() as mysql_cursor:
            for row in intermediate_rows:
                observe_time = row.strftime("%Y-%m-%d %H:%M:00")
                fsz_id = row.replace('"', "").strip()

                # 检查wind_monitor表中是否存在相同数据
                check_query = "SELECT COUNT(*) FROM wind_monitor WHERE observe_time = %s AND fsz_id = %s"
                mysql_cursor.execute(check_query, (observe_time, fsz_id))
                count = mysql_cursor.fetchone()
                dtime = row.strftime("%Y-%m-%d %H:%M:00.000")
                if count > 0:

                  # 数据存在,从中间表删除
                  delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"
                  sql_server_cursor.execute(delete_query, (dtime, row))
                  sql_server_conn.commit()
                  deleted_count += 1
                else:
                  # 数据不存在,插入到wind_monitor并从中间表删除
                  station_name = row
                  farmName = station_name.split("-")
                  # 根据电站名查询电站号
                  farm_query = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"
                  mysql_cursor.execute(farm_query, ('%' + farmName + '%',))
                  farm_result = mysql_cursor.fetchone()
                  farm_id = farm_result if farm_result else None

                  wind_direction_instant = row
                  wind_speed_instant = row
                  wind_speed_two_min = row
                  wind_speed_ten_min = row
                  insert_data = (observe_time, fsz_id, station_name, farm_id, wind_direction_instant,
                                 wind_speed_instant, wind_speed_two_min, wind_speed_ten_min)

                  insert_query = 'INSERT INTO wind_monitor (observe_time, fsz_id, station_name, farm_id, wind_direction_instant, wind_speed_instant, wind_speed_two_min, wind_speed_ten_min) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
                  mysql_cursor.execute(insert_query, insert_data)
                  mysql_conn.commit()
                  inserted_count += 1
                  delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"
                  sql_server_cursor.execute(delete_query, (dtime, row))
                  sql_server_conn.commit()

    # 打印删除和插入的数据统计
    print(f"从intermediateData_Wind表中删除了{deleted_count}条数据.")
    print(f"向wind_monitor表中插入了{inserted_count}条数据.")
4.总体代码

import threadingimport timeimport pyodbcimport pymysqlclass DataTransfer:    def __init__(self, sql_server_conn_params, mysql_conn_params, queryWind,queryIntermediateData, interval=1):      self.sql_server_conn_params = sql_server_conn_params      self.mysql_conn_params = mysql_conn_params      self.queryWind = queryWind      self.queryIntermediateData = queryIntermediateData      self.interval = interval      self.wind_last_dtime = '1970-01-01 00:00:00'# 初始时间    def clear_mysql_tables(self):      """清空 MySQL 中的指定表格数据"""      try:            with pymysql.connect(**self.mysql_conn_params) as mysql_conn:                with mysql_conn.cursor() as cursor:                  # 清空 wind_monitor 表                  cursor.execute("TRUNCATE TABLE wind_monitor")                  mysql_conn.commit()                  print("已清空 wind_monitor 表的数据。")      except Exception as e:            print(f"清空表格时发生错误: {e}")    def transfer_data(self):      self.transfer_wind_data()      while True:            try:                self.transfer_insert_intermediateData_Wind()            except Exception as e:                print(f"发生错误: {e}")            # 等待一定时间再次传输数据            time.sleep(self.interval)    def transfer_insert_intermediateData_Wind(self):      # 连接到 SQL Server      with pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:            with sql_server_conn.cursor() as sql_server_cursor:                # 查询中间表中所有数据                sql_server_cursor.execute(self.queryIntermediateData)                intermediate_rows = sql_server_cursor.fetchall()      # 用于跟踪删除和插入的数量      deleted_count = 0      inserted_count = 0      # 连接到 MySQL      with pymysql.connect(**self.mysql_conn_params) as mysql_conn:            with mysql_conn.cursor() as mysql_cursor:                for row in intermediate_rows:                  observe_time = row.strftime("%Y-%m-%d %H:%M:00")                  fsz_id = row.replace('"', "").strip()                  # 查抄wind_monitor表中是否存在相同数据                  check_query = "SELECT COUNT(*) FROM wind_monitor WHERE observe_time = %s AND fsz_id = %s"                  mysql_cursor.execute(check_query, (observe_time, fsz_id))                  count = mysql_cursor.fetchone()                  dtime = row.strftime("%Y-%m-%d %H:%M:00.000")                  if count > 0:                        # 数据存在,从中间表删除                        delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"                        sql_server_cursor.execute(delete_query, (dtime, row))                        sql_server_conn.commit()                        deleted_count += 1                  else:                        # 数据不存在,插入到wind_monitor并从中间表删除                        station_name = row                        farmName = station_name.split("-")                        # 根据电站名查询电站号                        farm_query = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"                        mysql_cursor.execute(farm_query, ('%' + farmName + '%',))                        farm_result = mysql_cursor.fetchone()                        farm_id = farm_result if farm_result else None                        wind_direction_instant = row                        wind_speed_instant = row                        wind_speed_two_min = row                        wind_speed_ten_min = row                        insert_data = (observe_time, fsz_id, station_name, farm_id, wind_direction_instant,                                       wind_speed_instant, wind_speed_two_min, wind_speed_ten_min)                        insert_query = 'INSERT INTO wind_monitor (observe_time, fsz_id, station_name, farm_id, wind_direction_instant, wind_speed_instant, wind_speed_two_min, wind_speed_ten_min) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'                        mysql_cursor.execute(insert_query, insert_data)                        mysql_conn.commit()                        inserted_count += 1                        delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"                        sql_server_cursor.execute(delete_query, (dtime, row))                        sql_server_conn.commit()      # 打印删除和插入的数据统计      print(f"从intermediateData_Wind表中删除了{deleted_count}条数据.")      print(f"向wind_monitor表中插入了{inserted_count}条数据.")    def transfer_wind_data(self):
      # 连接到 SQL Server
      with pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:
            with sql_server_conn.cursor() as sql_server_cursor:
                # 修改查询,仅选择上次同步后的数据
                modified_query = self.queryWind + " WHERE dtime >= ? ORDER BY dtime"
                sql_server_cursor.execute(modified_query, (self.wind_last_dtime,))
                rows = sql_server_cursor.fetchall()
      if not rows:
            return# 没有新数据
      # 连接到 MySQL
      with pymysql.connect(**self.mysql_conn_params) as mysql_conn:
            with mysql_conn.cursor() as mysql_cursor:
                data_list = []
                for row in rows:
                  observe_time = row.strftime("%Y-%m-%d %H:%M:00")
                  fsz_id = row.replace('"', "").strip()
                  station_name = row
                  farmName = station_name.split("-")
                  # 根据电站名查询电站号
                  sql = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"
                  mysql_cursor.execute(sql, ('%' + farmName + '%',))
                  result = mysql_cursor.fetchone()
                  farm_id = None
                  if result is not None:
                        farm_id = result
                  # 处理查询结果为空的情况
                  if farm_id is not None:
                        farm_id = farm_id
                  staion_name =row
                  wind_direction_instant = row
                  wind_speed_instant = row
                  wind_speed_two_min = row
                  wind_speed_ten_min = row
                  data = (observe_time,fsz_id,staion_name,farm_id,wind_direction_instant,
                            wind_speed_instant,wind_speed_two_min,wind_speed_ten_min)
                  data_list.append(data)
                  self.wind_last_dtime = row.strftime("%Y-%m-%d %H:%M:%S")
                if data_list:
                  result = mysql_cursor.executemany('INSERT INTO wind_monitor'
                                                      '(observe_time,fsz_id,station_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min) '
                                                      'VALUES (%s,%s,%s,%s,%s,%s,%s,%s)', data_list)# 根据你的表结构修改
                  mysql_conn.commit()
                  print(f'wind_monitor表插入了{result}行数据.' if result else '没有新数据插入。')
    def start(self):      # 在启动线程前先清空表格      self.clear_mysql_tables()      thread = threading.Thread(target=self.transfer_data)      thread.start()sql_server_conn_params = {    'driver': '{SQL Server}',    'server': '',    'database': '',    'uid': '',    'pwd': ''}mysql_conn_params = {    'host': 'localhost',    'user': 'root',    'password': '123456',    'database': '',    'charset': 'utf8mb4'}queryIntermediateData = "SELECT dtime,stationID,staionName,windDirectionInstant,windSpeedInstant,windSpeed2min,windSpeed10min FROM intermediateData_Wind"queryWind = 'SELECT dtime,stationID,staionName,windDirectionInstant,windSpeedInstant,windSpeed2min,windSpeed10min FROM realData_Wind'data_transfer = DataTransfer(sql_server_conn_params, mysql_conn_params, queryWind,queryIntermediateData)data_transfer.start()
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 不同数据库进行同步和增量数据(SQL server 与MySQL数据库为例)