设计一个C#程序,以包含多个带标签的Snowflake SQL语句的JSON配置文件为输入,根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录,以标签加扩展名.csv作为导出数据文件名,文件已经存在则覆盖原始文件。需要考虑SQL结果集为大数据量时分批导出的情况,通过多线程和异步操作来提高程序性能。程序需要异常处理,出错时输出错误信息;记录每次每个查询导出数据的运行状态、表数据行数、运行时间戳和导出时间,并输出每个文件记录数量的日志。
步骤1:创建配置文件类
// AppConfig.cs
/// <summary>
/// Root configuration loaded from the JSON settings file:
/// Snowflake connection parameters plus the list of labeled queries to export.
/// </summary>
public class AppConfig
{
    // Initialized so callers never hit a NullReferenceException when the
    // "snowflakeConnection" or "queries" section is missing from the JSON file.
    public SnowflakeConnectionConfig SnowflakeConnection { get; set; } = new SnowflakeConnectionConfig();
    public List<QueryConfig> Queries { get; set; } = new List<QueryConfig>();
}

/// <summary>
/// Connection parameters used to build the Snowflake connection string.
/// </summary>
public class SnowflakeConnectionConfig
{
    public string Account { get; set; } = string.Empty;
    public string User { get; set; } = string.Empty;
    public string Password { get; set; } = string.Empty;
    public string Warehouse { get; set; } = string.Empty;
    public string Database { get; set; } = string.Empty;
    public string Schema { get; set; } = string.Empty;
    public string Role { get; set; } = string.Empty;
}

/// <summary>
/// A single export job: the SQL to run and the label that becomes
/// the CSV file name ("{Label}.csv").
/// </summary>
public class QueryConfig
{
    public string Label { get; set; } = string.Empty;
    public string Sql { get; set; } = string.Empty;
}
步骤2:实现日志记录器
// Logger.cs
/// <summary>
/// Thread-safe logger that appends pipe-delimited, fixed-width entries to a
/// log file and mirrors each entry to the console.
/// </summary>
public class Logger
{
    private readonly string _logPath;
    private readonly object _lock = new object();

    public Logger(string logPath)
    {
        _logPath = logPath;
        InitializeLogFile();
    }

    // Write the column header only when the log file does not yet exist (or is
    // empty) — the original version appended a duplicate header on every run.
    private void InitializeLogFile()
    {
        lock (_lock)
        {
            if (!File.Exists(_logPath) || new FileInfo(_logPath).Length == 0)
            {
                File.AppendAllText(_logPath, $"{"Timestamp",-25}|{"Status",-8}|{"Label",-20}|{"StartTime",-20}|{"Duration(s)",-12}|{"Rows",-10}|{"Error"}\n");
            }
        }
    }

    /// <summary>Logs a completed export: label, start time, row count and duration.</summary>
    public void LogSuccess(string label, DateTime startTime, long rowCount, TimeSpan duration)
    {
        var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"SUCCESS",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{rowCount,-10}|{"-"}\n";
        WriteLogEntry(entry);
    }

    /// <summary>Logs a failed export with the error message in the last column.</summary>
    public void LogError(string label, DateTime startTime, string error, TimeSpan duration)
    {
        var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"ERROR",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{"-",-10}|{error}\n";
        WriteLogEntry(entry);
    }

    // Both the file append and the console mirror happen inside the lock so
    // concurrent export tasks cannot interleave partial lines.
    private void WriteLogEntry(string entry)
    {
        lock (_lock)
        {
            File.AppendAllText(_logPath, entry);
            Console.Write(entry);
        }
    }
}
步骤3:实现数据导出处理器
// ExportProcessor.cs
using CsvHelper;
using Snowflake.Data.Client;
using System.Diagnostics; // Stopwatch — not in implicit usings
using System.Globalization;

/// <summary>
/// Runs one configured query against Snowflake and streams the result set
/// into a CSV file named "{label}.csv" in the output directory.
/// </summary>
public class ExportProcessor
{
    private readonly string _connectionString;
    private readonly Logger _logger;

    public ExportProcessor(string connectionString, Logger logger)
    {
        _connectionString = connectionString;
        _logger = logger;
    }

    /// <summary>
    /// Executes the query and writes all rows to CSV, overwriting any existing
    /// file. Rows are streamed one at a time so arbitrarily large result sets
    /// never accumulate in memory. On failure the partial file is deleted, the
    /// error is logged, and the exception is re-thrown to the caller.
    /// </summary>
    public async Task ExportQueryAsync(QueryConfig query, string outputDir, CancellationToken cancellationToken = default)
    {
        var startTime = DateTime.UtcNow;
        var sw = Stopwatch.StartNew();
        long rowCount = 0;
        string filePath = Path.Combine(outputDir, $"{query.Label}.csv");

        try
        {
            using (var conn = new SnowflakeDbConnection())
            {
                conn.ConnectionString = _connectionString;
                await conn.OpenAsync(cancellationToken);

                using (var cmd = conn.CreateCommand())
                {
                    cmd.CommandText = query.Sql;

                    using (var reader = await cmd.ExecuteReaderAsync(cancellationToken))
                    using (var writer = new StreamWriter(filePath, append: false))
                    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
                    {
                        // Header row from the result-set column names.
                        for (int i = 0; i < reader.FieldCount; i++)
                        {
                            csv.WriteField(reader.GetName(i));
                        }
                        await csv.NextRecordAsync();

                        // Stream data rows; SQL NULL is written as an empty CSV
                        // field instead of the raw DBNull value.
                        while (await reader.ReadAsync(cancellationToken))
                        {
                            for (int i = 0; i < reader.FieldCount; i++)
                            {
                                csv.WriteField(reader.IsDBNull(i) ? null : reader.GetValue(i));
                            }
                            await csv.NextRecordAsync();
                            rowCount++;
                        }

                        // Ensure buffered output reaches the file before the
                        // writers are disposed synchronously.
                        await csv.FlushAsync();
                    }
                }
            }

            sw.Stop();
            _logger.LogSuccess(query.Label, startTime, rowCount, sw.Elapsed);
        }
        catch (Exception ex)
        {
            sw.Stop();
            _logger.LogError(query.Label, startTime, ex.Message, sw.Elapsed);
            SafeDeleteFile(filePath);
            throw; // `throw;` preserves the stack trace for the caller's retry/isolation logic
        }
    }

