Lean ConnectQuant针对A股的量化数据库搭建【量化数据篇】 ...

金歌  论坛元老 | 2025-4-23 18:05:16 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1765|帖子 1765|积分 5295

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
本文基于 SQLite 轻量级数据库与 QuantConnect Lean 框架,构建了一套​​支持高频数据回测、策略无缝实盘部署​​的本地化办理方案。
0x01获取所有股票
  1. import baostock as bs
  2. import pandas as pd
  3. import sqlite3
  4. from tqdm import tqdm
  5. import datetime
  6. def save_to_sqlite(df, db_path, table_name):
  7.     """将DataFrame存储到SQLite数据库"""
  8.     try:
  9.         conn = sqlite3.connect(db_path)
  10.         df.to_sql(table_name, conn, if_exists='replace', index=False)
  11.         print(f"数据成功保存至 {db_path} 的 {table_name} 表")
  12.     except Exception as e:
  13.         print(f"数据库操作失败:{str(e)}")
  14.     finally:
  15.         if 'conn' in locals() and conn:
  16.             conn.close()
  17. def validate_stock_code(code):
  18.     """规范股票代码格式为交易所.6位数字(增强创业板/科创板处理)[7,12](@ref)"""
  19.     if not isinstance(code, str):
  20.         code = str(code)
  21.    
  22.     code = code.strip().lower().replace(" ", "")
  23.    
  24.     if code.startswith(("sh.", "sz.", "bj.")):
  25.         prefix, num = code.split(".")
  26.         return f"{prefix}.{num.zfill(6)}"
  27.    
  28.     if code.isdigit():
  29.         code = code.zfill(6)
  30.         first_char = code[0]
  31.         if first_char in ['5','6','9','7'] or code.startswith("688"):
  32.             return f"sh.{code}"
  33.         elif first_char in ['0','1','2','3'] or code.startswith("30"):
  34.             return f"sz.{code}"
  35.         elif code.startswith("8"):
  36.             return f"bj.{code}"
  37.    
  38.     raise ValueError(f"无法识别的股票代码格式: {code}")
  39. def get_index_components(index_func):
  40.     """通用指数成分股获取函数(支持自动重试)[8,11](@ref)"""
  41.     max_retries = 3
  42.     for attempt in range(max_retries):
  43.         try:
  44.             rs = index_func()
  45.             if rs.error_code == '0':
  46.                 return {validate_stock_code(item[1]) for item in rs.data}
  47.         except Exception as e:
  48.             print(f"指数查询第{attempt+1}次失败: {str(e)}")
  49.             if attempt == max_retries - 1:
  50.                 return set()
  51.     return set()
  52. def get_all_stocks_with_industry():
  53.     """获取全市场股票及指数成分标记"""
  54.     try:
  55.         # 登录系统
  56.         if bs.login().error_code != '0':
  57.             raise ConnectionError("Baostock登录失败")
  58.         # 获取三大指数成分股(提前获取避免多次查询)[7,11](@ref)
  59.         sz50_set = get_index_components(bs.query_sz50_stocks)
  60.         hs300_set = get_index_components(bs.query_hs300_stocks)
  61.         zz500_set = get_index_components(bs.query_zz500_stocks)
  62.         print(f"指数成分股数量:SZ50={len(sz50_set)}, HS300={len(hs300_set)}, ZZ500={len(zz500_set)}")
  63.         # 获取全市场股票(含最新交易状态)[13](@ref)
  64.         query_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
  65.         rs = bs.query_all_stock(day=query_date)
  66.         if rs.error_code != '0':
  67.             raise ValueError(f"股票查询失败:{rs.error_msg}")
  68.         # 处理原始数据(增强状态过滤)[13](@ref)
  69.         raw_df = pd.DataFrame(rs.data, columns=rs.fields)
  70.         valid_df = raw_df[raw_df['tradeStatus'] == '1']  # 过滤有效交易股票
  71.         print(f"初始数据量:{len(raw_df)},有效交易股票:{len(valid_df)}")
  72.         # 代码格式转换
  73.         valid_df['valid_code'] = valid_df['code'].apply(lambda x: validate_stock_code(x))
  74.         
  75.         # 获取行业数据(批量查询优化)[3,9](@ref)
  76.         industry_rs = bs.query_stock_industry()
  77.         industry_data = []
  78.         fields = industry_rs.fields
  79.         while industry_rs.next():
  80.             row = industry_rs.get_row_data()
  81.             try:
  82.                 industry_data.append({
  83.                     'valid_code': validate_stock_code(row[1]),
  84.                     'industry': row[3],
  85.                     'industry_type': row[4]
  86.                 })
  87.             except Exception as e:
  88.                 print(f"跳过无效行业数据: {row[1]}, 原因: {str(e)}")
  89.         industry_df = pd.DataFrame(industry_data)
  90.         # 合并数据
  91.         merged_df = pd.merge(
  92.             valid_df[['valid_code', 'code_name']].rename(columns={'valid_code':'code', 'code_name':'name'}),
  93.             industry_df,
  94.             left_on='code',
  95.             right_on='valid_code',
  96.             how='left'
  97.         ).drop(columns=['valid_code'])
  98.         # 标记指数成分[7,11](@ref)
  99.         merged_df['issz50'] = merged_df['code'].isin(sz50_set).astype(int)
  100.         merged_df['ishs300'] = merged_df['code'].isin(hs300_set).astype(int)
  101.         merged_df['iszz500'] = merged_df['code'].isin(zz500_set).astype(int)
  102.         # 拆分交易所代码
  103.         merged_df['exchange'] = merged_df['code'].str.split('.').str[0]
  104.         merged_df['code'] = merged_df['code'].str.split('.').str[1]
  105.         
  106.         # 填充缺失值
  107.         merged_df.fillna({
  108.             'industry': '未知',
  109.             'industry_type': '未分类'
  110.         }, inplace=True)
  111.         return merged_df[[
  112.             'code', 'name', 'industry', 'industry_type',
  113.             'issz50', 'ishs300', 'iszz500', 'exchange'
  114.         ]]
  115.     except Exception as e:
  116.         print(f"处理失败:{str(e)}")
  117.         return None
  118.     finally:
  119.         bs.logout()
  120. if __name__ == "__main__":
  121.     df = get_all_stocks_with_industry()
  122.     if df is not None:
  123.         print("\n数据样例:")
  124.         print(df[df['industry'] != '未知'].head(5))
  125.         save_to_sqlite(df, r"../project/data/AAshares/code.db", "all_stocks")
