Python实践:外汇API实时数据整理与处理
在处理外汇行情相关的项目时,实时数据接入是核心环节。使用 Python 接收外汇API推送的数据,可以快速整理成统一结构,便于分析或策略处理。本文记录了接入、数据整理及管理的一些方式,方便在不同场景中复用。
一、WebSocket 连接结构
实时行情通常通过 WebSocket 推送,比轮询更高效。在 Python 中可使用 websocket-client 建立连接。以 AllTick API 为例,WebSocket 接入可以这样写:
import websocket
import json
def on_message(ws, message):
data = json.loads(message)
print(data)
def on_error(ws, error):
print("error:", error)
def on_close(ws):
print("closed")
def on_open(ws):
sub_param = {
"cmd_id": 22002,
"seq_id": 123,
"trace": "test",
"data": {
"symbol_list": [
{"code": "EURUSD", "depth_level": 1}
]
}
}
ws.send(json.dumps(sub_param))
ws = websocket.WebSocketApp(
"wss://quote.alltick.co/quote-b-ws-api",
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
ws.run_forever()
WebSocket 流程主要包括:建立连接 → 订阅交易对 → 接收数据。
二、数据结构整理
接收到的数据示例:
{
"cmd_id": 22002,
"data": {
"symbol": "EURUSD",
"tick": {
"bid": 1.0852,
"ask": 1.0853,
"timestamp": 1710000000
}
}
}
可整理为表格,便于处理:
| 字段 | 含义 | 使用方式 |
|---|---|---|
| symbol | 交易对 | 作为主键 |
| bid | 买价 | 参考下单 |
| ask | 卖价 | 计算点差 |
| timestamp | 时间戳 | K线或缓存 |
标准化处理函数:
def normalize_tick(data):
tick = data["data"]["tick"]
return {
"symbol": data["data"]["symbol"],
"bid": tick["bid"],
"ask": tick["ask"],
"ts": tick["timestamp"]
}
三、缓存与管理
实时数据更新频繁,直接写入数据库可能影响性能。可先在内存中缓存:
market_cache = {}
def handle_tick(tick):
symbol = tick["symbol"]
market_cache[symbol] = tick
需要扩展时可使用 Redis 或其他存储方案。
四、节奏控制
高频数据可能导致处理压力,可加简单节流:
import time
last_print_time = 0
def handle_tick_with_throttle(tick):
global last_print_time
now = time.time()
if now - last_print_time > 1:
print(tick)
last_print_time = now
五、批量订阅交易对
多交易对订阅可以封装函数,提高代码可读性:
def build_subscribe(symbols):
return {
"cmd_id": 22002,
"seq_id": 1,
"trace": "multi",
"data": {
"symbol_list": [{"code": s, "depth_level": 1} for s in symbols]
}
}
ws.send(json.dumps(build_subscribe(["EURUSD", "GBPUSD"])))
六、使用注意点
WebSocket 连接可能断开,需要重连机制
时间戳可统一转换为毫秒或本地时间
数据统一格式方便后续处理
不同交易对精度可能不同,需统一处理
本作品采用《CC 协议》,转载必须注明作者和本文链接
learnnnn 的个人博客
关于 LearnKu