期货数据接口使用注意事项

1. 准备工作

接口类型:实时行情接口
查询品种:贵金属,商品期货,外汇,A股,港股,美股
接口类型:HTTP, WebSocket
申请密钥:https://infoway.io
官方对接文档:https://infoway.readme.io/reference/ws-subscription

2. WebSocket订阅贵金属行情

import json
import time
import schedule
import threading
import websocket
from loguru import logger

class WebsocketExample:
    def __init__(self):
        self.session = None
        # 申请免费API Key: https://infoway.io
        self.ws_url = "wss://data.infoway.io/ws?business=common&apikey=YourAPIKey" 
        self.reconnecting = False
        self.last_ping_time = 0
        self.max_ping_interval = 30  # 最大心跳包间隔时间
        self.retry_attempts = 0  # 重试次数
        self.max_retry_attempts = 5  # 最大重试次数

    def connect_all(self):
        """建立WebSocket连接并启动自动重连机制"""
        try:
            self.connect(self.ws_url)
            self.start_reconnection(self.ws_url)
        except Exception as e:
            logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")

    def start_reconnection(self, url):
        """启动定时重连检查"""
        def check_connection():
            if not self.is_connected():
                logger.debug("Reconnection attempt...")
                self.retry_attempts += 1
                if self.retry_attempts <= self.max_retry_attempts:
                    self.connect(url)
                else:
                    logger.error("Exceeded max retry attempts.")

        # 使用线程定期检查连接状态
        threading.Thread(target=lambda: schedule.every(10).seconds.do(check_connection), daemon=True).start()

    def is_connected(self):
        """检查WebSocket连接状态"""
        return self.session and self.session.connected

    def connect(self, url):
        """建立WebSocket连接"""
        try:
            if self.is_connected():
                self.session.close()

            self.session = websocket.WebSocketApp(
                url,
                on_open=self.on_open,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close
            )

            # 启动WebSocket连接(非阻塞模式)
            threading.Thread(target=self.session.run_forever, daemon=True).start()
        except Exception as e:
            logger.error(f"Failed to connect to the server: {str(e)}")

    def on_open(self, ws):
        """WebSocket连接建立成功后的回调"""
        logger.info(f"Connection opened")

        try:
            # 发送贵金属实时成交明细订阅请求
            trade_send_obj = {
                "code": 10000,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": {"codes": "XAUUSD"}  # XAUUSD 为贵金属黄金(黄金/美元)
            }
            self.send_message(trade_send_obj)

            # 不同请求之间间隔一段时间
            time.sleep(5)

            # 发送贵金属实时盘口数据订阅请求
            depth_send_obj = {
                "code": 10003,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": {"codes": "XAUUSD"}  # XAUUSD为黄金的实时盘口数据
            }
            self.send_message(depth_send_obj)

            # 不同请求之间间隔一段时间
            time.sleep(5)

            # 发送贵金属实时K线数据订阅请求
            kline_data = {
                "arr": [
                    {
                        "type": 1,
                        "codes": "XAUUSD"  # XAUUSD 为黄金K线数据
                    }
                ]
            }
            kline_send_obj = {
                "code": 10006,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": kline_data
            }
            self.send_message(kline_send_obj)

            # 启动定时心跳任务
            threading.Thread(target=lambda: schedule.every(30).seconds.do(self.ping), daemon=True).start()

        except Exception as e:
            logger.error(f"Error sending initial messages: {str(e)}")

    def on_message(self, ws, message):
        """接收消息的回调"""
        try:
            logger.info(f"Message received: {message}")
        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")

    def on_close(self, ws, close_status_code, close_msg):
        """连接关闭的回调"""
        logger.info(f"Connection closed: {close_status_code} - {close_msg}")

    def on_error(self, ws, error):
        """错误处理的回调"""
        logger.error(f"WebSocket error: {str(error)}")

    def send_message(self, message_obj):
        """发送消息到WebSocket服务器"""
        if self.is_connected():
            try:
                self.session.send(json.dumps(message_obj))
            except Exception as e:
                logger.error(f"Error sending message: {str(e)}")
        else:
            logger.warning("Cannot send message: Not connected")

    def ping(self):
        """发送心跳包"""
        current_time = time.time()
        if current_time - self.last_ping_time >= self.max_ping_interval:
            ping_obj = {
                "code": 10010,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
            }
            self.send_message(ping_obj)
            self.last_ping_time = current_time
        else:
            logger.debug(f"Ping skipped: Time interval between pings is less than {self.max_ping_interval} seconds.")

