马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
本文基于 SQLite 轻量级数据库与 QuantConnect Lean 框架,构建了一套支持高频数据回测、策略无缝实盘部署的本地化解决方案。
0x01获取所有股票
- import baostock as bs
- import pandas as pd
- import sqlite3
- from tqdm import tqdm
- import datetime
def save_to_sqlite(df, db_path, table_name):
    """Store a DataFrame in a SQLite table, replacing any existing table.

    Failures are reported on stdout instead of being raised, so the save is
    best-effort from the caller's point of view.

    Args:
        df: DataFrame to persist.
        db_path: Path of the SQLite database file.
        table_name: Destination table; an existing table of that name is replaced.
    """
    # Bound before the try so the cleanup never has to inspect locals().
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        df.to_sql(table_name, conn, if_exists='replace', index=False)
        print(f"数据成功保存至 {db_path} 的 {table_name} 表")
    except Exception as e:
        print(f"数据库操作失败:{str(e)}")
    finally:
        if conn is not None:
            conn.close()
def validate_stock_code(code):
    """Normalize a stock code to ``<exchange>.<6 digits>``.

    Two input shapes are accepted (case-insensitive, spaces ignored):

    * already prefixed with ``sh.``, ``sz.`` or ``bj.`` - the numeric part is
      zero-padded to six digits;
    * bare digits - zero-padded to six digits, then assigned an exchange from
      the leading digit (5/6/7/9 -> sh, 0/1/2/3 -> sz, 8 -> bj).

    Args:
        code: Stock code as a string, or any value convertible with ``str()``.

    Returns:
        The normalized code, e.g. ``"sh.600000"``.

    Raises:
        ValueError: If the numeric part is not made of ASCII digits, is longer
            than six digits, or its leading digit maps to no exchange.
    """
    text = code if isinstance(code, str) else str(code)
    text = text.strip().lower().replace(" ", "")

    if text.startswith(("sh.", "sz.", "bj.")):
        prefix, _, number = text.partition(".")
        # Reject suffixes such as "abc" or "600.000" instead of padding them.
        if number.isascii() and number.isdigit() and len(number) <= 6:
            return f"{prefix}.{number.zfill(6)}"
    elif text.isascii() and text.isdigit() and len(text) <= 6:
        text = text.zfill(6)
        leading = text[0]
        if leading in "5679":
            return f"sh.{text}"
        if leading in "0123":
            return f"sz.{text}"
        if leading == "8":
            return f"bj.{text}"

    raise ValueError(f"无法识别的股票代码格式: {text}")
def get_index_components(index_func):
    """Fetch the constituent codes of an index, retrying up to three times.

    Args:
        index_func: Zero-argument callable returning a result object that
            exposes ``error_code`` and ``data``; the code is read from
            position 1 of each ``data`` item.

    Returns:
        Set of codes normalized by ``validate_stock_code``; an empty set if
        every attempt fails.
    """
    max_retries = 3
    for attempt in range(1, max_retries + 1):
        try:
            rs = index_func()
            if rs.error_code == '0':
                return {validate_stock_code(item[1]) for item in rs.data}
            # A non-zero error code is a failed attempt too; report it rather
            # than retrying silently.
            reason = getattr(rs, 'error_msg', rs.error_code)
            print(f"指数查询第{attempt}次失败: {reason}")
        except Exception as e:
            print(f"指数查询第{attempt}次失败: {str(e)}")
    return set()
