IT评测·应用市场-qidao123.com
标题:
对接马来西亚、印度、韩国、越南等环球金融数据示例
[打印本页]
作者:
伤心客
时间:
2025-3-22 14:49
标题:
对接马来西亚、印度、韩国、越南等环球金融数据示例
Python对接StockTV环球金融数据API的封装实现及使用教程:
import requests
import websockets
import asyncio
from typing import Dict, List, Optional, Union
from datetime import datetime
class StockTVClient:
"""
StockTV全球金融数据API客户端
支持股票、外汇、期货、加密货币等市场数据
"""
BASE_URL = "https://api.stocktv.top"
WS_URL = "wss://ws-api.stocktv.top/connect"
def __init__(self, api_key: str):
"""
初始化客户端
:param api_key: API密钥,需通过官方渠道获取
"""
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({"User-Agent": "StockTV-PythonClient/1.0"})
def _handle_response(self, response: requests.Response) -> Union[Dict, List]:
"""统一处理API响应"""
if response.status_code != 200:
raise Exception(f"API请求失败,状态码:{response.status_code},响应:{response.text}")
return response.json()
# ------------------ 股票市场接口 ------------------
def get_stock_markets(
self,
country_id: int,
page: int = 1,
page_size: int = 10
) -> Dict:
"""
获取股票市场列表
:param country_id: 国家ID(例如14代表印度)
:param page: 页码
:param page_size: 每页数量
"""
endpoint = "/stock/stocks"
params = {
"countryId": country_id,
"page": page,
"pageSize": page_size,
"key": self.api_key
}
response = self.session.get(f"{self.BASE_URL}{endpoint}", params=params)
return self._handle_response(response)
def get_stock_kline(
self,
pid: int,
interval: str = "PT15M",
start_time: Optional[int] = None,
end_time: Optional[int] = None
) -> List[Dict]:
"""
获取股票K线数据
:param pid: 产品ID
:param interval: 时间间隔(PT5M, PT15M, PT1H等)
:param start_time: 开始时间戳(可选)
:param end_time: 结束时间戳(可选)
"""
endpoint = "/stock/kline"
params = {
"pid": pid,
"interval": interval,
"key": self.api_key
}
if start_time:
params["startTime"] = start_time
if end_time:
params["endTime"] = end_time
response = self.session.get(f"{self.BASE_URL}{endpoint}", params=params)
return self._handle_response(response)
# ------------------ 外汇接口 ------------------
def get_forex_rates(self, base_currency: str = "USD") -> Dict:
"""获取实时外汇汇率"""
endpoint = "/market/currencyList"
params = {"key": self.api_key}
response = self.session.get(f"{self.BASE_URL}{endpoint}", params=params)
data = self._handle_response(response)
return data.get("conversions", {}).get(base_currency, {})
# ------------------ WebSocket实时数据 ------------------
async def websocket_client(self, callback):
"""
WebSocket实时数据客户端
:param callback: 数据处理回调函数
"""
url = f"{self.WS_URL}?key={self.api_key}"
async with websockets.connect(url) as ws:
while True:
try:
data = await ws.recv()
await callback(json.loads(data))
# 发送心跳保持连接
await asyncio.sleep(30)
await ws.send("ping")
except Exception as e:
print(f"WebSocket错误: {str(e)}")
break
# ================== 使用示例 ==================
if __name__ == "__main__":
API_KEY = "YOUR_API_KEY" # 替换为实际API密钥
# 初始化客户端
client = StockTVClient(API_KEY)
# 示例1:获取印度股票市场列表
india_stocks = client.get_stock_markets(country_id=14)
print(f"印度股票市场数据:{india_stocks['data']['records'][0]}")
# 示例2:获取股票K线数据
kline_data = client.get_stock_kline(pid=7310, interval="PT1H")
print(f"最新K线数据:{kline_data['data'][-1]}")
# 示例3:WebSocket实时数据
async def handle_realtime_data(data):
"""处理实时数据回调函数"""
print(f"实时更新:{data}")
async def main():
await client.websocket_client(handle_realtime_data)
# 运行WebSocket客户端
asyncio.get_event_loop().run_until_complete(main())
复制代码
关键功能分析:
模块化计划
:
使用面向对象封装,方便扩展和维护
分离差别市场接口(股票、外汇等)
同一响应处理机制
类型提示
:
参数和返回值均使用Python类型提示
进步代码可读性和IDE支持
错误处理
:
同一HTTP响应处理
WebSocket自动重连机制
非常捕获和提示
高级功能
:
支持同步HTTP请求和异步WebSocket
机动的时间参数处理(支持时间戳)
可配置的分页参数
最佳实践
:
使用requests.Session保持毗连池
自定义User-Agent标识
美满的文档字符串
符合PEP8编码规范
扩展发起:
缓存机制
:
from functools import lru_cache
class StockTVClient:
@lru_cache(maxsize=128)
def get_stock_info(self, pid: int):
"""带缓存的股票信息查询"""
# 实现代码...
复制代码
异步HTTP请求
:
import aiohttp
async def async_get_stock_markets(self, country_id: int):
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
return await response.json()
复制代码
数据转换工具
:
def convert_kline_to_dataframe(kline_data: List) -> pd.DataFrame:
"""将K线数据转换为Pandas DataFrame"""
return pd.DataFrame(
kline_data,
columns=["timestamp", "open", "high", "low", "close", "volume", "vo"]
).set_index('timestamp')
复制代码
使用注意事项:
申请并妥善保管API密钥
遵守API调用频率限制
处理时区转换(全部时间戳为UTC)
使用try/except块捕获潜在非常
生产环境发起添加重试机制
完备项目应包罗:
单元测试
日志记录
配置文件管理
更美满的类型定义
API文档生成(使用Sphinx等)
该实现为开发者提供了可靠的数据接入根本,可根据详细需求进一步扩展功能模块。发起共同官方文档使用,及时关注API更新。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/)
Powered by Discuz! X3.4