代码片段:文末附链接。
- using DataSync.Core;
- using Furion.Logging.Extensions;
- using Microsoft.Data.SqlClient;
- using Microsoft.Extensions.Logging;
- using System.Data;
namespace DataSync.Application.DataSync.Services
{
    /// <summary>
    /// Copies table contents from a source SQL Server database to a target one,
    /// table by table, using a per-row "MD5" column (added by
    /// <c>DataBaseInfoService.DataTableAddColumsMd5</c>) to decide which rows differ.
    /// </summary>
    public class DataSyncServices : IDataSyncData, ITransient
    {
        // Serializes table synchronization for callers sharing this instance.
        private readonly object lockObj = new object();

        /// <summary>
        /// Synchronizes from the client database to the server database.
        /// </summary>
        /// <param name="clientConn">Connection string of the client (source) database.</param>
        /// <param name="serviceConn">Connection string of the server (target) database.</param>
        /// <returns>A status message: success text, or a failure text pointing at the error log.</returns>
        public string SyncDataForClient(string clientConn, string serviceConn)
        {
            return SyncData(clientConn, serviceConn);
        }

        /// <summary>
        /// Synchronizes from the server database to the client database.
        /// </summary>
        /// <param name="serviceConn">Connection string of the server (source) database.</param>
        /// <param name="clientConn">Connection string of the client (target) database.</param>
        /// <returns>A status message: success text, or a failure text pointing at the error log.</returns>
        public string SyncDataForServer(string serviceConn, string clientConn)
        {
            return SyncData(serviceConn, clientConn);
        }

        /// <summary>
        /// Synchronizes every source table that is not listed in the
        /// "SyncBlackTable" configuration entry into the target database.
        /// </summary>
        /// <param name="sourceConn">Connection string of the database to read from.</param>
        /// <param name="targetConn">Connection string of the database to write to.</param>
        /// <returns>
        /// "同步完成!" when all tables were processed; "同步失败。详见错误日志!" when any
        /// exception was thrown (the exception message is written to the error log).
        /// </returns>
        private string SyncData(string sourceConn, string targetConn)
        {
            try
            {
                SqlSugarScope sourceDb = CreateDb(sourceConn);
                SqlSugarScope targetDb = CreateDb(targetConn);

                // Let SqlSugar initialize the target database.
                targetDb.DbMaintenance.CreateDatabase();

                // A missing configuration entry means "nothing is blacklisted".
                var syncBlackTable = App.GetConfig<List<string>>("SyncBlackTable") ?? new List<string>();
                var tableNames = sourceDb.DbMaintenance.GetTableInfoList(false)
                    .Select(t => t.Name)
                    .Except(syncBlackTable)
                    .ToList();

                // The original ran this loop through Parallel.ForEach with the whole body
                // under one lock, which is sequential in effect and wraps failures in an
                // AggregateException (hiding the real message from the log below).
                foreach (var tableName in tableNames)
                {
                    lock (lockObj)
                    {
                        SyncTable(tableName, sourceDb, targetDb, targetConn);
                    }
                }
            }
            catch (Exception ex)
            {
                ("Error occurred while connecting to database or fetching data from tables.\n" + ex.Message).LogError();
                return "同步失败。详见错误日志!";
            }
            return "同步完成!";
        }

        /// <summary>
        /// Builds a SQL Server SqlSugar scope for the given connection string with
        /// auto-closing connections and, in DEBUG builds, statement logging.
        /// </summary>
        private static SqlSugarScope CreateDb(string connectionString)
        {
            return new SqlSugarScope(new ConnectionConfig()
            {
                DbType = SqlSugar.DbType.SqlServer,
                ConnectionString = connectionString,
                IsAutoCloseConnection = true,
                AopEvents = new AopEvents
                {
                    OnLogExecuting = (sql, ps) =>
                    {
#if DEBUG
                        Log.Information($"语句:{sql},参数:{(ps.Any() ? "[" : string.Empty) + string.Join("|", ps.Select(m => $"{m.ParameterName}={m.Value}")) + (ps.Any() ? "]" : string.Empty)}");
#endif
                    }
                }
            });
        }

        /// <summary>
        /// Synchronizes a single table: creates it in the target when missing, then
        /// inserts, deletes or updates rows depending on how the row counts compare.
        /// </summary>
        private static void SyncTable(string tableName, SqlSugarScope sourceDb, SqlSugarScope targetDb, string targetConn)
        {
            var quotedTable = QuoteIdentifier(tableName);

            // NOTE(review): fetched for every table, as in the original, although it is
            // only used when the target table is missing; kept unconditional because the
            // helper's side effects are not visible here.
            var sourceSnapshot = DataTableHelper.FetchDataFromTable(tableName, sourceDb);

            // isCache: false - tables are created below with raw SQL, which presumably
            // does not refresh SqlSugar's cached table list.
            var targetHasTable = targetDb.DbMaintenance.IsAnyTable(tableName, false);
            if (!targetHasTable && sourceSnapshot != null && sourceSnapshot.Rows.Count > 0)
            {
                // Strip the sync marker columns before deriving the target schema.
                DataBaseInfoService.DatatableRemoveCloumns(sourceSnapshot);

                var columnDefinitions = sourceSnapshot.Columns.Cast<DataColumn>()
                    .Select(c => $"{QuoteIdentifier(c.ColumnName)} {DataBaseInfoService.GetSqlDataType(c.DataType)}");
                var createTableSql = $"CREATE TABLE {quotedTable} ({string.Join(", ", columnDefinitions)})";

                targetDb.Ado.ExecuteCommand(createTableSql);
                ("TargetTable : " + tableName + ",创建成功").LogInformation();
                targetHasTable = true;
            }

            // AppSys gets an additional dedicated synchronization step.
            if (string.Equals(tableName, "APPSYS", StringComparison.OrdinalIgnoreCase))
            {
                AppSysDataSync.SyncData(tableName, sourceDb, targetDb);
            }

            // Without a target table the count query below would throw and abort the
            // whole run; skip this table instead.
            if (!targetHasTable && !targetDb.DbMaintenance.IsAnyTable(tableName, false))
            {
                ("TargetTable : " + tableName + ",目标表不存在且未创建,跳过同步").LogWarning();
                return;
            }

            var selectCountSql = $"SELECT COUNT(*) FROM {quotedTable} ";
            var sourceCount = sourceDb.Ado.GetInt(selectCountSql);
            var targetCount = targetDb.Ado.GetInt(selectCountSql);

            var selectTableSql = $"SELECT * FROM {quotedTable}";
            var sourceRows = LoadWithMd5(sourceDb, selectTableSql);
            var targetRows = LoadWithMd5(targetDb, selectTableSql);

            if (sourceCount > targetCount)
            {
                InsertMissingRows(tableName, sourceRows, targetRows, targetConn);
            }
            else if (sourceCount < targetCount)
            {
                DeleteExtraRows(tableName, sourceRows, targetRows, targetDb);
            }
            else
            {
                UpdateChangedRows(tableName, sourceRows, targetRows, targetDb);
            }
        }

