Python对接StockTV环球金融数据API的封装实现及使用教程:
- import requests
- import websockets
- import asyncio
- from typing import Dict, List, Optional, Union
- from datetime import datetime
- class StockTVClient:
- """
- StockTV全球金融数据API客户端
- 支持股票、外汇、期货、加密货币等市场数据
- """
-
- BASE_URL = "https://api.stocktv.top"
- WS_URL = "wss://ws-api.stocktv.top/connect"
-
- def __init__(self, api_key: str):
- """
- 初始化客户端
- :param api_key: API密钥,需通过官方渠道获取
- """
- self.api_key = api_key
- self.session = requests.Session()
- self.session.headers.update({"User-Agent": "StockTV-PythonClient/1.0"})
- def _handle_response(self, response: requests.Response) -> Union[Dict, List]:
- """统一处理API响应"""
- if response.status_code != 200:
- raise Exception(f"API请求失败,状态码:{response.status_code},响应:{response.text}")
- return response.json()
- # ------------------ 股票市场接口 ------------------
- def get_stock_markets(
- self,
- country_id: int,
- page: int = 1,
- page_size: int = 10
- ) -> Dict:
- """
- 获取股票市场列表
- :param country_id: 国家ID(例如14代表印度)
- :param page: 页码
- :param page_size: 每页数量
- """
- endpoint = "/stock/stocks"
- params = {
- "countryId": country_id,
- "page": page,
- "pageSize": page_size,
- "key": self.api_key
- }
- response = self.session.get(f"{self.BASE_URL}{endpoint}", params=params)
- return self._handle_response(response)
- def get_stock_kline(
- self,
- pid: int,
- interval: str = "PT15M",
- start_time: Optional[int] = None,
- end_time: Optional[int] = None
- ) -> List[Dict]:
- """
- 获取股票K线数据
- :param pid: 产品ID
- :param interval: 时间间隔(PT5M, PT15M, PT1H等)
- :param start_time: 开始时间戳(可选)
- :param end_time: 结束时间戳(可选)
- """
- endpoint = "/stock/kline"
- params = {
- "pid": pid,
- "interval": interval,
- "key": self.api_key
- }
- if start_time:
- params["startTime"] = start_time
- if end_time:
- params["endTime"] = end_time
-
- response = self.session.get(f"{self.BASE_URL}{endpoint}", params=params)
- return self._handle_response(response)
- # ------------------ 外汇接口 ------------------
- def get_forex_rates(self, base_currency: str = "USD") -> Dict:
- """获取实时外汇汇率"""
- endpoint = "/market/currencyList"
- params = {"key": self.api_key}
- response = self.session.get(f"{self.BASE_URL}{endpoint}", params=params)
- data = self._handle_response(response)
- return data.get("conversions", {}).get(base_currency, {})
- # ------------------ WebSocket实时数据 ------------------
- async def websocket_client(self, callback):
- """
- WebSocket实时数据客户端
- :param callback: 数据处理回调函数
- """
- url = f"{self.WS_URL}?key={self.api_key}"
- async with websockets.connect(url) as ws:
- while True:
- try:
- data = await ws.recv()
- await callback(json.loads(data))
- # 发送心跳保持连接
- await asyncio.sleep(30)
- await ws.send("ping")
- except Exception as e:
- print(f"WebSocket错误: {str(e)}")
- break
- # ================== 使用示例 ==================
- if __name__ == "__main__":
- API_KEY = "YOUR_API_KEY" # 替换为实际API密钥
-
- # 初始化客户端
- client = StockTVClient(API_KEY)
-
- # 示例1:获取印度股票市场列表
- india_stocks = client.get_stock_markets(country_id=14)
- print(f"印度股票市场数据:{india_stocks['data']['records'][0]}")
-
- # 示例2:获取股票K线数据
- kline_data = client.get_stock_kline(pid=7310, interval="PT1H")
- print(f"最新K线数据:{kline_data['data'][-1]}")
-
- # 示例3:WebSocket实时数据
- async def handle_realtime_data(data):
- """处理实时数据回调函数"""
- print(f"实时更新:{data}")
-
- async def main():
- await client.websocket_client(handle_realtime_data)
-
- # 运行WebSocket客户端
- asyncio.get_event_loop().run_until_complete(main())
复制代码 关键功能分析:
- 使用面向对象封装,方便扩展和维护
- 分离差别市场接口(股票、外汇等)
- 同一响应处理机制
- 参数和返回值均使用Python类型提示
- 进步代码可读性和IDE支持
- 同一HTTP响应处理
- WebSocket自动重连机制
- 非常捕获和提示
- 支持同步HTTP请求和异步WebSocket
- 机动的时间参数处理(支持时间戳)
- 可配置的分页参数
- 使用requests.Session保持毗连池
- 自定义User-Agent标识
- 美满的文档字符串
- 符合PEP8编码规范
扩展发起:
- from functools import lru_cache
- class StockTVClient:
- @lru_cache(maxsize=128)
- def get_stock_info(self, pid: int):
- """带缓存的股票信息查询"""
- # 实现代码...
复制代码- import aiohttp
- async def async_get_stock_markets(self, country_id: int):
- async with aiohttp.ClientSession() as session:
- async with session.get(url, params=params) as response:
- return await response.json()
复制代码- def convert_kline_to_dataframe(kline_data: List) -> pd.DataFrame:
- """将K线数据转换为Pandas DataFrame"""
- return pd.DataFrame(
- kline_data,
- columns=["timestamp", "open", "high", "low", "close", "volume", "vo"]
- ).set_index('timestamp')
复制代码 使用注意事项:
- 申请并妥善保管API密钥
- 遵守API调用频率限制
- 处理时区转换(全部时间戳为UTC)
- 使用try/except块捕获潜在非常
- 生产环境发起添加重试机制
完备项目应包罗:
- 单元测试
- 日志记录
- 配置文件管理
- 更美满的类型定义
- API文档生成(使用Sphinx等)
该实现为开发者提供了可靠的数据接入根本,可根据详细需求进一步扩展功能模块。发起共同官方文档使用,及时关注API更新。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |