Python: Batch-Exporting Snowflake Data with Multiple Labeled SQL Queries
Design a Python program that reads multiple labeled Snowflake SQL statements from a JSON configuration file and, driven by its input parameters, automatically batch-exports data from a Snowflake database to CSV files in a local directory. Each export file is named after its query label plus the .csv extension; if the file already exists, it is overwritten. The program uses the Python Snowflake Connector and must handle large result sets by exporting data in batches, using multi-threading and asynchronous operation to improve performance. It also needs exception handling and reporting: the error message when an export fails, the run status of each query export, the number of rows exported, the run timestamp and export duration, and a log entry recording the record count of each file.
import argparse
import csv
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import snowflake.connector
from typing import Dict, List, Optional
# Logging configuration: write to both a log file and the console
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("data_export.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
class SnowflakeExporter:
    def __init__(self, config: Dict, output_dir: str = "./output"):
        # Connection settings come from environment variables, not the config file
        self.snowflake_config = {
            "user": os.getenv("SNOWFLAKE_USER"),
            "password": os.getenv("SNOWFLAKE_PASSWORD"),
            "account": os.getenv("SNOWFLAKE_ACCOUNT"),
            "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
            "database": os.getenv("SNOWFLAKE_DATABASE"),
            "schema": os.getenv("SNOWFLAKE_SCHEMA")
        }
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

    def _get_file_path(self, label: str) -> str:
        return os.path.join(self.output_dir, f"{label}.csv")
    def _export_query(self, query_config: Dict):
        label = query_config["label"]
        sql = query_config["sql"]
        params = query_config.get("params", {})
        file_path = self._get_file_path(label)
        start_time = datetime.now()
        total_rows = 0
        status = "SUCCESS"
        error_msg = None
        conn = None
        cursor = None
        try:
            conn = snowflake.connector.connect(**self.snowflake_config)
            cursor = conn.cursor()
            logger.info(f"Executing query for [{label}]")
            cursor.execute(sql, params)
            # Column names from the cursor metadata become the CSV header
            columns = [col[0] for col in cursor.description]
            # Open in "w" mode so an existing file is overwritten
            with open(file_path, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(columns)
                # Stream the result set in batches to bound memory usage
                while True:
                    rows = cursor.fetchmany(10000)
                    if not rows:
                        break
                    writer.writerows(rows)
                    total_rows += len(rows)
        except Exception as e:
            status = "FAILED"
            error_msg = str(e)
            logger.error(f"Error exporting {label}: {error_msg}", exc_info=True)
        finally:
            # Guarded cleanup: connect() itself may have failed
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.close()
        duration = (datetime.now() - start_time).total_seconds()
        # Structured per-query log entry
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "label": label,
            "status": status,
            "file_path": file_path,
            "rows_exported": total_rows,
            "duration_seconds": round(duration, 2),
            "error_message": error_msg
        }
        logger.info(json.dumps(log_entry, indent=2))
    def execute_export(self, queries: List, max_workers: int = 5):
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for query in queries:
                futures.append(executor.submit(self._export_query, query))
            for future in futures:
                try:
                    future.result()
                except Exception as e:
                    logger.error(f"Thread execution error: {str(e)}")
def main():
    parser = argparse.ArgumentParser(description="Snowflake Data Exporter")
    parser.add_argument("--config", required=True, help="Path to JSON config file")
    parser.add_argument("--output-dir", default="./output", help="Output directory")
    parser.add_argument("--labels", nargs="+", help="Filter queries by labels")
    args = parser.parse_args()
    # Load the configuration file
    with open(args.config) as f:
        config_data = json.load(f)
    # Filter the query configurations by the requested labels
    queries = config_data.get("queries", [])
    if args.labels:
        queries = [q for q in queries if q["label"] in args.labels]
    # Run the export
    exporter = SnowflakeExporter(config_data, args.output_dir)
    exporter.execute_export(queries)

if __name__ == "__main__":
    main()
Program Description
- Configuration file structure:
{
  "queries": [
    {
      "label": "user_data",
      "sql": "SELECT * FROM users WHERE created_at >= %(start_date)s",
      "params": {
        "start_date": "2023-01-01"
      }
    }
  ]
}
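Because exports are filtered by label, a config typically holds several entries. A possible two-query config matching the --labels example further below; the sales_data SQL (the sales table and region column) is illustrative only:
{
  "queries": [
    {
      "label": "user_data",
      "sql": "SELECT * FROM users WHERE created_at >= %(start_date)s",
      "params": {"start_date": "2023-01-01"}
    },
    {
      "label": "sales_data",
      "sql": "SELECT * FROM sales WHERE region = %(region)s",
      "params": {"region": "EMEA"}
    }
  ]
}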
- Key features:
  - Multi-threading: concurrent exports via ThreadPoolExecutor
  - Batched processing: rows are fetched 10,000 at a time to handle large data volumes
  - Automatic file overwrite: files are always opened in "w" mode
  - Detailed logging: written to both file and console, including run status in JSON format
  - Error handling: all exceptions are caught and logged with details
  - Parameterized queries: prevent SQL injection (see the sketch after this list)
  - Environment variables: sensitive credentials are managed via environment variables
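A minimal sketch of the binding the connector performs: the Snowflake connector's default pyformat paramstyle accepts named %(name)s placeholders with a dict of values, so the driver binds values rather than our code concatenating them into the SQL text. The connection arguments below assume the same environment variables the exporter uses.
import os
import snowflake.connector

conn = snowflake.connector.connect(
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
)
try:
    cur = conn.cursor()
    # Named pyformat placeholder: the driver binds the value safely
    cur.execute(
        "SELECT * FROM users WHERE created_at >= %(start_date)s",
        {"start_date": "2023-01-01"},
    )
    print(cur.fetchone())
finally:
    conn.close()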
- How to run (the remaining connection variables read by the code — SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE, SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA — must be exported the same way):
export SNOWFLAKE_USER=your_user
export SNOWFLAKE_PASSWORD=your_password
python exporter.py --config queries.json --output-dir ./data --labels user_data sales_data
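The exporter can also be driven without the CLI, e.g. from a scheduler. A sketch assuming the script above is saved as exporter.py:
from exporter import SnowflakeExporter

queries = [{
    "label": "user_data",
    "sql": "SELECT * FROM users WHERE created_at >= %(start_date)s",
    "params": {"start_date": "2023-01-01"},
}]
# The config dict is currently unused by __init__ (credentials come from
# environment variables), so an empty dict is enough here.
SnowflakeExporter(config={}, output_dir="./data").execute_export(queries, max_workers=2)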
- Log example:
{
  "timestamp": "2023-10-10T15:30:45.123456",
  "label": "user_data",
  "status": "SUCCESS",
  "file_path": "./output/user_data.csv",
  "rows_exported": 150000,
  "duration_seconds": 12.34,
  "error_message": null
}
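Since each entry records rows_exported, a quick cross-check against the actual CSV is straightforward; a sketch (the file path is taken from the example entry above):
import csv

with open("./output/user_data.csv", newline="", encoding="utf-8") as f:
    data_rows = sum(1 for _ in csv.reader(f)) - 1  # subtract the header row
print(data_rows)  # should match rows_exported in the log entry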
Optimization Notes
- Memory management:
  - The result set is fetched in batches via the cursor (fetchmany)
  - CSV output is written as a stream, so the full result set is never held in memory
- Concurrency control:
  - The thread pool is managed by ThreadPoolExecutor
  - 5 workers by default (adjustable to the hardware)
- Observability:
  - Structured log entries
  - Precise performance metrics (duration, rows processed)
  - Error stack traces are recorded
- Safety measures:
  - Parameterized queries prevent SQL injection
  - Sensitive credentials are passed via environment variables
  - Automatic resource cleanup (the with statement closes the CSV file; the finally block closes the connection)
The following parameters can be adjusted to actual needs (a sketch of exposing the first two as CLI flags follows this list):
- The fetchmany batch size (currently 10,000)
- The thread pool size (default 5)
- Log format and verbosity
- File encoding (currently utf-8)
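A hedged sketch of how the batch size and worker count could become CLI flags; --batch-size and --max-workers are hypothetical additions, not part of the original program:
import argparse

parser = argparse.ArgumentParser(description="Snowflake Data Exporter")
# Hypothetical flags replacing the hard-coded constants
parser.add_argument("--batch-size", type=int, default=10000,
                    help="Rows per cursor.fetchmany() call")
parser.add_argument("--max-workers", type=int, default=5,
                    help="ThreadPoolExecutor pool size")
args = parser.parse_args()
# The values would then be threaded through to fetchmany(args.batch_size)
# and execute_export(queries, max_workers=args.max_workers).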