def get_all_stocks_with_industry():
    """Build the stock table with industry and index-membership flags.

    Logs in to baostock, loads the SZ50 / HS300 / ZZ500 constituents, queries
    all stocks for yesterday's date, keeps those whose ``tradeStatus`` is
    ``'1'``, left-joins the industry classification and adds 0/1 membership
    columns. The session is logged out in every case.

    Returns:
        DataFrame with columns ``code`` (numeric part only), ``name``,
        ``industry``, ``industry_type``, ``issz50``, ``ishs300``, ``iszz500``
        and ``exchange``; ``None`` if any step fails (the error is printed).
    """
    try:
        if bs.login().error_code != '0':
            raise ConnectionError("Baostock登录失败")

        # Fetch the three index constituent sets once, up front.
        sz50_set = get_index_components(bs.query_sz50_stocks)
        hs300_set = get_index_components(bs.query_hs300_stocks)
        zz500_set = get_index_components(bs.query_zz500_stocks)
        print(f"指数成分股数量:SZ50={len(sz50_set)}, HS300={len(hs300_set)}, ZZ500={len(zz500_set)}")

        # Query the whole market as of yesterday.
        query_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
        rs = bs.query_all_stock(day=query_date)
        if rs.error_code != '0':
            raise ValueError(f"股票查询失败:{rs.error_msg}")

        raw_df = pd.DataFrame(rs.data, columns=rs.fields)
        # .copy() gives an independent frame, so adding a column below does
        # not write into a slice of raw_df.
        valid_df = raw_df[raw_df['tradeStatus'] == '1'].copy()
        print(f"初始数据量:{len(raw_df)},有效交易股票:{len(valid_df)}")
        valid_df['valid_code'] = valid_df['code'].apply(validate_stock_code)

        # Industry classification; rows whose code cannot be normalized are skipped.
        industry_rs = bs.query_stock_industry()
        industry_data = []
        while industry_rs.next():
            row = industry_rs.get_row_data()
            try:
                industry_data.append({
                    'valid_code': validate_stock_code(row[1]),
                    'industry': row[3],
                    'industry_type': row[4]
                })
            except Exception as e:
                print(f"跳过无效行业数据: {row[1]}, 原因: {str(e)}")
        # Explicit columns keep the merge key present even when no industry
        # row was collected.
        industry_df = pd.DataFrame(
            industry_data,
            columns=['valid_code', 'industry', 'industry_type'],
        )

        stocks = valid_df[['valid_code', 'code_name']].rename(
            columns={'valid_code': 'code', 'code_name': 'name'}
        )
        merged_df = pd.merge(
            stocks,
            industry_df,
            left_on='code',
            right_on='valid_code',
            how='left'
        ).drop(columns=['valid_code'])

        # Index membership flags (1 = member, 0 = not a member).
        merged_df['issz50'] = merged_df['code'].isin(sz50_set).astype(int)
        merged_df['ishs300'] = merged_df['code'].isin(hs300_set).astype(int)
        merged_df['iszz500'] = merged_df['code'].isin(zz500_set).astype(int)

        # Split "sh.600000" into exchange "sh" and code "600000".
        parts = merged_df['code'].str.split('.')
        merged_df['exchange'] = parts.str[0]
        merged_df['code'] = parts.str[1]

        # Stocks without an industry row get placeholder labels.
        merged_df = merged_df.fillna({
            'industry': '未知',
            'industry_type': '未分类'
        })
        return merged_df[[
            'code', 'name', 'industry', 'industry_type',
            'issz50', 'ishs300', 'iszz500', 'exchange'
        ]]
    except Exception as e:
        print(f"处理失败:{str(e)}")
        return None
    finally:
        bs.logout()
if __name__ == "__main__":
    # Script entry point: build the stock table, preview a few rows that have
    # a known industry, then persist the full table.
    df = get_all_stocks_with_industry()
    if df is not None:
        print("\n数据样例:")
        has_industry = df['industry'] != '未知'
        print(df[has_industry].head(5))
        save_to_sqlite(
            df,
            r"../project/data/AAshares/code.db",
            "all_stocks",
        )
复制代码 0x02 获取日K线数据
- import baostock as bs
- import pandas as pd
- import sqlite3
- from tqdm import tqdm
- import time
- from datetime import datetime
def get_stock_codes(conn):
    """Return every stock in ``all_stocks`` as an exchange-prefixed code.

    Args:
        conn: Open SQLite connection whose ``all_stocks`` table has
            ``exchange`` and ``code`` columns.

    Returns:
        List of strings such as ``"sh.600000"``: the exchange is lower-cased
        and stripped, the code is zero-padded to six characters.
    """
    df = pd.read_sql('SELECT exchange, code FROM all_stocks', conn)
    # Walk the two columns directly; no per-row Series needs to be built.
    return [
        f"{exchange.lower().strip()}.{str(code).zfill(6)}"
        for exchange, code in zip(df['exchange'], df['code'])
    ]