复制代码
0x02 获取日K线数据
  1. import baostock as bs
  2. import pandas as pd
  3. import sqlite3
  4. from tqdm import tqdm
  5. import time
  6. from datetime import datetime
  7. def get_stock_codes(conn):
  8.     """从数据库获取带交易所前缀的股票代码(结构不变)"""
  9.     df = pd.read_sql('SELECT exchange, code FROM all_stocks', conn)
  10.     return [f"{row['exchange'].lower().strip()}.{str(row['code']).zfill(6)}"
  11.             for _, row in df.iterrows()]
  12. def download_daily_data(code, start_date, end_date, max_retries=3):
  13.     """下载日K线数据(仅修改frequency和fields)"""
  14.     for attempt in range(max_retries):
  15.         rs = bs.query_history_k_data_plus(
  16.             code=code,
  17.             fields="date,code,open,high,low,close,volume,amount",
  18.             start_date=start_date,
  19.             end_date=end_date,
  20.             frequency="d",  # 改为日线[1,7](@ref)
  21.             adjustflag="2"   # 保持前复权
  22.         )
  23.         if rs.error_code == '0':
  24.             return rs
  25.         time.sleep(1)
  26.     return rs
  27. def process_daily_data(rs):
  28.     """处理日K线数据(移除时间合并步骤)"""
  29.     data_list = []
  30.     while rs.next():
  31.         data_list.append(rs.get_row_data())
  32.     df = pd.DataFrame(data_list, columns=rs.fields)
  33.    
  34.     # 数据清洗(仅保留日期处理)
  35.     df['code'] = df['code'].str.split('.').str[1]
  36.     numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'amount']
  37.     df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')
  38.     return df.dropna().reset_index(drop=True)
  39. def save_to_db(df, conn):
  40.     # 将DataFrame转换为字典列表
  41.     records = df.to_dict('records')
  42.     for record in records:
  43.         try:
  44.             # 单条插入语句
  45.             conn.execute(
  46.                 """
  47.                 INSERT OR IGNORE INTO stock_day_k
  48.                 (code, date, open, close, high, low, volume, amount)
  49.                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
  50.                 """,
  51.                 (record['code'], record['date'],
  52.                 record['open'], record['close'],
  53.                 record['high'], record['low'],
  54.                 record['volume'], record['amount'])
  55.             )
  56.         except IntegrityError:
  57.             print(f"主键冲突已跳过:{record['code']} - {record['date']}")
  58.             continue
  59.     conn.commit()
  60. def main():
  61.     # 初始化数据库(修改表结构)[5](@ref)
  62.     conn = sqlite3.connect(r"../project/data/AAshares/code.db")
  63.     conn.executescript('''
  64.         CREATE TABLE IF NOT EXISTS stock_day_k (
  65.             date TEXT, code TEXT, open REAL, high REAL,
  66.             low REAL, close REAL, volume INTEGER, amount REAL,
  67.             PRIMARY KEY (date, code)
  68.         );
  69.         CREATE INDEX IF NOT EXISTS idx_day ON stock_day_k(code);
  70.     ''')
  71.     # 登录BaoStock(保持原登录逻辑)
  72.     if bs.login().error_code != '0':
  73.         print("登录失败")
  74.         return
  75.     try:
  76.         codes = get_stock_codes(conn)
  77.         start_date = '2025-01-01'  # 保持时间范围不变
  78.         end_date = '2025-04-01'
  79.         
  80.         for code in tqdm(codes, desc="下载进度"):
  81.             try:
  82.                 rs = download_daily_data(code, start_date, end_date)
  83.                 if rs.error_code != '0': continue
  84.                
  85.                 df = process_daily_data(rs)
  86.                 if not df.empty:
  87.                     save_to_db(df, conn)
  88.                     
  89.                 time.sleep(0.5)  # 保持反爬策略不变[8](@ref)
  90.                
  91.             except Exception as e:
  92.                 print(f"{code} 下载失败: {str(e)}")
  93.                
  94.     finally:
  95.         conn.close()
  96.         bs.logout()
  97. if __name__ == "__main__":
  98.     main()
