使用 C# ASP.NET Core 同步数据库
代码片段(完整源码链接见文末):
using DataSync.Core;
using Furion.Logging.Extensions;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using System.Data;
namespace DataSync.Application.DataSync.Services
{
/// <summary>
/// Copies every non-blacklisted table from a source SQL Server database to a
/// target one: creates missing tables, then inserts, deletes or updates rows
/// depending on how the row counts of the two sides compare.
/// </summary>
/// <remarks>
/// NOTE(review): a new <c>SqlSugarScope</c> pair is created on every call. SqlSugar's
/// documentation recommends using <c>SqlSugarScope</c> as a singleton; confirm whether
/// per-call creation is acceptable here.
/// NOTE(review): table and column names are interpolated into SQL unquoted, so names
/// that are reserved words or contain spaces will fail. They come from database
/// metadata, not from user input.
/// </remarks>
public class DataSyncServices : IDataSyncData, ITransient
{
    /// <summary>
    /// Name of the column the row diff reads after
    /// <c>DataBaseInfoService.DataTableAddColumsMd5</c> has been applied to a table.
    /// </summary>
    private const string Md5Column = "MD5";

    /// <summary>
    /// Synchronises from the client database to the server database.
    /// </summary>
    /// <param name="clientConn">Connection string of the client database (used as the source).</param>
    /// <param name="serviceConn">Connection string of the server database (used as the target).</param>
    /// <returns>A user-facing status message: success or failure.</returns>
    public string SyncDataForClient(string clientConn, string serviceConn)
    {
        return SyncData(clientConn, serviceConn);
    }

    /// <summary>
    /// Synchronises from the server database to the client database.
    /// </summary>
    /// <param name="serviceConn">Connection string of the server database (used as the source).</param>
    /// <param name="clientConn">Connection string of the client database (used as the target).</param>
    /// <returns>A user-facing status message: success or failure.</returns>
    public string SyncDataForServer(string serviceConn, string clientConn)
    {
        return SyncData(serviceConn, clientConn);
    }

    /// <summary>
    /// Runs a full synchronisation pass from <paramref name="sourceConn"/> to
    /// <paramref name="targetConn"/>.
    /// </summary>
    /// <param name="sourceConn">Connection string of the database that is read.</param>
    /// <param name="targetConn">Connection string of the database that is written.</param>
    /// <returns>
    /// The success message when every table was processed; the failure message when any
    /// exception was thrown (the exception message is written to the error log).
    /// </returns>
    private string SyncData(string sourceConn, string targetConn)
    {
        try
        {
            SqlSugarScope sourceDb = CreateDb(sourceConn);
            SqlSugarScope targetDb = CreateDb(targetConn);

            // Make sure the target database itself exists before touching tables.
            targetDb.DbMaintenance.CreateDatabase();

            // All table names of the source, minus the configured blacklist.
            var tableNames = sourceDb.DbMaintenance.GetTableInfoList(false).Select(t => t.Name).ToList();
            var syncBlackTable = App.GetConfig<List<string>>("SyncBlackTable");
            // A missing "SyncBlackTable" setting means "nothing is blacklisted".
            tableNames = tableNames.Except(syncBlackTable ?? new List<string>()).ToList();

            // Tables are processed one after another. The previous Parallel.ForEach held
            // a single lock around its whole body, so it never ran two tables at once;
            // it only blocked pool threads and wrapped failures in AggregateException.
            foreach (var tableName in tableNames)
            {
                SyncTable(tableName, sourceDb, targetDb, targetConn);
            }
        }
        catch (Exception ex)
        {
            ("Error occurred while connecting to database or fetching data from tables.\n" + ex.Message).LogError();
            return "同步失败。详见错误日志!";
        }
        return "同步完成!";
    }