def download_daily_data(code, start_date, end_date, max_retries=3):
    """Request daily K-line data for one stock, retrying on error.

    Args:
        code: Exchange-prefixed stock code passed to baostock.
        start_date: First date of the range, ``YYYY-MM-DD``.
        end_date: Last date of the range, ``YYYY-MM-DD``.
        max_retries: Maximum number of attempts; at least one attempt is
            always made.

    Returns:
        The result object of the last attempt. Callers must check its
        ``error_code`` (``'0'`` means success).
    """
    rs = None
    for attempt in range(max(1, max_retries)):
        if attempt:
            # Pause only between attempts, never after the final one.
            time.sleep(1)
        rs = bs.query_history_k_data_plus(
            code=code,
            fields="date,code,open,high,low,close,volume,amount",
            start_date=start_date,
            end_date=end_date,
            frequency="d",   # daily bars
            adjustflag="2"   # forward-adjusted prices, per the original note
        )
        if rs.error_code == '0':
            break
    return rs
def process_daily_data(rs):
    """Turn a daily K-line result set into a cleaned DataFrame.

    Args:
        rs: Result object exposing ``next()``, ``get_row_data()`` and
            ``fields`` (column names).

    Returns:
        DataFrame with the result's columns, where ``code`` has its exchange
        prefix removed, the price/volume/amount columns are numeric, and any
        row containing a missing or non-numeric value is dropped. The index
        is renumbered from 0.
    """
    data_list = []
    while rs.next():
        data_list.append(rs.get_row_data())
    df = pd.DataFrame(data_list, columns=rs.fields)

    # Keep the part after the last dot; a code that has no exchange prefix is
    # kept unchanged instead of becoming NaN and being dropped below.
    df['code'] = df['code'].str.split('.').str[-1]
    numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'amount']
    df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')
    return df.dropna().reset_index(drop=True)