复制代码
0x03 获取5分钟K线数据
  1. import baostock as bs
  2. import pandas as pd
  3. import sqlite3
  4. from tqdm import tqdm
  5. import time
  6. from datetime import datetime
  7. def get_stock_codes(conn):
  8.     """从数据库获取带交易所前缀的股票代码(网页6最佳实践)"""
  9.     df = pd.read_sql('SELECT exchange, code FROM all_stocks', conn)
  10.     return [f"{row['exchange'].lower().strip()}.{str(row['code']).zfill(6)}"
  11.             for _, row in df.iterrows()]
  12. def download_5min_data(code, start_date, end_date, max_retries=3):
  13.     """下载5分钟K线数据(含错误重试机制,网页8优化)"""
  14.     for attempt in range(max_retries):
  15.         rs = bs.query_history_k_data_plus(
  16.             code=code,
  17.             fields="date,time,code,open,high,low,close,volume,amount",
  18.             start_date=start_date,
  19.             end_date=end_date,
  20.             frequency="5",
  21.             adjustflag="2"  # 前复权
  22.         )
  23.         if rs.error_code == '0':
  24.             return rs
  25.         time.sleep(1)
  26.     return rs
  27. def process_5min_data(rs):
  28.     """处理5分钟数据格式(网页7时间处理优化)"""
  29.     data_list = []
  30.     while rs.next():
  31.         data_list.append(rs.get_row_data())
  32.     df = pd.DataFrame(data_list, columns=rs.fields)
  33.    
  34.     # 合并日期和时间字段
  35.     df['datetime'] = df['time'].apply(
  36.         lambda x: f"{x[:4]}-{x[4:6]}-{x[6:8]} {x[8:10]}:{x[10:12]}:00")
  37.     df = df.drop(columns=['date', 'time'])
  38.     df.rename(columns={'datetime': 'date'}, inplace=True)
  39.     # 数据清洗
  40.     df['code'] = df['code'].str.split('.').str[1]
  41.     numeric_cols = ['open', 'high', 'low', 'close', 'volume','amount']
  42.     df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')
  43.     return df.dropna().reset_index(drop=True)
  44. def save_to_db(df, conn):
  45.     # 将DataFrame转换为字典列表
  46.     records = df.to_dict('records')
  47.     for record in records:
  48.         try:
  49.             # 单条插入语句
  50.             conn.execute(
  51.                 """
  52.                 INSERT OR IGNORE INTO stock_5min_k
  53.                 (code, date, open, close, high, low, volume, amount)
  54.                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
  55.                 """,
  56.                 (record['code'], record['date'],
  57.                 record['open'], record['close'],
  58.                 record['high'], record['low'],
  59.                 record['volume'], record['amount'])
  60.             )
  61.         except IntegrityError:
  62.             print(f"主键冲突已跳过:{record['code']} - {record['date']}")
  63.             continue
  64.     conn.commit()
  65. def main():
  66.     # 初始化数据库(网页6表结构)
  67.     conn = sqlite3.connect(r"../project/data/AAshares/code.db")
  68.     conn.executescript('''
  69.         CREATE TABLE IF NOT EXISTS stock_5min_k (
  70.             date TEXT, code TEXT, open REAL, high REAL,
  71.             low REAL, close REAL, volume INTEGER,amount REAL,
  72.             PRIMARY KEY (date, code)
  73.         );
  74.         CREATE INDEX IF NOT EXISTS idx_5min ON stock_5min_k(code);
  75.     ''')
  76.     # 登录BaoStock(网页7规范)
  77.     if bs.login().error_code != '0':
  78.         print("登录失败")
  79.         return
  80.     try:
  81.         codes = get_stock_codes(conn)
  82.         start_date = '2025-04-19'
  83.         end_date = datetime.now().strftime('%Y-%m-%d')  # 格式为 'YYYY-MM-DD'
  84.         
  85.         for code in tqdm(codes, desc="下载进度"):
  86.             try:
  87.                 rs = download_5min_data(code, start_date, end_date)
  88.                 if rs.error_code != '0': continue
  89.                
  90.                 df = process_5min_data(rs)
  91.                 if not df.empty:
  92.                     save_to_db(df, conn)
  93.                     
  94.                 time.sleep(0.5)  # 反爬策略(网页8建议)
  95.                
  96.             except Exception as e:
  97.                 print(f"{code} 下载失败: {str(e)}")
  98.                
  99.     finally:
  100.         conn.close()
  101.         bs.logout()
  102. if __name__ == "__main__":
  103.     main()
