Python带多组标签的Snowflake SQL查询批量数据导出程序

[复制链接]
发表于 2025-3-14 14:44:09 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
计划一个基于多个带标签Snowflake SQL语句作为json配置文件的Python代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。使用Python Snowflake Connector,需要思量SQL效果集是大数据量分批数据导出的环境,通过多线程和异步操作来提高程序性能,程序需要非常处置惩罚和输出,输出堕落时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记载数量的日记。
  1. import argparse
  2. import csv
  3. import json
  4. import logging
  5. import os
  6. from concurrent.futures import ThreadPoolExecutor
  7. from datetime import datetime
  8. import snowflake.connector
  9. from typing import Dict, List, Optional
  10. # 日志日志配置
  11. logging.basicConfig(
  12.     level=logging.INFO,
  13.     format="%(asctime)s - %(levelname)s - %(message)s",
  14.     handlers=[
  15.         logging.FileHandler("data_export.log"),
  16.         logging.StreamHandler()
  17.     ]
  18. )
  19. logger = logging.getLogger(__name__)
  20. class SnowflakeExporter:
  21.     def __init__(self, config: Dict, output_dir: str = "./output"):
  22.         self.snowflake_config = {
  23.             "user": os.getenv("SNOWFLAKE_USER"),
  24.             "password": os.getenv("SNOWFLAKE_PASSWORD"),
  25.             "account": os.getenv("SNOWFLAKE_ACCOUNT"),
  26.             "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
  27.             "database": os.getenv("SNOWFLAKE_DATABASE"),
  28.             "schema": os.getenv("SNOWFLAKE_SCHEMA")
  29.         }
  30.         self.output_dir = output_dir
  31.         os.makedirs(self.output_dir, exist_ok=True)
  32.         
  33.     def _get_file_path(self, label: str) -> str:
  34.         return os.path.join(self.output_dir, f"{label}.csv")
  35.     def _export_query(self, query_config: Dict):
  36.         label = query_config["label"]
  37.         sql = query_config["sql"]
  38.         params = query_config.get("params", {})
  39.         file_path = self._get_file_path(label)
  40.         
  41.         start_time = datetime.now()
  42.         total_rows = 0
  43.         status = "SUCCESS"
  44.         error_msg = None
  45.         
  46.         try:
  47.             conn = snowflake.connector.connect(**self.snowflake_config)
  48.             cursor = conn.cursor()
  49.             
  50.             logger.info(f"Executing query for [{label}]")
  51.             cursor.execute(sql, params)
  52.             
  53.             # 获取列信息
  54.             columns = [col[0] for col in cursor.description]
  55.             
  56.             # 初始化文件写入
  57.             with open(file_path, "w", newline="", encoding="utf-8") as f:
  58.                 writer = csv.writer(f)
  59.                 writer.writerow(columns)
  60.                
  61.                 # 分批处理数据
  62.                 while True:
  63.                     rows = cursor.fetchmany(10000)
  64.                     if not rows:
  65.                         break
  66.                     writer.writerows(rows)
  67.                     total_rows += len(rows)
  68.                     
  69.         except Exception as e:
  70.             status = "FAILED"
  71.             error_msg = str(e)
  72.             logger.error(f"Error exporting {label}: {error_msg}", exc_info=True)
  73.             
  74.         finally:
  75.             cursor.close()
  76.             conn.close()
  77.             duration = (datetime.now() - start_time).total_seconds()
  78.             
  79.             # 记录日志日志
  80.             log_entry = {
  81.                 "timestamp": datetime.now().isoformat(),
  82.                 "label": label,
  83.                 "status": status,
  84.                 "file_path": file_path,
  85.                 "rows_exported": total_rows,
  86.                 "duration_seconds": round(duration, 2),
  87.                 "error_message": error_msg
  88.             }
  89.             logger.info(json.dumps(log_entry, indent=2))
  90.     def execute_export(self, queries: List[Dict], max_workers: int = 5):
  91.         with ThreadPoolExecutor(max_workers=max_workers) as executor:
  92.             futures = []
  93.             for query in queries:
  94.                 futures.append(executor.submit(self._export_query, query))
  95.                
  96.             for future in futures:
  97.                 try:
  98.                     future.result()
  99.                 except Exception as e:
  100.                     logger.error(f"Thread execution error: {str(e)}")
  101. def main():
  102.     parser = argparse.ArgumentParser(description="Snowflake Data Exporter")
  103.     parser.add_argument("--config", required=True, help="Path to JSON config file")
  104.     parser.add_argument("--output-dir", default="./output", help="Output directory")
  105.     parser.add_argument("--labels", nargs="+", help="Filter queries by labels")
  106.     args = parser.parse_args()
  107.     # 加载配置文件
  108.     with open(args.config) as f:
  109.         config_data = json.load(f)
  110.    
  111.     # 过滤查询配置
  112.     queries = config_data.get("queries", [])
  113.     if args.labels:
  114.         queries = [q for q in queries if q.get("label") in args.labels]
  115.    
  116.     # 执行导出
  117.     exporter = SnowflakeExporter(config_data, args.output_dir)
  118.     exporter.execute_export(queries)
  119. if __name__ == "__main__":
  120.     main()
复制代码
程序阐明


  • 配置文件结构
  1. {
  2.   "queries": [
  3.     {
  4.       "label": "user_data",
  5.       "sql": "SELECT * FROM users WHERE created_at >= %(start_date)s",
  6.       "params": {
  7.         "start_date": "2023-01-01"
  8.       }
  9.     }
  10.   ]
  11. }
复制代码

  • 主要特性


  • 多线程处置惩罚:使用ThreadPoolExecutor实现并发导出
  • 分批处置惩罚:每次获取10,000条记载处置惩罚大数据
  • 自动重写文件:始终使用’w’模式打开文件
  • 具体日记:记载到文件和控制台,包含JSON格式的运行状态
  • 错误处置惩罚:捕获全部非常并记载具体信息
  • 参数化查询:防止SQL注入攻击
  • 环境变量:通过环境变量管理敏感信息

  • 运行方式
  1. export SNOWFLAKE_USER=your_user
  2. export SNOWFLAKE_PASSWORD=your_password
  3. python exporter.py --config queries.json --output-dir ./data --labels user_data sales_data
复制代码

  • 日记示例
  1. {
  2.   "timestamp": "2023-10-10T15:30:45.123456",
  3.   "label": "user_data",
  4.   "status": "SUCCESS",
  5.   "file_path": "./output/user_data.csv",
  6.   "rows_exported": 150000,
  7.   "duration_seconds": 12.34,
  8.   "error_message": null
  9. }
复制代码
优化点阐明


  • 内存管理


  • 使用服务器端游标分批获取数据(fetchmany)
  • 流式写入CSV文件,避免内存中保存完备效果集

  • 并发控制


  • 通过ThreadPoolExecutor管理线程池
  • 默认5个worker(可根据硬件调解)


  • 结构化日记记载
  • 精确的性能指标(持续时间、处置惩罚行数)
  • 错误堆栈信息记载


  • 参数化查询防止SQL注入
  • 敏感信息通过环境变量传递
  • 自动清理资源(with语句包管毗连关闭)
可根据实际需求调解以下参数:


  • fetchmany的批量巨细(当前10,000)
  • 线程池巨细(默认5)
  • 日记格式和具体水平
  • 文件编码方式(当前utf-8)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告
回复

使用道具 举报

×
登录参与点评抽奖,加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表