def save_to_db(df, conn):
    """Insert daily bars into ``stock_day_k`` and commit.

    Rows whose primary key already exists are skipped by the database itself
    (``INSERT OR IGNORE``), so re-running a download does not fail or
    duplicate data. Any other database error propagates to the caller.

    Args:
        df: DataFrame with ``code``, ``date``, ``open``, ``close``, ``high``,
            ``low``, ``volume`` and ``amount`` columns.
        conn: Open SQLite connection containing the ``stock_day_k`` table.
    """
    columns = ('code', 'date', 'open', 'close', 'high', 'low', 'volume', 'amount')
    rows = [
        tuple(record[column] for column in columns)
        for record in df.to_dict('records')
    ]
    # One batched statement instead of one execute() call per row.
    conn.executemany(
        """
        INSERT OR IGNORE INTO stock_day_k
        (code, date, open, close, high, low, volume, amount)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        rows,
    )
    conn.commit()
def main():
    """Download daily K-lines for every stored stock into ``stock_day_k``.

    Creates the table and its index if needed, logs in to baostock, then
    downloads, cleans and stores each stock's data for the fixed date range.
    A failure on one stock is printed and the loop continues. The database
    connection is closed on every exit path, including a failed login.
    """
    conn = sqlite3.connect(r"../project/data/AAshares/code.db")
    try:
        conn.executescript('''
            CREATE TABLE IF NOT EXISTS stock_day_k (
                date TEXT, code TEXT, open REAL, high REAL,
                low REAL, close REAL, volume INTEGER, amount REAL,
                PRIMARY KEY (date, code)
            );
            CREATE INDEX IF NOT EXISTS idx_day ON stock_day_k(code);
        ''')

        if bs.login().error_code != '0':
            print("登录失败")
            return

        try:
            codes = get_stock_codes(conn)
            start_date = '2025-01-01'
            end_date = '2025-04-01'

            for code in tqdm(codes, desc="下载进度"):
                try:
                    rs = download_daily_data(code, start_date, end_date)
                    if rs.error_code != '0':
                        continue

                    df = process_daily_data(rs)
                    if not df.empty:
                        save_to_db(df, conn)

                    # Throttle requests between stocks.
                    time.sleep(0.5)
                except Exception as e:
                    print(f"{code} 下载失败: {str(e)}")
        finally:
            bs.logout()
    finally:
        conn.close()
# Run the daily K-line download only when executed as a script.
if __name__ == "__main__":
    main()
复制代码 0x03 获取5分钟K线数据
- import baostock as bs
- import pandas as pd
- import sqlite3
- from tqdm import tqdm
- import time
- from datetime import datetime
def get_stock_codes(conn):
    """Return every stock in ``all_stocks`` as an exchange-prefixed code.

    Args:
        conn: Open SQLite connection whose ``all_stocks`` table has
            ``exchange`` and ``code`` columns.

    Returns:
        List of strings such as ``"sz.000001"``: the exchange is lower-cased
        and stripped, the code is zero-padded to six characters.
    """
    df = pd.read_sql('SELECT exchange, code FROM all_stocks', conn)
    # Pair the two columns directly rather than materialising a Series per row.
    return [
        f"{exchange.lower().strip()}.{str(code).zfill(6)}"
        for exchange, code in zip(df['exchange'], df['code'])
    ]
def download_5min_data(code, start_date, end_date, max_retries=3):
    """Request 5-minute K-line data for one stock, retrying on error.

    Args:
        code: Exchange-prefixed stock code passed to baostock.
        start_date: First date of the range, ``YYYY-MM-DD``.
        end_date: Last date of the range, ``YYYY-MM-DD``.
        max_retries: Maximum number of attempts; at least one attempt is
            always made.

    Returns:
        The result object of the last attempt. Callers must check its
        ``error_code`` (``'0'`` means success).
    """
    rs = None
    for attempt in range(max(1, max_retries)):
        if attempt:
            # Pause only between attempts, never after the final one.
            time.sleep(1)
        rs = bs.query_history_k_data_plus(
            code=code,
            fields="date,time,code,open,high,low,close,volume,amount",
            start_date=start_date,
            end_date=end_date,
            frequency="5",   # 5-minute bars
            adjustflag="2"   # forward-adjusted prices, per the original note
        )
        if rs.error_code == '0':
            break
    return rs
def process_5min_data(rs):
    """Turn a 5-minute K-line result set into a cleaned DataFrame.

    The ``time`` field is read as a compact timestamp whose first twelve
    characters are ``YYYYMMDDHHMM``; it is reformatted to
    ``YYYY-MM-DD HH:MM:00`` and replaces the original ``date`` column.

    Args:
        rs: Result object exposing ``next()``, ``get_row_data()`` and
            ``fields`` (column names, including ``date`` and ``time``).

    Returns:
        DataFrame without the ``time`` column, with ``date`` holding the
        formatted timestamp (as the last column), ``code`` stripped of its
        exchange prefix, numeric price/volume/amount columns, rows with
        missing or non-numeric values dropped, and the index renumbered from 0.
    """
    data_list = []
    while rs.next():
        data_list.append(rs.get_row_data())
    df = pd.DataFrame(data_list, columns=rs.fields)

    def _format_stamp(raw):
        # Slice year / month / day / hour / minute out of the compact string.
        return f"{raw[:4]}-{raw[4:6]}-{raw[6:8]} {raw[8:10]}:{raw[10:12]}:00"

    df['datetime'] = df['time'].apply(_format_stamp)
    df = df.drop(columns=['date', 'time']).rename(columns={'datetime': 'date'})

    # Keep the part after the last dot; a code that has no exchange prefix is
    # kept unchanged instead of becoming NaN and being dropped below.
    df['code'] = df['code'].str.split('.').str[-1]
    numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'amount']
    df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')
    return df.dropna().reset_index(drop=True)
def save_to_db(df, conn):
    """Insert 5-minute bars into ``stock_5min_k`` and commit.

    Rows whose primary key already exists are skipped by the database itself
    (``INSERT OR IGNORE``), so re-running a download does not fail or
    duplicate data. Any other database error propagates to the caller.

    Args:
        df: DataFrame with ``code``, ``date``, ``open``, ``close``, ``high``,
            ``low``, ``volume`` and ``amount`` columns.
        conn: Open SQLite connection containing the ``stock_5min_k`` table.
    """
    columns = ('code', 'date', 'open', 'close', 'high', 'low', 'volume', 'amount')
    rows = [
        tuple(record[column] for column in columns)
        for record in df.to_dict('records')
    ]
    # One batched statement instead of one execute() call per row.
    conn.executemany(
        """
        INSERT OR IGNORE INTO stock_5min_k
        (code, date, open, close, high, low, volume, amount)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        rows,
    )
    conn.commit()