    // Best-effort cleanup of a partially written export file; deletion failures
    // are deliberately ignored so they do not mask the original error.
    private void SafeDeleteFile(string path)
    {
        try { File.Delete(path); }
        catch { /* ignore deletion errors */ }
    }
}
步骤4:主程序实现
// Program.cs
using System.CommandLine;
using System.Text.Json;
/// <summary>
/// Command-line entry point: parses --config / --output options and runs the
/// batch export of all configured queries in parallel.
/// </summary>
class Program
{
    static async Task Main(string[] args)
    {
        var configOption = new Option<FileInfo>(
            name: "--config",
            description: "Path to configuration file");

        var outputOption = new Option<DirectoryInfo>(
            name: "--output",
            description: "Output directory for CSV files");

        var rootCommand = new RootCommand
        {
            configOption,
            outputOption
        };
        rootCommand.Description = "Snowflake Data Exporter";

        rootCommand.SetHandler(async (config, outputDir) =>
        {
            await RunExport(config, outputDir);
        }, configOption, outputOption);

        await rootCommand.InvokeAsync(args);
    }

    static async Task RunExport(FileInfo configFile, DirectoryInfo outputDir)
    {
        // Load and validate configuration before doing any work.
        var config = JsonSerializer.Deserialize<AppConfig>(
            File.ReadAllText(configFile.FullName),
            new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
        if (config?.SnowflakeConnection == null || config.Queries == null)
        {
            throw new InvalidOperationException(
                $"Invalid configuration file: {configFile.FullName}");
        }

        // Create output directory (no-op if it already exists).
        Directory.CreateDirectory(outputDir.FullName);

        var logger = new Logger(Path.Combine(outputDir.FullName, "export.log"));

        // Build the Snowflake connection string from configuration.
        var connBuilder = new SnowflakeDbConnectionStringBuilder
        {
            Account = config.SnowflakeConnection.Account,
            User = config.SnowflakeConnection.User,
            Password = config.SnowflakeConnection.Password,
            Warehouse = config.SnowflakeConnection.Warehouse,
            Db = config.SnowflakeConnection.Database,
            Schema = config.SnowflakeConnection.Schema,
            Role = config.SnowflakeConnection.Role
        };

        var processor = new ExportProcessor(connBuilder.ToString(), logger);

        // Throttle parallelism to the number of logical processors.
        var parallelOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        await Parallel.ForEachAsync(
            config.Queries,
            parallelOptions,
            async (query, cancellationToken) =>
            {
                try
                {
                    await processor.ExportQueryAsync(
                        query,
                        outputDir.FullName,
                        cancellationToken);
                }
                catch (Exception)
                {
                    // Already logged inside ExportQueryAsync. Swallow here so a
                    // single failed query does not cancel the remaining exports
                    // (Parallel.ForEachAsync aborts on an unhandled exception).
                }
            });
    }
}
步骤5:配置文件示例(config.json)
- {
- "snowflakeConnection": {
- "account": "your_account",
- "user": "your_user",
- "password": "your_password",
- "warehouse": "COMPUTE_WH",
- "database": "PROD_DB",
- "schema": "PUBLIC",
- "role": "SYSADMIN"
- },
- "queries": [
- {
- "label": "customers",
- "sql": "SELECT * FROM CUSTOMERS"
- },
- {
- "label": "orders",
- "sql": "SELECT * FROM ORDERS"
- }
- ]
- }
实现说明
- 并行处理:
- 使用Parallel.ForEachAsync进行并行查询处理
- 默认并行度设置为处理器核心数
- 每个查询独立使用自己的数据库连接
- 大文件处理:
- 使用CsvHelper进行流式写入
- 采用异步I/O操作(ReadAsync/WriteAsync)
- 逐行处理避免内存爆炸
- 错误处理:
- 自动删除不完整文件
- 详细的错误日志记录
- 异常传播与隔离设计
- 日志功能:
- 性能优化:
使用说明
- 安装依赖:
- dotnet add package Snowflake.Data
- dotnet add package CsvHelper
- dotnet add package System.CommandLine
- 编译运行:
- dotnet run -- --config ./config.json --output ./exports
- 输出结构:
- exports/
- customers.csv
- orders.csv
- export.log
日志示例
- 2023-09-20 14:30:45|SUCCESS |customers |14:30:30|15.23 |1000000 |-
- 2023-09-20 14:31:02|ERROR |orders |14:30:45|17.12 |- |Timeout expired
此实现提供了:
- 线程安全的并行处理
- 完整的错误处理机制
- 详细的执行日志
- 高效的大数据处理能力
- 可配置的Snowflake连接参数
- 清晰的命令行界面
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |