C#带多组标签的Snowflake SQL查询批量数据导出程序

打印 上一主题 下一主题

主题 960|帖子 960|积分 2880

设计一个基于多个带标签Snowflake SQL语句作为json设置文件的C#代码程序,实现根据差别的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到当地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来进步程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记载数量的日记。
步调1:创建设置文件类

  1. // AppConfig.cs
  2. public class AppConfig
  3. {
  4.     public SnowflakeConnectionConfig SnowflakeConnection { get; set; }
  5.     public List<QueryConfig> Queries { get; set; }
  6. }
  7. public class SnowflakeConnectionConfig
  8. {
  9.     public string Account { get; set; }
  10.     public string User { get; set; }
  11.     public string Password { get; set; }
  12.     public string Warehouse { get; set; }
  13.     public string Database { get; set; }
  14.     public string Schema { get; set; }
  15.     public string Role { get; set; }
  16. }
  17. public class QueryConfig
  18. {
  19.     public string Label { get; set; }
  20.     public string Sql { get; set; }
  21. }
复制代码
步调2:实现日记记载器

  1. // Logger.cs
  2. public class Logger
  3. {
  4.     private readonly string _logPath;
  5.     private readonly object _lock = new object();
  6.     public Logger(string logPath)
  7.     {
  8.         _logPath = logPath;
  9.         InitializeLogFile();
  10.     }
  11.     private void InitializeLogFile()
  12.     {
  13.         lock (_lock)
  14.         {
  15.             File.AppendAllText(_logPath, $"{"Timestamp",-25}|{"Status",-8}|{"Label",-20}|{"StartTime",-20}|{"Duration(s)",-12}|{"Rows",-10}|{"Error"}\n");
  16.         }
  17.     }
  18.     public void LogSuccess(string label, DateTime startTime, long rowCount, TimeSpan duration)
  19.     {
  20.         var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"SUCCESS",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{rowCount,-10}|{"-"}\n";
  21.         WriteLogEntry(entry);
  22.     }
  23.     public void LogError(string label, DateTime startTime, string error, TimeSpan duration)
  24.     {
  25.         var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"ERROR",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{"-",-10}|{error}\n";
  26.         WriteLogEntry(entry);
  27.     }
  28.     private void WriteLogEntry(string entry)
  29.     {
  30.         lock (_lock)
  31.         {
  32.             File.AppendAllText(_logPath, entry);
  33.         }
  34.         Console.Write(entry);
  35.     }
  36. }
复制代码
步调3:实现数据导出处理器

  1. // ExportProcessor.cs
  2. using CsvHelper;
  3. using Snowflake.Data.Client;
  4. using System.Globalization;
  5. public class ExportProcessor
  6. {
  7.     private readonly string _connectionString;
  8.     private readonly Logger _logger;
  9.     public ExportProcessor(string connectionString, Logger logger)
  10.     {
  11.         _connectionString = connectionString;
  12.         _logger = logger;
  13.     }
  14.     public async Task ExportQueryAsync(QueryConfig query, string outputDir, CancellationToken cancellationToken = default)
  15.     {
  16.         var startTime = DateTime.UtcNow;
  17.         var sw = Stopwatch.StartNew();
  18.         long rowCount = 0;
  19.         string filePath = Path.Combine(outputDir, $"{query.Label}.csv");
  20.         try
  21.         {
  22.             using (var conn = new SnowflakeDbConnection())
  23.             {
  24.                 conn.ConnectionString = _connectionString;
  25.                 await conn.OpenAsync(cancellationToken);
  26.                 using (var cmd = conn.CreateCommand())
  27.                 {
  28.                     cmd.CommandText = query.Sql;
  29.                     using (var reader = await cmd.ExecuteReaderAsync(cancellationToken))
  30.                     using (var writer = new StreamWriter(filePath, append: false))
  31.                     using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
  32.                     {
  33.                         // Write header
  34.                         for (int i = 0; i < reader.FieldCount; i++)
  35.                         {
  36.                             csv.WriteField(reader.GetName(i));
  37.                         }
  38.                         await csv.NextRecordAsync();
  39.                         // Write rows
  40.                         while (await reader.ReadAsync(cancellationToken))
  41.                         {
  42.                             for (int i = 0; i < reader.FieldCount; i++)
  43.                             {
  44.                                 csv.WriteField(reader.GetValue(i));
  45.                             }
  46.                             await csv.NextRecordAsync();
  47.                             rowCount++;
  48.                         }
  49.                     }
  50.                 }
  51.             }
  52.             sw.Stop();
  53.             _logger.LogSuccess(query.Label, startTime, rowCount, sw.Elapsed);
  54.         }
  55.         catch (Exception ex)
  56.         {
  57.             sw.Stop();
  58.             _logger.LogError(query.Label, startTime, ex.Message, sw.Elapsed);
  59.             SafeDeleteFile(filePath);
  60.             throw; // Re-throw if using retry logic
  61.         }
  62.     }
  63.     private void SafeDeleteFile(string path)
  64.     {
  65.         try { File.Delete(path); }
  66.         catch { /* Ignore deletion errors */ }
  67.     }
  68. }
复制代码
步调4:主程序实现

  1. // Program.cs
  2. using System.CommandLine;
  3. class Program
  4. {
  5.     static async Task Main(string[] args)
  6.     {
  7.         var configOption = new Option<FileInfo>(
  8.             name: "--config",
  9.             description: "Path to configuration file");
  10.         
  11.         var outputOption = new Option<DirectoryInfo>(
  12.             name: "--output",
  13.             description: "Output directory for CSV files");
  14.         var rootCommand = new RootCommand
  15.         {
  16.             configOption,
  17.             outputOption
  18.         };
  19.         rootCommand.Description = "Snowflake Data Exporter";
  20.         
  21.         rootCommand.SetHandler(async (config, outputDir) =>
  22.         {
  23.             await RunExport(config, outputDir);
  24.         }, configOption, outputOption);
  25.         await rootCommand.InvokeAsync(args);
  26.     }
  27.     static async Task RunExport(FileInfo configFile, DirectoryInfo outputDir)
  28.     {
  29.         // Read configuration
  30.         var config = JsonSerializer.Deserialize<AppConfig>(
  31.             File.ReadAllText(configFile.FullName),
  32.             new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
  33.         // Create output directory
  34.         Directory.CreateDirectory(outputDir.FullName);
  35.         // Initialize logger
  36.         var logger = new Logger(Path.Combine(outputDir.FullName, "export.log"));
  37.         // Build connection string
  38.         var connBuilder = new SnowflakeDbConnectionStringBuilder
  39.         {
  40.             Account = config.SnowflakeConnection.Account,
  41.             User = config.SnowflakeConnection.User,
  42.             Password = config.SnowflakeConnection.Password,
  43.             Warehouse = config.SnowflakeConnection.Warehouse,
  44.             Db = config.SnowflakeConnection.Database,
  45.             Schema = config.SnowflakeConnection.Schema,
  46.             Role = config.SnowflakeConnection.Role
  47.         };
  48.         // Initialize processor
  49.         var processor = new ExportProcessor(
  50.             connBuilder.ToString(),
  51.             logger);
  52.         // Parallel execution with throttling
  53.         var parallelOptions = new ParallelOptions
  54.         {
  55.             MaxDegreeOfParallelism = Environment.ProcessorCount
  56.         };
  57.         await Parallel.ForEachAsync(
  58.             config.Queries,
  59.             parallelOptions,
  60.             async (query, cancellationToken) =>
  61.             {
  62.                 await processor.ExportQueryAsync(
  63.                     query,
  64.                     outputDir.FullName,
  65.                     cancellationToken);
  66.             });
  67.     }
  68. }
复制代码
步调5:设置文件示例(config.json)

  1. {
  2.   "snowflakeConnection": {
  3.     "account": "your_account",
  4.     "user": "your_user",
  5.     "password": "your_password",
  6.     "warehouse": "COMPUTE_WH",
  7.     "database": "PROD_DB",
  8.     "schema": "PUBLIC",
  9.     "role": "SYSADMIN"
  10.   },
  11.   "queries": [
  12.     {
  13.       "label": "customers",
  14.       "sql": "SELECT * FROM CUSTOMERS"
  15.     },
  16.     {
  17.       "label": "orders",
  18.       "sql": "SELECT * FROM ORDERS"
  19.     }
  20.   ]
  21. }
复制代码
实现阐明


  • 并行处理

    • 使用Parallel.ForEachAsync进行并行查询处理
    • 默认并行度设置为处理器焦点数
    • 每个查询独立使用本身的数据库连接

  • 大文件处理

    • 使用CsvHelper进行流式写入
    • 采用异步I/O操作(ReadAsync/WriteAsync)
    • 逐行处理避免内存爆炸

  • 错误处理

    • 自动删除不完整文件
    • 具体错误日记记载
    • 异常传播与隔离设计

  • 日记功能

    • 布局化日记格式
    • 线程安全写入
    • 包含关键性能指标

  • 性能优化

    • 异步数据库操作
    • 并行查询实行
    • 流式结果集处理

使用阐明


  • 安装依赖:
    1. dotnet add package Snowflake.Data
    2. dotnet add package CsvHelper
    3. dotnet add package System.CommandLine
    复制代码
  • 编译运行:
    1. dotnet run -- --config ./config.json --output ./exports
    复制代码
  • 输出布局:
    1. exports/
    2.   customers.csv
    3.   orders.csv
    4.   export.log
    复制代码
日记示例

  1. 2023-09-20 14:30:45|SUCCESS |customers           |14:30:30|15.23      |1000000    |-
  2. 2023-09-20 14:31:02|ERROR   |orders             |14:30:45|17.12      |-          |Timeout expired
复制代码
此实现提供了:


  • 线程安全的并行处理
  • 完整错误处理机制
  • 具体的实行日记
  • 高效的大数据处理能力
  • 可设置的Snowflake连接参数
  • 清楚的命令行界面

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

美丽的神话

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表