    /// <summary>
    /// Builds a SQL Server <c>SqlSugarScope</c> for the given connection string, with
    /// auto-close enabled and (in DEBUG builds only) logging of every executed statement.
    /// </summary>
    private static SqlSugarScope CreateDb(string connectionString)
    {
        return new SqlSugarScope(new ConnectionConfig()
        {
            DbType = SqlSugar.DbType.SqlServer,
            ConnectionString = connectionString,
            IsAutoCloseConnection = true,
            AopEvents = new AopEvents
            {
                OnLogExecuting = (sql, ps) =>
                {
#if DEBUG
                    Log.Information($"语句:{sql},参数:{(ps.Any() ? "[" : string.Empty) + string.Join("|", ps.Select(m => $"{m.ParameterName}={m.Value}")) + (ps.Any() ? "]" : string.Empty)}");
#endif
                }
            }
        });
    }

    /// <summary>
    /// Synchronises one table: creates it in the target if missing, runs the APPSYS
    /// special case, then inserts, deletes or updates based on the row counts.
    /// </summary>
    private static void SyncTable(string tableName, SqlSugarScope sourceDb, SqlSugarScope targetDb, string targetConn)
    {
        // Source data, used only to derive the column list when the target table is missing.
        // NOTE(review): this is fetched even when the target table already exists; if
        // FetchDataFromTable is a plain SELECT it could be deferred to the create path.
        var schemaSource = DataTableHelper.FetchDataFromTable(tableName, sourceDb);

        if (!targetDb.DbMaintenance.IsAnyTable(tableName))
        {
            CreateTargetTable(tableName, schemaSource, targetDb);
        }

        // The APPSYS table gets an additional dedicated sync step.
        if (string.Equals(tableName, "APPSYS", StringComparison.OrdinalIgnoreCase))
        {
            AppSysDataSync.SyncData(tableName, sourceDb, targetDb);
        }

        var selectCountSql = $"SELECT COUNT(*) FROM {tableName} ";
        var sourceCount = sourceDb.Ado.GetInt(selectCountSql);
        var targetCount = targetDb.Ado.GetInt(selectCountSql);

        if (sourceCount > targetCount)
        {
            // Source has more rows: insert the ones the target lacks.
            InsertMissingRows(tableName, sourceDb, targetDb, targetConn);
        }
        else if (sourceCount < targetCount)
        {
            // Target has more rows: delete the ones the source no longer has.
            DeleteExtraRows(tableName, sourceDb, targetDb);
        }
        else
        {
            // Same row count: rows whose MD5 differs are treated as updates.
            UpdateChangedRows(tableName, sourceDb, targetDb);
        }
    }

    /// <summary>
    /// Creates <paramref name="tableName"/> in the target database using the columns of
    /// <paramref name="schemaSource"/>; does nothing when no column information is available.
    /// </summary>
    private static void CreateTargetTable(string tableName, DataTable schemaSource, SqlSugarScope targetDb)
    {
        // Column metadata is enough to build the DDL. Requiring at least one row (as
        // before) left empty source tables uncreated, and the later COUNT(*) on the
        // target then failed the whole sync.
        if (schemaSource == null || schemaSource.Columns.Count == 0)
        {
            return;
        }

        // Strip the sync marker columns before deriving the target schema.
        DataBaseInfoService.DatatableRemoveCloumns(schemaSource);

        var columnDefinitions = schemaSource.Columns
            .Cast<DataColumn>()
            .Select(column => $"{column.ColumnName} {DataBaseInfoService.GetSqlDataType(column.DataType)}");
        var createTableSql = $"CREATE TABLE {tableName} (" + string.Join(", ", columnDefinitions) + ")";

        targetDb.Ado.ExecuteCommand(createTableSql);
        ("TargetTable : " + tableName + ",创建成功").LogInformation();
    }

    /// <summary>
    /// Loads all rows of <paramref name="tableName"/> and, when there is at least one row,
    /// adds the MD5 column via <c>DataBaseInfoService.DataTableAddColumsMd5</c>.
    /// </summary>
    private static DataTable LoadWithMd5(SqlSugarScope db, string tableName)
    {
        var table = db.Ado.GetDataTable($"SELECT * FROM {tableName}");
        if (table != null && table.Rows.Count > 0)
        {
            DataBaseInfoService.DataTableAddColumsMd5(table);
        }
        return table;
    }

    /// <summary>
    /// Returns the rows of <paramref name="candidates"/> whose MD5 value does not occur in
    /// <paramref name="reference"/>; an empty <c>DataTable</c> when there are none.
    /// </summary>
    private static DataTable RowsMissingFrom(DataTable candidates, DataTable reference)
    {
        // With no candidate rows there is nothing to compare, and the reference table
        // may not even carry an MD5 column (it is only added to non-empty tables).
        if (candidates.Rows.Count == 0)
        {
            return new DataTable();
        }

        // Hash the reference MD5 values once instead of rescanning them per candidate row.
        var knownHashes = new HashSet<string>(
            reference.AsEnumerable().Select(row => row.Field<string>(Md5Column)));

        var missing = candidates.AsEnumerable()
            .Where(row => !knownHashes.Contains(row.Field<string>(Md5Column)))
            .ToList();

        // CopyToDataTable throws on an empty sequence, so fall back to an empty table.
        return missing.Count > 0 ? missing.CopyToDataTable() : new DataTable();
    }

    /// <summary>
    /// Bulk-inserts into the target every source row whose MD5 is not present in the target.
    /// </summary>
    private static void InsertMissingRows(string tableName, SqlSugarScope sourceDb, SqlSugarScope targetDb, string targetConn)
    {
        var sourceRows = LoadWithMd5(sourceDb, tableName);
        var targetRows = LoadWithMd5(targetDb, tableName);

        var rowsToInsert = RowsMissingFrom(sourceRows, targetRows);
        if (rowsToInsert.Rows.Count == 0)
        {
            return;
        }

        // Remove the sync marker and MD5 columns before writing to the target.
        DataBaseInfoService.DatatableRemoveCloumns(rowsToInsert);

        // Dispose the connection deterministically; it was previously never released.
        using var connTarget = new SqlConnection(targetConn);
        DataBaseInfoService.DataBulkCopy(connTarget, tableName, rowsToInsert);
    }

    /// <summary>
    /// Deletes from the target every row whose MD5 is not present in the source.
    /// Requires the target table to have a primary key; otherwise nothing is deleted.
    /// </summary>
    private static void DeleteExtraRows(string tableName, SqlSugarScope sourceDb, SqlSugarScope targetDb)
    {
        var sourceRows = LoadWithMd5(sourceDb, tableName);
        var targetRows = LoadWithMd5(targetDb, tableName);

        var rowsToDelete = RowsMissingFrom(targetRows, sourceRows);
        if (rowsToDelete.Rows.Count == 0)
        {
            return;
        }

        var primaryKeys = targetDb.DbMaintenance.GetPrimaries(tableName);
        var identities = targetDb.DbMaintenance.GetIsIdentities(tableName);

        // Without a primary key no DELETE statement is built, so the rows are left in place.
        if (primaryKeys == null || primaryKeys.Count == 0)
        {
            return;
        }

        foreach (DataRow row in rowsToDelete.Rows)
        {
            var deleteDataSql = DataTableHelper.ConstructDeleteSql(tableName, primaryKeys, identities, row);
            targetDb.Ado.ExecuteCommand(deleteDataSql);
        }
    }

    /// <summary>
    /// Issues one UPDATE against the target for every source row whose MD5 is not present
    /// in the target.
    /// </summary>
    private static void UpdateChangedRows(string tableName, SqlSugarScope sourceDb, SqlSugarScope targetDb)
    {
        var sourceRows = LoadWithMd5(sourceDb, tableName);
        var targetRows = LoadWithMd5(targetDb, tableName);

        var rowsToUpdate = RowsMissingFrom(sourceRows, targetRows);
        if (rowsToUpdate.Rows.Count == 0)
        {
            return;
        }

        // Remove the sync marker and MD5 columns before building the UPDATE statements.
        DataBaseInfoService.DatatableRemoveCloumns(rowsToUpdate);

        var primaryKeys = targetDb.DbMaintenance.GetPrimaries(tableName);
        var identities = targetDb.DbMaintenance.GetIsIdentities(tableName);

        foreach (DataRow row in rowsToUpdate.Rows)
        {
            var updateDataSql = DataTableHelper.ConstructUpdateSql(tableName, primaryKeys, identities, row);
            targetDb.Ado.ExecuteCommand(updateDataSql);
        }
    }
}
}
Gitee: https://gitee.com/ltf_free/sync-data.git
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]