# 使用示例
if __name__ == "__main__":
    ws_client = WebsocketExample()
    ws_client.connect_all()

    # 保持主线程运行
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Exiting...")
        if ws_client.is_connected():
            ws_client.session.close()

3. 使用期货数据接口的注意事项

3.1 处理连接和心跳

  • 保持连接:WebSocket是持久连接,所以需要确保连接保持活跃。如果连接断开,需要实现自动重连机制。
  • 心跳包机制:大多数WebSocket服务会提供心跳包(ping/pong)机制来检测连接是否活跃。客户端需要定期发送心跳包,以防连接超时或断开。
    • 通常,期货数据WebSocket接口会要求客户端每隔一段时间(例如30秒或60秒)发送一个ping请求,如果没有收到响应,则连接可能会被断开。
    • 需要根据API文档配置合适的心跳时间间隔。

3.2 确保消息顺序

  • 消息顺序问题:WebSocket是基于流的协议,因此在某些情况下,消息可能会乱序到达。对于期货数据,特别是实时行情数据,乱序可能导致错误的数据解释。
  • 消息的去重与同步:在客户端处理实时数据时,需要设计去重机制,并确保历史数据和实时数据同步。例如,处理成交量、价格等信息时,要确保这些信息是按时间顺序正确处理。

3.3 数据订阅

  • 选择合适的数据订阅类型:期货数据接口通常提供多种数据类型(例如实时成交、盘口数据、K线数据等),需要根据实际需求订阅相关的数据流。
  • 批量订阅:为了提高效率,可以批量订阅多个品种的数据,而不是单独为每个品种订阅。但要注意API限制,避免超出最大订阅数限制。
  • 取消订阅:如果不再需要某个数据流,及时取消订阅以减少网络负担和资源占用。

3.4 处理消息和数据格式

  • JSON格式:大多数期货WebSocket数据接口使用JSON格式传输数据。确保客户端能够正确解析JSON,并能处理可能出现的异常或错误格式的数据。
  • 数据解码与验证:收到的数据需要进行校验(例如验证是否是预期格式、是否包含错误代码、是否包含丢失的字段等)。

3.5 容错和错误处理

  • 错误回调机制:确保客户端实现了适当的错误回调机制,能够在连接失败、消息处理失败等情况下作出响应。
  • 错误日志记录:记录错误日志非常重要,这有助于后续问题排查和性能分析。
  • 异常处理:处理可能出现的异常,如网络中断、数据解析失败、连接超时等。

3.6 连接重试和恢复

  • 自动重连机制:当WebSocket连接丢失或中断时,需要实现自动重连机制,并在成功连接后重新订阅所需的数据流。
  • 重试次数限制:为了避免无限重试,设置最大重试次数。例如,连接失败超过5次后,停止重试并报告错误。
  • 恢复状态:在重新连接后,确保客户端能够恢复到断开时的状态,并重新订阅所需的数据。

3.7 性能优化

  • 高并发处理:期货市场的数据量通常较大,尤其是高频交易的数据。需要确保客户端能够高效地处理大量的实时数据。
  • 数据过滤与处理:客户端可以根据业务需求过滤不必要的数据,以减少带宽占用和计算负担。
  • 内存管理:实时数据流可能会占用大量内存,特别是在长时间运行时。需要确保有合适的内存管理措施,如清理过期数据、设置缓存大小限制等。
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!