Batch export of Snowflake data to CSV in C# using multiple labeled SQL queries
Design a C# program that reads multiple labeled Snowflake SQL statements from a JSON configuration file and, driven by different input parameters, automatically exports data from a Snowflake database to CSV files in a local directory in batches. Each output file is named after its query label plus the .csv extension, and an existing file is overwritten. The program must handle large result sets that need to be exported in batches, use multithreading and asynchronous operations to improve performance, and include exception handling and logging: it records the error message when an export fails, and for every query it logs the run status, the number of rows exported, the run timestamp, and the export duration, so the log shows the record count for each file.
Step 1: Create the configuration classes
// AppConfig.cs
public class AppConfig
{
public SnowflakeConnectionConfig SnowflakeConnection { get; set; }
public List<QueryConfig> Queries { get; set; }
}
public class SnowflakeConnectionConfig
{
public string Account { get; set; }
public string User { get; set; }
public string Password { get; set; }
public string Warehouse { get; set; }
public string Database { get; set; }
public string Schema { get; set; }
public string Role { get; set; }
}
public class QueryConfig
{
public string Label { get; set; }
public string Sql { get; set; }
}
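Because each query label is used directly as a file name, it can help to validate the configuration right after it is loaded. The sketch below is an optional addition, not part of the original design; the ConfigValidation class and its behavior (rejecting empty labels, replacing invalid file-name characters) are assumptions.
// ConfigValidation.cs — optional helper, not part of the original design
using System;
using System.IO;

public static class ConfigValidation
{
    // Rejects empty labels and replaces characters that are invalid in file names,
    // since each label becomes the CSV file name.
    public static void Validate(AppConfig config)
    {
        if (config?.Queries == null || config.Queries.Count == 0)
            throw new InvalidOperationException("Configuration contains no queries.");

        foreach (var query in config.Queries)
        {
            if (string.IsNullOrWhiteSpace(query.Label))
                throw new InvalidOperationException("Every query needs a non-empty label.");

            foreach (var invalid in Path.GetInvalidFileNameChars())
                query.Label = query.Label.Replace(invalid, '_');
        }
    }
}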
Step 2: Implement the logger
// Logger.cs
public class Logger
{
private readonly string _logPath;
private readonly object _lock = new object();
public Logger(string logPath)
{
_logPath = logPath;
InitializeLogFile();
}
private void InitializeLogFile()
{
lock (_lock)
{
// Write the column header only when the log file does not exist yet,
// so repeated runs keep appending entries under a single header.
if (!File.Exists(_logPath))
{
File.AppendAllText(_logPath, $"{"Timestamp",-25}|{"Status",-8}|{"Label",-20}|{"StartTime",-20}|{"Duration(s)",-12}|{"Rows",-10}|{"Error"}\n");
}
}
}
public void LogSuccess(string label, DateTime startTime, long rowCount, TimeSpan duration)
{
var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"SUCCESS",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{rowCount,-10}|{"-"}\n";
WriteLogEntry(entry);
}
public void LogError(string label, DateTime startTime, string error, TimeSpan duration)
{
var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"ERROR",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{"-",-10}|{error}\n";
WriteLogEntry(entry);
}
private void WriteLogEntry(string entry)
{
lock (_lock)
{
File.AppendAllText(_logPath, entry);
}
Console.Write(entry);
}
}
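A minimal usage sketch of the logger (the label, row count, and durations are invented for illustration and simply mirror the log example at the end of this article):
var logger = new Logger(Path.Combine("./exports", "export.log"));
logger.LogSuccess("customers", startTime: DateTime.UtcNow.AddSeconds(-15), rowCount: 1_000_000, duration: TimeSpan.FromSeconds(15.23));
logger.LogError("orders", startTime: DateTime.UtcNow, error: "Timeout expired", duration: TimeSpan.FromSeconds(17.12));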
Step 3: Implement the export processor
// ExportProcessor.cs
using CsvHelper;
using Snowflake.Data.Client;
using System.Diagnostics; // Stopwatch
using System.Globalization;
public class ExportProcessor
{
private readonly string _connectionString;
private readonly Logger _logger;
public ExportProcessor(string connectionString, Logger logger)
{
_connectionString = connectionString;
_logger = logger;
}
public async Task ExportQueryAsync(QueryConfig query, string outputDir, CancellationToken cancellationToken = default)
{
var startTime = DateTime.UtcNow;
var sw = Stopwatch.StartNew();
long rowCount = 0;
string filePath = Path.Combine(outputDir, $"{query.Label}.csv");
try
{
using (var conn = new SnowflakeDbConnection())
{
conn.ConnectionString = _connectionString;
await conn.OpenAsync(cancellationToken);
using (var cmd = conn.CreateCommand())
{
cmd.CommandText = query.Sql;
using (var reader = await cmd.ExecuteReaderAsync(cancellationToken))
using (var writer = new StreamWriter(filePath, append: false))
using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
{
// Write header
for (int i = 0; i < reader.FieldCount; i++)
{
csv.WriteField(reader.GetName(i));
}
await csv.NextRecordAsync();
// Write rows
while (await reader.ReadAsync(cancellationToken))
{
for (int i = 0; i < reader.FieldCount; i++)
{
csv.WriteField(reader.GetValue(i));
}
await csv.NextRecordAsync();
rowCount++;
}
}
}
}
sw.Stop();
_logger.LogSuccess(query.Label, startTime, rowCount, sw.Elapsed);
}
catch (Exception ex)
{
sw.Stop();
_logger.LogError(query.Label, startTime, ex.Message, sw.Elapsed);
SafeDeleteFile(filePath);
throw; // Re-throw if using retry logic
}
}
private void SafeDeleteFile(string path)
{
try { File.Delete(path); }
catch { /* Ignore deletion errors */ }
}
}
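Because ExportQueryAsync logs the failure and then re-throws, a caller can layer simple retries on top of it for transient errors. A minimal sketch, assuming a fixed attempt count and a linear back-off; ExportWithRetryAsync is a hypothetical helper, not part of the original code:
// Hypothetical retry wrapper around ExportProcessor.ExportQueryAsync.
public static async Task ExportWithRetryAsync(
    ExportProcessor processor,
    QueryConfig query,
    string outputDir,
    int maxAttempts = 3,
    CancellationToken cancellationToken = default)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            await processor.ExportQueryAsync(query, outputDir, cancellationToken);
            return; // success
        }
        catch (Exception) when (attempt < maxAttempts)
        {
            // Each failed attempt has already been logged by ExportQueryAsync;
            // wait a little longer before every retry.
            await Task.Delay(TimeSpan.FromSeconds(5 * attempt), cancellationToken);
        }
    }
}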
Step 4: Main program
// Program.cs
using Snowflake.Data.Client; // SnowflakeDbConnectionStringBuilder
using System.CommandLine;
using System.Text.Json;      // JsonSerializer
class Program
{
static async Task Main(string[] args)
{
var configOption = new Option<FileInfo>(
name: "--config",
description: "Path to configuration file");
var outputOption = new Option<DirectoryInfo>(
name: "--output",
description: "Output directory for CSV files");
var rootCommand = new RootCommand
{
configOption,
outputOption
};
rootCommand.Description = "Snowflake Data Exporter";
rootCommand.SetHandler(async (config, outputDir) =>
{
await RunExport(config, outputDir);
}, configOption, outputOption);
await rootCommand.InvokeAsync(args);
}
static async Task RunExport(FileInfo configFile, DirectoryInfo outputDir)
{
// Read configuration
var config = JsonSerializer.Deserialize<AppConfig>(
File.ReadAllText(configFile.FullName),
new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
// Create output directory
Directory.CreateDirectory(outputDir.FullName);
// Initialize logger
var logger = new Logger(Path.Combine(outputDir.FullName, "export.log"));
// Build connection string
var connBuilder = new SnowflakeDbConnectionStringBuilder();
connBuilder["account"] = config.SnowflakeConnection.Account;
connBuilder["user"] = config.SnowflakeConnection.User;
connBuilder["password"] = config.SnowflakeConnection.Password;
connBuilder["warehouse"] = config.SnowflakeConnection.Warehouse;
connBuilder["db"] = config.SnowflakeConnection.Database;
connBuilder["schema"] = config.SnowflakeConnection.Schema;
connBuilder["role"] = config.SnowflakeConnection.Role;
// Initialize processor
var processor = new ExportProcessor(
connBuilder.ToString(),
logger);
// Parallel execution with throttling
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
await Parallel.ForEachAsync(
config.Queries,
parallelOptions,
async (query, cancellationToken) =>
{
await processor.ExportQueryAsync(
query,
outputDir.FullName,
cancellationToken);
});
}
}
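One optional refinement that is not in the Main method above: wire Ctrl+C to a CancellationTokenSource and pass its token through ParallelOptions so that in-flight exports stop cleanly. A sketch of how the parallel section of RunExport could look under that assumption:
// Hypothetical variant of the parallel section with Ctrl+C cancellation.
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;   // keep the process alive so cleanup can run
    cts.Cancel();      // signal every running export to stop
};

var parallelOptions = new ParallelOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = cts.Token
};

await Parallel.ForEachAsync(config.Queries, parallelOptions,
    async (query, cancellationToken) =>
        await processor.ExportQueryAsync(query, outputDir.FullName, cancellationToken));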
Step 5: Example configuration file (config.json)
{
"snowflakeConnection": {
"account": "your_account",
"user": "your_user",
"password": "your_password",
"warehouse": "COMPUTE_WH",
"database": "PROD_DB",
"schema": "PUBLIC",
"role": "SYSADMIN"
},
"queries": [
{
"label": "customers",
"sql": "SELECT * FROM CUSTOMERS"
},
{
"label": "orders",
"sql": "SELECT * FROM ORDERS"
}
]
}
Implementation notes
- Parallel processing:
  - Parallel.ForEachAsync runs the queries in parallel
  - The default degree of parallelism is the processor core count
  - Each query uses its own independent database connection
- Large result sets (see the rollover sketch after this list):
  - CsvHelper writes the CSV as a stream
  - Asynchronous I/O is used throughout (ReadAsync/NextRecordAsync)
  - Rows are processed one at a time, so memory use stays flat
- Error handling:
  - Incomplete files are deleted automatically
  - Errors are logged with full details
  - Exceptions are isolated per query and re-thrown for optional retry logic
- Logging:
  - Structured log format
  - Thread-safe writes
  - Includes the key metrics (row count, duration, timestamps)
- Performance:
  - Asynchronous database operations
  - Parallel query execution
  - Streaming result-set processing
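If a single result set is too large for one CSV file, the streaming loop in ExportQueryAsync could roll over to a new part file every N rows instead of writing a single file. A minimal sketch, assuming hypothetical part-file names such as customers_part001.csv; this batching variant is not part of the code above:
// Hypothetical rollover loop: start a new CSV part every `rowsPerFile` rows.
const long rowsPerFile = 1_000_000;
int part = 0;
long rowsInPart = 0;
CsvWriter csv = null;

while (await reader.ReadAsync(cancellationToken))
{
    if (csv == null || rowsInPart >= rowsPerFile)
    {
        csv?.Dispose(); // flushes and closes the previous part file
        part++;
        var partPath = Path.Combine(outputDir, $"{query.Label}_part{part:D3}.csv");
        csv = new CsvWriter(new StreamWriter(partPath, append: false), CultureInfo.InvariantCulture);
        for (int i = 0; i < reader.FieldCount; i++) csv.WriteField(reader.GetName(i));
        await csv.NextRecordAsync();
        rowsInPart = 0;
    }
    for (int i = 0; i < reader.FieldCount; i++) csv.WriteField(reader.GetValue(i));
    await csv.NextRecordAsync();
    rowsInPart++;
    rowCount++;
}
csv?.Dispose();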
Usage
- Install the dependencies (the project must target .NET 6 or later, since Parallel.ForEachAsync is used):
dotnet add package Snowflake.Data
dotnet add package CsvHelper
dotnet add package System.CommandLine
- Build and run:
dotnet run -- --config ./config.json --output ./exports
- Output layout:
exports/
customers.csv
orders.csv
export.log
Log example
2023-09-20 14:30:45|SUCCESS |customers |14:30:30|15.23 |1000000 |-
2023-09-20 14:31:02|ERROR |orders |14:30:45|17.12 |- |Timeout expired
This implementation provides:
- Thread-safe parallel processing
- A complete error-handling mechanism
- Detailed execution logging
- Efficient handling of large data volumes
- Configurable Snowflake connection parameters
- A clear command-line interface