熊熊出没 发表于 2025-3-14 14:44:09

Python带多组标签的Snowflake SQL查询批量数据导出程序

计划一个基于多个带标签Snowflake SQL语句作为json配置文件的Python代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。使用Python Snowflake Connector,需要思量SQL效果集是大数据量分批数据导出的环境,通过多线程和异步操作来提高程序性能,程序需要非常处置惩罚和输出,输出堕落时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记载数量的日记。
import argparse
import csv
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import snowflake.connector
from typing import Dict, List, Optional

# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
      logging.FileHandler("data_export.log"),
      logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class SnowflakeExporter:
    def __init__(self, config: Dict, output_dir: str = "./output"):
      self.snowflake_config = {
            "user": os.getenv("SNOWFLAKE_USER"),
            "password": os.getenv("SNOWFLAKE_PASSWORD"),
            "account": os.getenv("SNOWFLAKE_ACCOUNT"),
            "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
            "database": os.getenv("SNOWFLAKE_DATABASE"),
            "schema": os.getenv("SNOWFLAKE_SCHEMA")
      }
      self.output_dir = output_dir
      os.makedirs(self.output_dir, exist_ok=True)
      
    def _get_file_path(self, label: str) -> str:
      return os.path.join(self.output_dir, f"{label}.csv")

    def _export_query(self, query_config: Dict):
      label = query_config["label"]
      sql = query_config["sql"]
      params = query_config.get("params", {})
      file_path = self._get_file_path(label)
      
      start_time = datetime.now()
      total_rows = 0
      status = "SUCCESS"
      error_msg = None
      
      try:
            conn = snowflake.connector.connect(**self.snowflake_config)
            cursor = conn.cursor()
            
            logger.info(f"Executing query for [{label}]")
            cursor.execute(sql, params)
            
            # 获取列信息
            columns = for col in cursor.description]
            
            # 初始化文件写入
            with open(file_path, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(columns)
               
                # 分批处理数据
                while True:
                  rows = cursor.fetchmany(10000)
                  if not rows:
                        break
                  writer.writerows(rows)
                  total_rows += len(rows)
                  
      except Exception as e:
            status = "FAILED"
            error_msg = str(e)
            logger.error(f"Error exporting {label}: {error_msg}", exc_info=True)
            
      finally:
            cursor.close()
            conn.close()
            duration = (datetime.now() - start_time).total_seconds()
            
            # 记录日志
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                "label": label,
                "status": status,
                "file_path": file_path,
                "rows_exported": total_rows,
                "duration_seconds": round(duration, 2),
                "error_message": error_msg
            }
            logger.info(json.dumps(log_entry, indent=2))

    def execute_export(self, queries: List, max_workers: int = 5):
      with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for query in queries:
                futures.append(executor.submit(self._export_query, query))
               
            for future in futures:
                try:
                  future.result()
                except Exception as e:
                  logger.error(f"Thread execution error: {str(e)}")

def main():
    parser = argparse.ArgumentParser(description="Snowflake Data Exporter")
    parser.add_argument("--config", required=True, help="Path to JSON config file")
    parser.add_argument("--output-dir", default="./output", help="Output directory")
    parser.add_argument("--labels", nargs="+", help="Filter queries by labels")
    args = parser.parse_args()

    # 加载配置文件
    with open(args.config) as f:
      config_data = json.load(f)
   
    # 过滤查询配置
    queries = config_data.get("queries", [])
    if args.labels:
      queries =
   
    # 执行导出
    exporter = SnowflakeExporter(config_data, args.output_dir)
    exporter.execute_export(queries)

if __name__ == "__main__":
    main()
程序阐明


[*]配置文件结构:
{
"queries": [
    {
      "label": "user_data",
      "sql": "SELECT * FROM users WHERE created_at >= %(start_date)s",
      "params": {
      "start_date": "2023-01-01"
      }
    }
]
}

[*]主要特性:


[*]多线程处置惩罚:使用ThreadPoolExecutor实现并发导出
[*]分批处置惩罚:每次获取10,000条记载处置惩罚大数据
[*]自动重写文件:始终使用’w’模式打开文件
[*]具体日记:记载到文件和控制台,包含JSON格式的运行状态
[*]错误处置惩罚:捕获全部非常并记载具体信息
[*]参数化查询:防止SQL注入攻击
[*]环境变量:通过环境变量管理敏感信息

[*]运行方式:
export SNOWFLAKE_USER=your_user
export SNOWFLAKE_PASSWORD=your_password
python exporter.py --config queries.json --output-dir ./data --labels user_data sales_data

[*]日记示例:
{
"timestamp": "2023-10-10T15:30:45.123456",
"label": "user_data",
"status": "SUCCESS",
"file_path": "./output/user_data.csv",
"rows_exported": 150000,
"duration_seconds": 12.34,
"error_message": null
}
优化点阐明


[*]内存管理:


[*]使用服务器端游标分批获取数据(fetchmany)
[*]流式写入CSV文件,避免内存中保存完备效果集

[*]并发控制:


[*]通过ThreadPoolExecutor管理线程池
[*]默认5个worker(可根据硬件调解)

[*]可观测性:


[*]结构化日记记载
[*]精确的性能指标(持续时间、处置惩罚行数)
[*]错误堆栈信息记载

[*]安全措施:


[*]参数化查询防止SQL注入
[*]敏感信息通过环境变量传递
[*]自动清理资源(with语句包管毗连关闭)
可根据实际需求调解以下参数:


[*]fetchmany的批量巨细(当前10,000)
[*]线程池巨细(默认5)
[*]日记格式和具体水平
[*]文件编码方式(当前utf-8)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Python带多组标签的Snowflake SQL查询批量数据导出程序