复制代码
0x04 通过web将获取的数据读出来
  1. import csv
  2. from io import StringIO
  3. from flask import Flask, jsonify, request, Response
  4. import sqlite3
  5. from datetime import datetime, timedelta, timezone
  6. import pandas as pd
  7. import requests
  8. app = Flask(__name__)
  9. def get_db_connection():
  10.     conn = sqlite3.connect(r"../project/data/AAshares/code.db")
  11.     conn.row_factory = sqlite3.Row
  12.     return conn
  13. def getdatasqlite(code,table_name):
  14.     """统一处理数据请求"""
  15.     conn = get_db_connection()
  16.     cursor = conn.cursor()
  17.     cursor.execute(f"""
  18.         SELECT
  19.             date AS Date,         
  20.             open AS Open,
  21.             close AS Close,
  22.             high AS High,
  23.             low AS Low,
  24.             volume AS Volume,
  25.             amount AS Amount
  26.         FROM {table_name}
  27.         WHERE code = ?
  28.         ORDER BY date
  29.     """, (code,))
  30.    
  31.     data = cursor.fetchall()
  32.     converted_data = []
  33.     for row in data:
  34.         # 按新的字段顺序解析
  35.         date_str = row[0]  # 直接使用数据库返回的日期字符串[3,8](@ref)
  36.         # 添加时区转换逻辑(假设数据库存储UTC时间)
  37.         if(table_name=="stock_5min_k"):
  38.             local_time = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")  # 添加秒并解析为本地时间
  39.         else:
  40.             local_time = datetime.strptime(date_str, "%Y-%m-%d")  # 添加秒并解析为本地时间      
  41.         date_time_str = local_time.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")  # 转换为UTC时间
  42.         converted = {
  43.             "Date": date_time_str,  # 统一时间格式[5,7](@ref)
  44.             "Open": float(row[1]),
  45.             "Close": float(row[2]),
  46.             "High": float(row[3]),
  47.             "Low": float(row[4]),
  48.             "Volume": float(row[5]),  # 交易量
  49.             "Amount": float(row[6])   # 修正字段索引[2,6](@ref)
  50.         }
  51.         converted_data.append(converted)
  52.     return converted_data
  53. def getdatahttp(code,table_name):
  54.     # 请求第三方接口
  55.     if code.startswith("6"):
  56.         secid = "1." + code
  57.     else:
  58.         secid = "0." + code
  59.     # end_date = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
  60.     end_date = (datetime.now()).strftime('%Y%m%d')
  61.     if(table_name=="stock_5min_k"):
  62.         api_url = f"https://push2his.eastmoney.com/api/qt/stock/kline/get?secid={secid}&fields1=f1%2Cf2%2Cf3%2Cf4%2Cf5%2Cf6&fields2=f51%2Cf52%2Cf53%2Cf54%2Cf55%2Cf56%2Cf57%2Cf58%2Cf59%2Cf60%2Cf61&klt=5&fqt=1&end={end_date}&lmt=1488"  # 替换为实际API地址
  63.     else:
  64.         api_url = f"https://push2his.eastmoney.com/api/qt/stock/kline/get?secid={secid}&fields1=f1%2Cf2%2Cf3%2Cf4%2Cf5%2Cf6&fields2=f51%2Cf52%2Cf53%2Cf54%2Cf55%2Cf56%2Cf57%2Cf58%2Cf59%2Cf60%2Cf61&klt=101&fqt=1&end={end_date}&lmt=1488"  # 替换为实际API地址
  65.     try:
  66.         response = requests.get(api_url, timeout=10)
  67.         response.raise_for_status()
  68.     except requests.exceptions.RequestException as e:
  69.         return {"error": f"API请求失败: {str(e)}"}, 500
  70.     # 解析原始数据
  71.     raw_data = response.json()
  72.     klines = raw_data.get('data', {}).get('klines', [])
  73.    
  74.     # 数据清洗转换
  75.     converted_data = []
  76.     for item in klines:
  77.         parts = item.split(',')
  78.         if len(parts) < 11:
  79.             continue  # 跳过无效数据
  80.             
  81.         # 按字段位置解析数据
  82.         # 按字段位置解析数据
  83.         if(table_name=="stock_5min_k"):
  84.             local_time = datetime.strptime(parts[0] + ":00", "%Y-%m-%d %H:%M:%S")  # 添加秒并解析为本地时间
  85.         else:
  86.             local_time = datetime.strptime(parts[0] , "%Y-%m-%d")  # 添加秒并解析为本地时间
  87.         date_time_str = local_time.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")  # 转换为UTC时间
  88.         converted = {
  89.             "Date": date_time_str,
  90.             "Open": float(parts[1]),
  91.             "Close": float(parts[2]),
  92.             "High": float(parts[3]),
  93.             "Low": float(parts[4]),
  94.             "Volume": float(parts[5]),  # 交易量
  95.             "Amount": float(parts[6]),    # 交易额
  96.         }
  97.         converted_data.append(converted)
  98.     return converted_data
  99. def handle_request(code, table_name, format_type='json'):
  100.     converted_data = getdatasqlite(code,table_name)
  101.     # ...后续格式转换逻辑保持不变...
  102.     # 返回数据
  103.     if not converted_data:
  104.         return {"error": "没有有效数据"}, 404
  105.     # 根据要求格式化输出
  106.     if format_type == 'csv':
  107.         # 生成CSV
  108.         si = StringIO()
  109.         writer = csv.DictWriter(si, fieldnames=converted_data[0].keys())
  110.         writer.writeheader()
  111.         writer.writerows(converted_data)  # 写入多行数据
  112.         return si.getvalue(), 200, {'Content-Type': 'text/csv'}
  113.         
  114.     return jsonify(converted_data[-1]), 200
  115. @app.route('/dayapi', methods=['GET'])
  116. def get_daystock_data():
  117.     code = request.args.get('code')
  118.     format_type = request.args.get('format', default='json')
  119.     if not code:
  120.         return jsonify({"error": "缺少参数: code"}), 400
  121.     return handle_request(code, 'stock_day_k', format_type)
  122. @app.route('/api', methods=['GET'])
  123. def get_stock_data():
  124.     code = request.args.get('code')
  125.     format_type = request.args.get('format', default='json')
  126.     if not code:
  127.         return jsonify({"error": "缺少参数: code"}), 400
  128.     return handle_request(code, 'stock_5min_k', format_type)
  129. if __name__ == '__main__':
  130.     app.run(host='0.0.0.0', port=80, debug=True)
复制代码
访问 http://ip/dayapi?code={Value} 获取最新的一根K线
访问 http://ip/dayapi?code={Value}&format=csv 返回的是汗青的K线csv
如果成功了,则完成了基础数据的搭建,这个web是为了给Lean QuantConnect调用的。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金歌

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表