期货数据接口使用注意事项
1. 准备工作
接口类型:实时行情接口
查询品种:贵金属,商品期货,外汇,A股,港股,美股
接口类型:HTTP, WebSocket
申请密钥:https://infoway.io
官方对接文档:https://infoway.readme.io/reference/ws-subscription
2. WebSocket订阅贵金属行情
import json
import time
import schedule
import threading
import websocket
from loguru import logger
class WebsocketExample:
def __init__(self):
self.session = None
# 申请免费API Key: https://infoway.io
self.ws_url = "wss://data.infoway.io/ws?business=common&apikey=YourAPIKey"
self.reconnecting = False
self.last_ping_time = 0
self.max_ping_interval = 30 # 最大心跳包间隔时间
self.retry_attempts = 0 # 重试次数
self.max_retry_attempts = 5 # 最大重试次数
def connect_all(self):
"""建立WebSocket连接并启动自动重连机制"""
try:
self.connect(self.ws_url)
self.start_reconnection(self.ws_url)
except Exception as e:
logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")
def start_reconnection(self, url):
"""启动定时重连检查"""
def check_connection():
if not self.is_connected():
logger.debug("Reconnection attempt...")
self.retry_attempts += 1
if self.retry_attempts <= self.max_retry_attempts:
self.connect(url)
else:
logger.error("Exceeded max retry attempts.")
# 使用线程定期检查连接状态
threading.Thread(target=lambda: schedule.every(10).seconds.do(check_connection), daemon=True).start()
def is_connected(self):
"""检查WebSocket连接状态"""
return self.session and self.session.connected
def connect(self, url):
"""建立WebSocket连接"""
try:
if self.is_connected():
self.session.close()
self.session = websocket.WebSocketApp(
url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# 启动WebSocket连接(非阻塞模式)
threading.Thread(target=self.session.run_forever, daemon=True).start()
except Exception as e:
logger.error(f"Failed to connect to the server: {str(e)}")
def on_open(self, ws):
"""WebSocket连接建立成功后的回调"""
logger.info(f"Connection opened")
try:
# 发送贵金属实时成交明细订阅请求
trade_send_obj = {
"code": 10000,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": {"codes": "XAUUSD"} # XAUUSD 为贵金属黄金(黄金/美元)
}
self.send_message(trade_send_obj)
# 不同请求之间间隔一段时间
time.sleep(5)
# 发送贵金属实时盘口数据订阅请求
depth_send_obj = {
"code": 10003,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": {"codes": "XAUUSD"} # XAUUSD为黄金的实时盘口数据
}
self.send_message(depth_send_obj)
# 不同请求之间间隔一段时间
time.sleep(5)
# 发送贵金属实时K线数据订阅请求
kline_data = {
"arr": [
{
"type": 1,
"codes": "XAUUSD" # XAUUSD 为黄金K线数据
}
]
}
kline_send_obj = {
"code": 10006,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": kline_data
}
self.send_message(kline_send_obj)
# 启动定时心跳任务
threading.Thread(target=lambda: schedule.every(30).seconds.do(self.ping), daemon=True).start()
except Exception as e:
logger.error(f"Error sending initial messages: {str(e)}")
def on_message(self, ws, message):
"""接收消息的回调"""
try:
logger.info(f"Message received: {message}")
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
def on_close(self, ws, close_status_code, close_msg):
"""连接关闭的回调"""
logger.info(f"Connection closed: {close_status_code} - {close_msg}")
def on_error(self, ws, error):
"""错误处理的回调"""
logger.error(f"WebSocket error: {str(error)}")
def send_message(self, message_obj):
"""发送消息到WebSocket服务器"""
if self.is_connected():
try:
self.session.send(json.dumps(message_obj))
except Exception as e:
logger.error(f"Error sending message: {str(e)}")
else:
logger.warning("Cannot send message: Not connected")
def ping(self):
"""发送心跳包"""
current_time = time.time()
if current_time - self.last_ping_time >= self.max_ping_interval:
ping_obj = {
"code": 10010,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
}
self.send_message(ping_obj)
self.last_ping_time = current_time
else:
logger.debug(f"Ping skipped: Time interval between pings is less than {self.max_ping_interval} seconds.")
# 使用示例
if __name__ == "__main__":
ws_client = WebsocketExample()
ws_client.connect_all()
# 保持主线程运行
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("Exiting...")
if ws_client.is_connected():
ws_client.session.close()
3. 使用期货数据接口的注意事项
3.1 处理连接和心跳
- 保持连接:WebSocket是持久连接,所以需要确保连接保持活跃。如果连接断开,需要实现自动重连机制。
- 心跳包机制:大多数WebSocket服务会提供心跳包(ping/pong)机制来检测连接是否活跃。客户端需要定期发送心跳包,以防连接超时或断开。
- 通常,期货数据WebSocket接口会要求客户端每隔一段时间(例如30秒或60秒)发送一个ping请求,如果没有收到响应,则连接可能会被断开。
- 需要根据API文档配置合适的心跳时间间隔。
3.2 确保消息顺序
- 消息顺序问题:WebSocket是基于流的协议,因此在某些情况下,消息可能会乱序到达。对于期货数据,特别是实时行情数据,乱序可能导致错误的数据解释。
- 消息的去重与同步:在客户端处理实时数据时,需要设计去重机制,并确保历史数据和实时数据同步。例如,处理成交量、价格等信息时,要确保这些信息是按时间顺序正确处理。
3.3 数据订阅
- 选择合适的数据订阅类型:期货数据接口通常提供多种数据类型(例如实时成交、盘口数据、K线数据等),需要根据实际需求订阅相关的数据流。
- 批量订阅:为了提高效率,可以批量订阅多个品种的数据,而不是单独为每个品种订阅。但要注意API限制,避免超出最大订阅数限制。
- 取消订阅:如果不再需要某个数据流,及时取消订阅以减少网络负担和资源占用。
3.4 处理消息和数据格式
- JSON格式:大多数期货WebSocket数据接口使用JSON格式传输数据。确保客户端能够正确解析JSON,并能处理可能出现的异常或错误格式的数据。
- 数据解码与验证:收到的数据需要进行校验(例如验证是否是预期格式、是否包含错误代码、是否包含丢失的字段等)。
3.5 容错和错误处理
- 错误回调机制:确保客户端实现了适当的错误回调机制,能够在连接失败、消息处理失败等情况下作出响应。
- 错误日志记录:记录错误日志非常重要,这有助于后续问题排查和性能分析。
- 异常处理:处理可能出现的异常,如网络中断、数据解析失败、连接超时等。
3.6 连接重试和恢复
- 自动重连机制:当WebSocket连接丢失或中断时,需要实现自动重连机制,并在成功连接后重新订阅所需的数据流。
- 重试次数限制:为了避免无限重试,设置最大重试次数。例如,连接失败超过5次后,停止重试并报告错误。
- 恢复状态:在重新连接后,确保客户端能够恢复到断开时的状态,并重新订阅所需的数据。
3.7 性能优化
- 高并发处理:期货市场的数据量通常较大,尤其是高频交易的数据。需要确保客户端能够高效地处理大量的实时数据。
- 数据过滤与处理:客户端可以根据业务需求过滤不必要的数据,以减少带宽占用和计算负担。
- 内存管理:实时数据流可能会占用大量内存,特别是在长时间运行时。需要确保有合适的内存管理措施,如清理过期数据、设置缓存大小限制等。
本作品采用《CC 协议》,转载必须注明作者和本文链接