def main():
    """Download 5-minute K-lines for every stored stock into ``stock_5min_k``.

    Creates the table and its index if needed, logs in to baostock, then
    downloads, cleans and stores each stock's data from the fixed start date
    up to today. A failure on one stock is printed and the loop continues.
    The database connection is closed on every exit path, including a failed
    login.
    """
    conn = sqlite3.connect(r"../project/data/AAshares/code.db")
    try:
        conn.executescript('''
            CREATE TABLE IF NOT EXISTS stock_5min_k (
                date TEXT, code TEXT, open REAL, high REAL,
                low REAL, close REAL, volume INTEGER,amount REAL,
                PRIMARY KEY (date, code)
            );
            CREATE INDEX IF NOT EXISTS idx_5min ON stock_5min_k(code);
        ''')

        if bs.login().error_code != '0':
            print("登录失败")
            return

        try:
            codes = get_stock_codes(conn)
            start_date = '2025-04-19'
            end_date = datetime.now().strftime('%Y-%m-%d')  # 'YYYY-MM-DD'

            for code in tqdm(codes, desc="下载进度"):
                try:
                    rs = download_5min_data(code, start_date, end_date)
                    if rs.error_code != '0':
                        continue

                    df = process_5min_data(rs)
                    if not df.empty:
                        save_to_db(df, conn)

                    # Throttle requests between stocks.
                    time.sleep(0.5)
                except Exception as e:
                    print(f"{code} 下载失败: {str(e)}")
        finally:
            bs.logout()
    finally:
        conn.close()
# Run the 5-minute K-line download only when executed as a script.
if __name__ == "__main__":
    main()
复制代码 0x04 通过web将获取的数据读出来
- import csv
- from io import StringIO
- from flask import Flask, jsonify, request, Response
- import sqlite3
- from datetime import datetime, timedelta, timezone
- import pandas as pd
- import requests
# Flask application object that the route decorators in this script register on.
app = Flask(__name__)
def get_db_connection():
    """Open the K-line SQLite database with ``sqlite3.Row`` as row factory.

    Returns:
        A new connection; the caller is responsible for closing it.
    """
    connection = sqlite3.connect(r"../project/data/AAshares/code.db")
    connection.row_factory = sqlite3.Row
    return connection
def getdatasqlite(code,table_name):
    """Load all bars of one stock from a K-line table, oldest first.

    Args:
        code: Stock code as stored in the table's ``code`` column.
        table_name: Table to read. It is interpolated into the SQL text, so
            it must be a trusted, hard-coded name. ``"stock_5min_k"`` selects
            the ``YYYY-MM-DD HH:MM:SS`` date format; any other name is parsed
            as ``YYYY-MM-DD``.

    Returns:
        List of dicts with keys ``Date``, ``Open``, ``Close``, ``High``,
        ``Low``, ``Volume`` and ``Amount``; numeric values are floats and
        ``Date`` is a ``YYYY-MM-DD HH:MM:SS`` string in UTC.
    """
    date_format = "%Y-%m-%d %H:%M:%S" if table_name == "stock_5min_k" else "%Y-%m-%d"

    conn = get_db_connection()
    try:
        rows = conn.execute(f"""
            SELECT
                date AS Date,
                open AS Open,
                close AS Close,
                high AS High,
                low AS Low,
                volume AS Volume,
                amount AS Amount
            FROM {table_name}
            WHERE code = ?
            ORDER BY date
        """, (code,)).fetchall()
    finally:
        # Release the connection on every path; it was previously left open.
        conn.close()

    converted_data = []
    for row in rows:
        # The stored string has no timezone, so astimezone() interprets it in
        # the system's local timezone before converting to UTC.
        local_time = datetime.strptime(row[0], date_format)
        date_time_str = local_time.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        converted_data.append({
            "Date": date_time_str,
            "Open": float(row[1]),
            "Close": float(row[2]),
            "High": float(row[3]),
            "Low": float(row[4]),
            "Volume": float(row[5]),
            "Amount": float(row[6])
        })
    return converted_data