        /// <summary>
        /// Source has more rows: bulk-inserts the source rows whose MD5 is absent from the target.
        /// </summary>
        private static void InsertMissingRows(string tableName, DataTable sourceRows, DataTable targetRows, string targetConn)
        {
            var toInsert = RowsMissingFrom(sourceRows, targetRows);
            if (toInsert.Rows.Count == 0)
            {
                return;
            }

            // Remove the sync marker / MD5 columns before writing to the target.
            DataBaseInfoService.DatatableRemoveCloumns(toInsert);

            // NOTE(review): assumes DataBulkCopy completes synchronously - confirm.
            using (var connTarget = new SqlConnection(targetConn))
            {
                DataBaseInfoService.DataBulkCopy(connTarget, tableName, toInsert);
            }
        }

        /// <summary>
        /// Target has more rows: deletes the target rows whose MD5 is absent from the source.
        /// Nothing is deleted when the target table reports no primary key.
        /// </summary>
        private static void DeleteExtraRows(string tableName, DataTable sourceRows, DataTable targetRows, SqlSugarScope targetDb)
        {
            var toDelete = RowsMissingFrom(targetRows, sourceRows);
            if (toDelete.Rows.Count == 0)
            {
                return;
            }

            var primaryKeys = targetDb.DbMaintenance.GetPrimaries(tableName);
            var identities = targetDb.DbMaintenance.GetIsIdentities(tableName);
            if (primaryKeys == null || primaryKeys.Count == 0)
            {
                return;
            }

            foreach (DataRow row in toDelete.Rows)
            {
                var deleteDataSql = DataTableHelper.ConstructDeleteSql(tableName, primaryKeys, identities, row);
                targetDb.Ado.ExecuteCommand(deleteDataSql);
            }
        }

        /// <summary>
        /// Equal row counts: issues an UPDATE for every source row whose MD5 is absent from the target.
        /// </summary>
        private static void UpdateChangedRows(string tableName, DataTable sourceRows, DataTable targetRows, SqlSugarScope targetDb)
        {
            var toUpdate = RowsMissingFrom(sourceRows, targetRows);
            if (toUpdate.Rows.Count == 0)
            {
                return;
            }

            // Remove the sync marker / MD5 columns before building the statements.
            DataBaseInfoService.DatatableRemoveCloumns(toUpdate);

            var primaryKeys = targetDb.DbMaintenance.GetPrimaries(tableName);
            var identities = targetDb.DbMaintenance.GetIsIdentities(tableName);
            foreach (DataRow row in toUpdate.Rows)
            {
                var updateDataSql = DataTableHelper.ConstructUpdateSql(tableName, primaryKeys, identities, row);
                targetDb.Ado.ExecuteCommand(updateDataSql);
            }
        }

        /// <summary>
        /// Runs the query and, when it returned rows, adds the MD5 column to the result.
        /// </summary>
        private static DataTable LoadWithMd5(SqlSugarScope db, string selectSql)
        {
            var table = db.Ado.GetDataTable(selectSql);
            if (table != null && table.Rows.Count > 0)
            {
                DataBaseInfoService.DataTableAddColumsMd5(table);
            }
            return table;
        }

        /// <summary>
        /// Returns the rows of <paramref name="candidates"/> whose "MD5" value does not
        /// occur in <paramref name="reference"/>; an empty table when there are none.
        /// </summary>
        private static DataTable RowsMissingFrom(DataTable candidates, DataTable reference)
        {
            if (candidates == null)
            {
                return new DataTable();
            }

            // A table loaded without rows has no MD5 column, but then no row is
            // enumerated either, so the column is never accessed.
            var referenceHashes = reference == null
                ? new HashSet<string>()
                : new HashSet<string>(reference.AsEnumerable().Select(r => r.Field<string>("MD5")));

            var missing = candidates.AsEnumerable()
                .Where(r => !referenceHashes.Contains(r.Field<string>("MD5")))
                .ToList();

            // CopyToDataTable throws on an empty sequence, hence the explicit check.
            return missing.Count > 0 ? missing.CopyToDataTable() : new DataTable();
        }

        /// <summary>
        /// Wraps an identifier in SQL Server brackets, doubling any closing bracket.
        /// </summary>
        private static string QuoteIdentifier(string name)
        {
            return "[" + name.Replace("]", "]]") + "]";
        }
    }
}
Gitee: https://gitee.com/ltf_free/sync-data.git
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!