Plan a Python program, driven by a JSON configuration file containing multiple labeled Snowflake SQL statements, that batch-exports data from a Snowflake database to CSV files in a local directory according to the input parameters. Each export file is named after its label plus a .csv extension, and an existing file is overwritten. Use the Python Snowflake Connector, and account for large result sets by exporting in batches; use multithreading and asynchronous operation to improve throughput. The program needs robust exception handling and logging: on failure it should report the error message, and for every query it should log the run status, the number of rows exported, the run timestamp, the export duration, and the record count written to each file.
```python
import argparse
import csv
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Dict, List

import snowflake.connector

# Logging configuration: write to both a log file and the console
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("data_export.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class SnowflakeExporter:
    def __init__(self, output_dir: str = "./output"):
        # Connection settings and credentials come from environment variables
        self.snowflake_config = {
            "user": os.getenv("SNOWFLAKE_USER"),
            "password": os.getenv("SNOWFLAKE_PASSWORD"),
            "account": os.getenv("SNOWFLAKE_ACCOUNT"),
            "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
            "database": os.getenv("SNOWFLAKE_DATABASE"),
            "schema": os.getenv("SNOWFLAKE_SCHEMA")
        }
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

    def _get_file_path(self, label: str) -> str:
        # The label plus a .csv extension is the export file name
        return os.path.join(self.output_dir, f"{label}.csv")

    def _export_query(self, query_config: Dict):
        label = query_config["label"]
        sql = query_config["sql"]
        # None (rather than {}) skips parameter binding for queries without params
        params = query_config.get("params")
        file_path = self._get_file_path(label)

        start_time = datetime.now()
        total_rows = 0
        status = "SUCCESS"
        error_msg = None
        conn = None
        cursor = None

        try:
            conn = snowflake.connector.connect(**self.snowflake_config)
            cursor = conn.cursor()

            logger.info(f"Executing query for [{label}]")
            cursor.execute(sql, params)

            # Column names for the CSV header
            columns = [col[0] for col in cursor.description]

            # "w" mode always overwrites an existing file
            with open(file_path, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(columns)

                # Fetch and write the result set in batches to bound memory use
                while True:
                    rows = cursor.fetchmany(10000)
                    if not rows:
                        break
                    writer.writerows(rows)
                    total_rows += len(rows)

        except Exception as e:
            status = "FAILED"
            error_msg = str(e)
            logger.error(f"Error exporting {label}: {error_msg}", exc_info=True)

        finally:
            # Guard against failures before the cursor/connection were created
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.close()
            duration = (datetime.now() - start_time).total_seconds()

            # Structured per-query run log
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                "label": label,
                "status": status,
                "file_path": file_path,
                "rows_exported": total_rows,
                "duration_seconds": round(duration, 2),
                "error_message": error_msg
            }
            logger.info(json.dumps(log_entry, indent=2))

    def execute_export(self, queries: List[Dict], max_workers: int = 5):
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(self._export_query, q) for q in queries]
            for future in futures:
                try:
                    future.result()
                except Exception as e:
                    logger.error(f"Thread execution error: {str(e)}")


def main():
    parser = argparse.ArgumentParser(description="Snowflake Data Exporter")
    parser.add_argument("--config", required=True, help="Path to JSON config file")
    parser.add_argument("--output-dir", default="./output", help="Output directory")
    parser.add_argument("--labels", nargs="+", help="Filter queries by labels")
    args = parser.parse_args()

    # Load the JSON config of labeled SQL statements
    with open(args.config) as f:
        config_data = json.load(f)

    # Optionally filter queries by label
    queries = config_data.get("queries", [])
    if args.labels:
        queries = [q for q in queries if q.get("label") in args.labels]

    # Run the exports
    exporter = SnowflakeExporter(args.output_dir)
    exporter.execute_export(queries)


if __name__ == "__main__":
    main()
```
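The listing above handles concurrency with threads only. Since the requirements also mention asynchronous operation, one way to drive the same exports from an asyncio program is to offload each blocking export to a worker thread. This is a minimal sketch, assuming Python 3.9+ (for asyncio.to_thread) and the SnowflakeExporter class above; it is not part of the listing itself:

```python
import asyncio

async def export_all_async(exporter: "SnowflakeExporter", queries: list) -> None:
    # Each _export_query call blocks on network and file I/O, so run it in a
    # worker thread; gather() waits for all exports and collects exceptions
    # instead of cancelling the remaining tasks.
    tasks = [asyncio.to_thread(exporter._export_query, q) for q in queries]
    await asyncio.gather(*tasks, return_exceptions=True)

# Usage: asyncio.run(export_all_async(exporter, queries))
```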
Program notes

An example configuration file (queries.json):
```json
{
  "queries": [
    {
      "label": "user_data",
      "sql": "SELECT * FROM users WHERE created_at >= %(start_date)s",
      "params": {
        "start_date": "2023-01-01"
      }
    }
  ]
}
```
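To batch-export several result sets in one run, the queries array simply holds one entry per labeled statement. As an illustration, the sales_data entry below is hypothetical, chosen to match the command-line example further down:

```json
{
  "queries": [
    {
      "label": "user_data",
      "sql": "SELECT * FROM users WHERE created_at >= %(start_date)s",
      "params": {"start_date": "2023-01-01"}
    },
    {
      "label": "sales_data",
      "sql": "SELECT * FROM sales WHERE sale_date >= %(start_date)s",
      "params": {"start_date": "2023-01-01"}
    }
  ]
}
```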
Key features:
- Multithreading: concurrent exports via ThreadPoolExecutor
- Batched processing: fetches 10,000 records per batch to handle large result sets
- Automatic file overwrite: output files are always opened in "w" mode
- Detailed logging: written to both a log file and the console, including a JSON-formatted run status per query
- Error handling: every exception is caught and logged with full details
- Parameterized queries: guard against SQL injection
- Environment variables: sensitive credentials are managed through environment variables, as in the shell session below
```bash
export SNOWFLAKE_USER=your_user
export SNOWFLAKE_PASSWORD=your_password
python exporter.py --config queries.json --output-dir ./data --labels user_data sales_data
```
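Note that os.getenv returns None for any unset variable, so a missing credential would otherwise surface only as a connection error inside a worker thread. A minimal pre-flight check, assuming the six variables read in the constructor above (the REQUIRED_VARS name is illustrative), might look like this:

```python
import os

# The six variables read by SnowflakeExporter.__init__ above.
REQUIRED_VARS = [
    "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_WAREHOUSE", "SNOWFLAKE_DATABASE", "SNOWFLAKE_SCHEMA",
]

missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    # Fail fast with a clear message instead of a cryptic connection error.
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
```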
A sample run-status log entry:

```json
{
  "timestamp": "2023-10-10T15:30:45.123456",
  "label": "user_data",
  "status": "SUCCESS",
  "file_path": "./output/user_data.csv",
  "rows_exported": 150000,
  "duration_seconds": 12.34,
  "error_message": null
}
```
Optimization notes
- Results are pulled from the cursor in batches (fetchmany), so the client never materializes the full result set
- CSV rows are written in a streaming fashion, keeping memory usage flat regardless of result size
- The thread pool is managed by ThreadPoolExecutor
- 5 workers by default (tune to your hardware)
- Structured log records
- Accurate performance metrics (duration, rows processed)
- Error stack traces are captured (exc_info=True)
- Parameterized queries prevent SQL injection
- Sensitive information is passed via environment variables
- Resources are cleaned up automatically (the finally block closes the cursor and connection; with statements close the output file and the thread pool)
The following parameters can be tuned to suit your workload (one way to expose them is sketched after this list):
- fetchmany batch size (currently 10,000)
- Thread pool size (default 5)
- Log format and verbosity
- File encoding (currently utf-8)
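As a hypothetical extension (not part of the listing above), the first two tunables could be promoted from hard-coded constants to command-line flags; the flag names below are illustrative:

```python
import argparse

parser = argparse.ArgumentParser(description="Snowflake Data Exporter")
# --batch-size would be threaded through to cursor.fetchmany(batch_size),
# --max-workers to ThreadPoolExecutor(max_workers=...).
parser.add_argument("--batch-size", type=int, default=10000,
                    help="rows fetched per fetchmany() call")
parser.add_argument("--max-workers", type=int, default=5,
                    help="thread pool size for concurrent exports")
```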