def getdatahttp(code,table_name):
    """Fetch bars for one stock from the Eastmoney K-line HTTP endpoint.

    Args:
        code: Bare stock code. Codes starting with ``"6"`` are sent with
            market id ``1``, all others with ``0``.
        table_name: ``"stock_5min_k"`` requests 5-minute bars (``klt=5``);
            any other value requests daily bars (``klt=101``).

    Returns:
        On success, a list of dicts with keys ``Date`` (UTC,
        ``YYYY-MM-DD HH:MM:SS``), ``Open``, ``Close``, ``High``, ``Low``,
        ``Volume`` and ``Amount``. The list is empty when the response holds
        no K-lines. On a request failure, the tuple
        ``({"error": ...}, 500)`` is returned instead.
    """
    is_5min = table_name == "stock_5min_k"
    secid = ("1." if code.startswith("6") else "0.") + code
    end_date = datetime.now().strftime('%Y%m%d')
    klt = 5 if is_5min else 101
    # One URL template; only the bar period (klt) differs between modes.
    api_url = f"https://push2his.eastmoney.com/api/qt/stock/kline/get?secid={secid}&fields1=f1%2Cf2%2Cf3%2Cf4%2Cf5%2Cf6&fields2=f51%2Cf52%2Cf53%2Cf54%2Cf55%2Cf56%2Cf57%2Cf58%2Cf59%2Cf60%2Cf61&klt={klt}&fqt=1&end={end_date}&lmt=1488"
    try:
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()
        raw_data = response.json()
    except requests.exceptions.RequestException as e:
        return {"error": f"API请求失败: {str(e)}"}, 500

    # "data" may be present but null; treat that like a missing key.
    klines = (raw_data.get('data') or {}).get('klines') or []

    converted_data = []
    for item in klines:
        parts = item.split(',')
        if len(parts) < 11:
            continue  # skip malformed rows

        # The timestamp has no timezone, so astimezone() interprets it in the
        # system's local timezone before converting to UTC.
        if is_5min:
            local_time = datetime.strptime(parts[0] + ":00", "%Y-%m-%d %H:%M:%S")
        else:
            local_time = datetime.strptime(parts[0], "%Y-%m-%d")
        date_time_str = local_time.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        converted_data.append({
            "Date": date_time_str,
            "Open": float(parts[1]),
            "Close": float(parts[2]),
            "High": float(parts[3]),
            "Low": float(parts[4]),
            "Volume": float(parts[5]),
            "Amount": float(parts[6]),
        })
    return converted_data
def handle_request(code, table_name, format_type='json'):
    """Build the HTTP response for one stock from the local database.

    Args:
        code: Stock code to look up.
        table_name: K-line table to read from.
        format_type: ``'csv'`` returns every bar as CSV text; any other value
            returns only the last bar as JSON.

    Returns:
        A Flask-style response tuple: 404 with an error dict when no rows
        exist, otherwise 200 with the CSV text or JSON body.
    """
    bars = getdatasqlite(code, table_name)
    if not bars:
        return {"error": "没有有效数据"}, 404

    if format_type != 'csv':
        # JSON mode exposes just the most recent bar.
        return jsonify(bars[-1]), 200

    buffer = StringIO()
    csv_writer = csv.DictWriter(buffer, fieldnames=bars[0].keys())
    csv_writer.writeheader()
    csv_writer.writerows(bars)
    return buffer.getvalue(), 200, {'Content-Type': 'text/csv'}
@app.route('/dayapi', methods=['GET'])
def get_daystock_data():
    """Serve daily bars: ``/dayapi?code=<code>[&format=csv]``."""
    params = request.args
    code = params.get('code')
    format_type = params.get('format', default='json')
    if code:
        return handle_request(code, 'stock_day_k', format_type)
    # The code parameter is mandatory.
    return jsonify({"error": "缺少参数: code"}), 400
@app.route('/api', methods=['GET'])
def get_stock_data():
    """Serve 5-minute bars: ``/api?code=<code>[&format=csv]``."""
    params = request.args
    code = params.get('code')
    format_type = params.get('format', default='json')
    if code:
        return handle_request(code, 'stock_5min_k', format_type)
    # The code parameter is mandatory.
    return jsonify({"error": "缺少参数: code"}), 400
if __name__ == '__main__':
    # The server listens on every interface, so the interactive debugger must
    # stay off: with debug=True anyone who can reach the port could execute
    # code through the debugger console.
    app.run(host='0.0.0.0', port=80, debug=False)
复制代码 访问 http://ip/dayapi?code={Value} 获取最新的一根K线
访问 http://ip/dayapi?code={Value}&format=csv 返回的是历史K线的 CSV
如果成功了,则完成了基础数据的搭建,这个web是为了给Lean QuantConnect调用的。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|