Python带多组标签的Snowflake SQL查询批量数据导出程序

打印 上一主题 下一主题

主题 969|帖子 969|积分 2907

计划一个基于多个带标签Snowflake SQL语句作为json配置文件的Python代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。使用Python Snowflake Connector,需要思量SQL效果集是大数据量分批数据导出的环境,通过多线程和异步操作来提高程序性能,程序需要非常处置惩罚和输出,输出堕落时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记载数量的日记。
  1. import argparse
  2. import csv
  3. import json
  4. import logging
  5. import os
  6. from concurrent.futures import ThreadPoolExecutor
  7. from datetime import datetime
  8. import snowflake.connector
  9. from typing import Dict, List, Optional
  10. # 日志配置
  11. logging.basicConfig(
  12.     level=logging.INFO,
  13.     format="%(asctime)s - %(levelname)s - %(message)s",
  14.     handlers=[
  15.         logging.FileHandler("data_export.log"),
  16.         logging.StreamHandler()
  17.     ]
  18. )
  19. logger = logging.getLogger(__name__)
  20. class SnowflakeExporter:
  21.     def __init__(self, config: Dict, output_dir: str = "./output"):
  22.         self.snowflake_config = {
  23.             "user": os.getenv("SNOWFLAKE_USER"),
  24.             "password": os.getenv("SNOWFLAKE_PASSWORD"),
  25.             "account": os.getenv("SNOWFLAKE_ACCOUNT"),
  26.             "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
  27.             "database": os.getenv("SNOWFLAKE_DATABASE"),
  28.             "schema": os.getenv("SNOWFLAKE_SCHEMA")
  29.         }
  30.         self.output_dir = output_dir
  31.         os.makedirs(self.output_dir, exist_ok=True)
  32.         
  33.     def _get_file_path(self, label: str) -> str:
  34.         return os.path.join(self.output_dir, f"{label}.csv")
  35.     def _export_query(self, query_config: Dict):
  36.         label = query_config["label"]
  37.         sql = query_config["sql"]
  38.         params = query_config.get("params", {})
  39.         file_path = self._get_file_path(label)
  40.         
  41.         start_time = datetime.now()
  42.         total_rows = 0
  43.         status = "SUCCESS"
  44.         error_msg = None
  45.         
  46.         try:
  47.             conn = snowflake.connector.connect(**self.snowflake_config)
  48.             cursor = conn.cursor()
  49.             
  50.             logger.info(f"Executing query for [{label}]")
  51.             cursor.execute(sql, params)
  52.             
  53.             # 获取列信息
  54.             columns = [col[0] for col in cursor.description]
  55.             
  56.             # 初始化文件写入
  57.             with open(file_path, "w", newline="", encoding="utf-8") as f:
  58.                 writer = csv.writer(f)
  59.                 writer.writerow(columns)
  60.                
  61.                 # 分批处理数据
  62.                 while True:
  63.                     rows = cursor.fetchmany(10000)
  64.                     if not rows:
  65.                         break
  66.                     writer.writerows(rows)
  67.                     total_rows += len(rows)
  68.                     
  69.         except Exception as e:
  70.             status = "FAILED"
  71.             error_msg = str(e)
  72.             logger.error(f"Error exporting {label}: {error_msg}", exc_info=True)
  73.             
  74.         finally:
  75.             cursor.close()
  76.             conn.close()
  77.             duration = (datetime.now() - start_time).total_seconds()
  78.             
  79.             # 记录日志
  80.             log_entry = {
  81.                 "timestamp": datetime.now().isoformat(),
  82.                 "label": label,
  83.                 "status": status,
  84.                 "file_path": file_path,
  85.                 "rows_exported": total_rows,
  86.                 "duration_seconds": round(duration, 2),
  87.                 "error_message": error_msg
  88.             }
  89.             logger.info(json.dumps(log_entry, indent=2))
  90.     def execute_export(self, queries: List[Dict], max_workers: int = 5):
  91.         with ThreadPoolExecutor(max_workers=max_workers) as executor:
  92.             futures = []
  93.             for query in queries:
  94.                 futures.append(executor.submit(self._export_query, query))
  95.                
  96.             for future in futures:
  97.                 try:
  98.                     future.result()
  99.                 except Exception as e:
  100.                     logger.error(f"Thread execution error: {str(e)}")
  101. def main():
  102.     parser = argparse.ArgumentParser(description="Snowflake Data Exporter")
  103.     parser.add_argument("--config", required=True, help="Path to JSON config file")
  104.     parser.add_argument("--output-dir", default="./output", help="Output directory")
  105.     parser.add_argument("--labels", nargs="+", help="Filter queries by labels")
  106.     args = parser.parse_args()
  107.     # 加载配置文件
  108.     with open(args.config) as f:
  109.         config_data = json.load(f)
  110.    
  111.     # 过滤查询配置
  112.     queries = config_data.get("queries", [])
  113.     if args.labels:
  114.         queries = [q for q in queries if q.get("label") in args.labels]
  115.    
  116.     # 执行导出
  117.     exporter = SnowflakeExporter(config_data, args.output_dir)
  118.     exporter.execute_export(queries)
  119. if __name__ == "__main__":
  120.     main()
复制代码
程序阐明


  • 配置文件结构
  1. {
  2.   "queries": [
  3.     {
  4.       "label": "user_data",
  5.       "sql": "SELECT * FROM users WHERE created_at >= %(start_date)s",
  6.       "params": {
  7.         "start_date": "2023-01-01"
  8.       }
  9.     }
  10.   ]
  11. }
复制代码

  • 主要特性


  • 多线程处置惩罚:使用ThreadPoolExecutor实现并发导出
  • 分批处置惩罚:每次获取10,000条记载处置惩罚大数据
  • 自动重写文件:始终使用’w’模式打开文件
  • 具体日记:记载到文件和控制台,包含JSON格式的运行状态
  • 错误处置惩罚:捕获全部非常并记载具体信息
  • 参数化查询:防止SQL注入攻击
  • 环境变量:通过环境变量管理敏感信息

  • 运行方式
  1. export SNOWFLAKE_USER=your_user
  2. export SNOWFLAKE_PASSWORD=your_password
  3. python exporter.py --config queries.json --output-dir ./data --labels user_data sales_data
复制代码

  • 日记示例
  1. {
  2.   "timestamp": "2023-10-10T15:30:45.123456",
  3.   "label": "user_data",
  4.   "status": "SUCCESS",
  5.   "file_path": "./output/user_data.csv",
  6.   "rows_exported": 150000,
  7.   "duration_seconds": 12.34,
  8.   "error_message": null
  9. }
复制代码
优化点阐明


  • 内存管理


  • 使用服务器端游标分批获取数据(fetchmany)
  • 流式写入CSV文件,避免内存中保存完备效果集

  • 并发控制


  • 通过ThreadPoolExecutor管理线程池
  • 默认5个worker(可根据硬件调解)

  • 可观测性


  • 结构化日记记载
  • 精确的性能指标(持续时间、处置惩罚行数)
  • 错误堆栈信息记载

  • 安全措施


  • 参数化查询防止SQL注入
  • 敏感信息通过环境变量传递
  • 自动清理资源(with语句包管毗连关闭)
可根据实际需求调解以下参数:


  • fetchmany的批量巨细(当前10,000)
  • 线程池巨细(默认5)
  • 日记格式和具体水平
  • 文件编码方式(当前utf-8)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

熊熊出没

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表