量化交易系统:历史行情 API 批量拉取与回测数据清洗

做量化交易的人都知道,回测系统的核心不是策略有多花哨,而是数据有多可靠

如果历史行情数据本身就有问题,那么再完美的回测结果也只是“垃圾进,垃圾出”。
量化交易系统
本文从实战出发,聊聊如何通过 API 批量拉取历史行情数据,并做一套严谨的回测数据清洗流程。这些坑,我都踩过。

一、为什么历史行情数据这么难搞?

很多人以为历史行情就是“股票代码+日期+开高低收+成交量”。真上手才发现,问题一大堆:

  • 不同数据源格式不同,有的前复权、有的后复权、有的不复权

  • 停牌日、除权除息日、涨跌停板数据容易被忽略

  • API 限流、断点续传、数据缺失需要处理

  • 国内 A 股、美股、期货的数据格式和规则差异巨大

一个合格的量化回测系统,必须能从源头保证数据的完整性、一致性、无偏性

二、批量拉取的工程设计

2.1 基础思路

不要一次性拉全部历史数据,更不要写死日期。合理的设计应该是:


配置股票池 → 判断本地已有数据 → 只拉缺失区间 → 合并去重 → 校验一致性

2.2 代码示例:带断点续传的批量拉取

下面使用 iTick API 获取历史日线数据(前复权),并实现本地缓存与断点续传。


import requests

import pandas as pd

import time

from pathlib import Path

API_TOKEN = "your_token_here" # 替换为实际 Token

BASE_URL = "https://api.itick.org"

def  build_headers():

 """构造请求头,包含 API Token 验证"""

 return {

 "token": API_TOKEN,

 "Content-Type": "application/json"

    }

def  fetch_stock_history(stock_code, region="HK", k_type=8, start_date="20000101",

 end_date="20231231", cache_dir="./data/raw"):

 """

    带缓存的批量拉取,自动断点续传

    参数说明:

        stock_code : 股票代码(港股示例:00700)

        region     : 市场代码(HK/US/SZ/SH 等)

        k_type     : K线类型(8:日线,9:周线,10:月线)

        start_date : 开始日期(格式 YYYYMMDD)

        end_date   : 结束日期(格式 YYYYMMDD)

        cache_dir  : 本地缓存目录

    """

    Path(cache_dir).mkdir(parents=True, exist_ok=True)

cache_file = Path(cache_dir) / f"{stock_code}.parquet"

 # 已有数据则加载,仅拉取缺失区间

 if cache_file.exists():

        df_old = pd.read_parquet(cache_file)

        df_old['trade_date'] = pd.to_datetime(df_old['trade_date'])

        last_date = df_old['trade_date'].max()

        start_date = (last_date + pd.Timedelta(days=1)).strftime('%Y%m%d')

 if start_date > end_date:

 return df_old

 print(f"{stock_code}: 本地已有数据至 {last_date.date()},开始增量拉取...")

 else:

        df_old = pd.DataFrame()

 # 将日期范围转换为时间戳(iTick kType 模式下需通过 et 参数控制截止)

start_ts = int(pd.Timestamp(start_date).timestamp())

end_ts = int(pd.Timestamp(end_date).timestamp())

    all_data = []

    current_end_ts = end_ts

batch_days = 100 # 每批最多拉取约 100 个交易日

 while  True:

 # 计算当前批次的起始截止区间(基于天数回推)

batch_start_ts = max(start_ts, current_end_ts - batch_days * 86400)

        params = {

 "region": region,

 "code": stock_code,

 "kType": k_type,

 "limit": 500,      # 每次最多返回 500 根 K 线

 "et": current_end_ts

        }

 try:

url = f"{BASE_URL}/stock/kline"

resp = requests.get(url, headers=build_headers(), params=params, timeout=15)

 if resp.status_code != 200:

 print(f"拉取失败: {stock_code}, 状态码 {resp.status_code}")

                time.sleep(2)

 continue

            data = resp.json()

 if data.get("code") == 0  and data.get("data"):

                batch_data = data["data"]

                all_data.extend(batch_data)

 print(f"{stock_code}: 拉取到 {len(batch_data)} 条数据")

 # 判断是否还有更早的数据

                earliest_ts = batch_data[-1].get("t", 0) if batch_data else  0

 if earliest_ts <= start_ts or  len(batch_data) < 500:

 break

current_end_ts = earliest_ts - 86400 # 继续拉取更早数据

 else:

 print(f"拉取失败: {stock_code}, 错误信息: {data.get('msg')}")

 break

            time.sleep(0.5)  # 限流控制

 except  Exception  as e:

 print(f"拉取异常: {stock_code}, 错误: {e}")

            time.sleep(5)

 continue

 if  not all_data:

 return df_old

 # 数据转换与合并

    df_new = pd.DataFrame(all_data)

 # 将时间戳转换为日期

    df_new['trade_date'] = pd.to_datetime(df_new['t'], unit='s')

 # 重命名字段为统一格式

    df_new = df_new.rename(columns={

 'o': 'open', 'h': 'high', 'l': 'low',

 'c': 'close', 'v': 'volume'

    })

    df_new = df_new[['trade_date', 'open', 'high', 'low', 'close', 'volume']]

df_combined = pd.concat([df_old, df_new], ignore_index=True) if  not df_old.empty else df_new

    df_combined = df_combined.drop_duplicates(subset=['trade_date']).sort_values('trade_date')

df_combined.to_parquet(cache_file, index=False)

 print(f"{stock_code}: 数据保存至 {cache_file}, 共计 {len(df_combined)} 条")

 return df_combined

这个函数做了几件关键的事:

  • 检查本地缓存(Parquet 格式),只拉取缺失区间

  • 通过 limit 和分批区间控制拉取量,支持大量历史数据的自动分页

  • 异常重试与限流睡眠

  • 时间戳自动转换为标准化日期字段

2.3 多股票并发拉取

单线程循环拉取效率较低,可使用线程池实现并发,但仍需控制并发数以避免 API 限流:


from concurrent.futures import ThreadPoolExecutor, as_completed

def  fetch_batch(stock_list, region="HK", max_workers=3):

 """

    批量拉取多只股票的历史数据

    max_workers: 并发数建议 ≤ 5,防止被限流

    """

    results = {}

 with ThreadPoolExecutor(max_workers=max_workers) as executor:

        futures = {

            executor.submit(fetch_stock_history, code, region): code

 for code in stock_list

        }

 for future in as_completed(futures):

            code = futures[future]

 try:

                results[code] = future.result()

 print(f"{code}: 拉取完成")

 except  Exception  as e:

 print(f"{code}: 拉取失败, 错误: {e}")

 return results

并发数建议不超过 5,否则容易被数据源封禁。

三、回测数据清洗 Checklist

拉下来的原始数据,离直接用于回测还差好几步。这是我总结的清洗流程,每一步都不能省。

3.1 时间轴处理


# 确保交易日连续,无跳空

def  align_trading_days(df, trading_calendar=None):

    df['trade_date'] = pd.to_datetime(df['trade_date'])

    df = df.sort_values('trade_date').set_index('trade_date')

 if trading_calendar is  None:

 # 生成完整日历(工作日频率)

        full_calendar = pd.date_range(start=df.index.min(), end=df.index.max(), freq='B')

 else:

        full_calendar = trading_calendar

    df = df.reindex(full_calendar)

 return df

用工作日频率(freq='B')生成完整日历,缺失日期会自动填入 NaN,后续再填充或标记。

3.2 除权除息与复权统一

这是最大的坑!

很多新手直接用不复权数据做回测,结果会发现某天价格突然跳空低开 30%(实际上是除权),策略却以为是大跌而错误开平仓。

最佳实践:全程使用 前复权(qfq) 数据,保持历史价格连续可比。但要注意,前复权会导致早期价格出现负数(极端分红),需要做截断处理:


# 剔除前复权后的负价格或极小价格

df = df[(df['close'] > 0.01) & (df['high'] > 0.01)]

3.3 涨跌停板标记

回测时,如果策略根据信号在涨停价买入,实际根本无法成交。需要提前标记:


# 计算涨跌停价(A股主板±10%,科创/创业±20%,港股无涨跌停板限制)

def  calc_limit_prices(df, stock_code):

 # 根据股票代码判断市场

 if stock_code.startswith('688') or stock_code.startswith('300'):

limit_pct = 0.20  # 科创板/创业板

 elif stock_code.startswith('600') or stock_code.startswith('000'):

limit_pct = 0.10  # A股主板

 else:

 # 港股无涨跌停板限制,直接返回

        df['is_limit_up'] = False

        df['is_limit_down'] = False

 return df

    df['prev_close'] = df['close'].shift(1)

    df['upper_limit'] = df['prev_close'] * (1 + limit_pct)

    df['lower_limit'] = df['prev_close'] * (1 - limit_pct)

 # 标记一字板

    df['is_limit_up'] = (df['open'] >= df['upper_limit'] - 0.001) & (df['close'] >= df['upper_limit'] - 0.001)

    df['is_limit_down'] = (df['open'] <= df['lower_limit'] + 0.001) & (df['close'] <= df['lower_limit'] + 0.001)

 return df

回测执行时,遇到 is_limit_up 且为买入信号,应跳过或转换策略。

3.4 停牌数据处理

停牌期间,没有成交,不应填充为前一日价格(会导致回测出现不合理收益)。正确做法:


# 停牌日成交量应该为0或NaN,不做前向填充

df['volume'] = df['volume'].fillna(0)

# 对于价格字段,停牌日保持NaN,后续回测引擎遇到NaN应直接跳过该日

3.5 数据对齐(多股票回测)

多股票回测时,需将所有股票对齐到同一个交易日历:


def  align_multi_stocks(stock_dfs, trading_days):

 """

    stock_dfs: dict {code: DataFrame}

    trading_days: 交易日列表(pd.DatetimeIndex)

    """

    aligned = {}

 for code, df in stock_dfs.items():

        df_aligned = df.set_index('trade_date').reindex(trading_days)

        aligned[code] = df_aligned

 return aligned

四、数据质量校验

清洗完毕后,一定要跑一遍自动化校验:


def  validate_data(df, stock_code):

    checks = {

 "是否有重复日期": df.index.duplicated().sum() == 0,

 "是否有空价格": df[['open','high','low','close']].isna().any().any() == False,

 "最低价是否高于最高价": (df['low'] <= df['high']).all(),

 "成交量是否非负": (df['volume'] >= 0).all(),

 "价格序列是否单调异常": (

            (df['close'] - df['close'].shift(1)).abs() / df['close'].shift(1) < 0.2

        ).all(),  # 除去涨跌停

    }

 for name, result in checks.items():

 print(f"{stock_code} - {name}: {'通过'  if result else  '失败'}")

 return  all(checks.values())

五、存储与版本管理建议

  • 格式:强烈推荐 ParquetFeather,比 CSV 快 10 倍以上,且占用空间小。

  • 目录结构

    
    data/
    
     raw/          # 原始API拉取数据(按股票保存)
    
     cleaned/      # 清洗后数据(已复权、对齐、填充)
    
     meta/         # 股票列表、交易日历、除权因子备份
    
  • 版本控制:历史数据不要放 Git,用 DVC(Data Version Control)或直接云存储(S3、OSS)。

六、个人建议

  1. 永远保留原始拉取数据,清洗脚本可重复执行。否则哪天发现清洗逻辑错了,你还得全部重拉。

  2. 不要完美主义。回测数据做不到 100%精确,但必须保证无偏性(误差在买卖双方随机出现)。

  3. 先验小样本。对某只股票拉 3 年数据,手动核对除权除息日、涨跌停日,确信流程正确后再批量跑。

  4. 备胎数据源。核心股票池至少准备两个数据源交叉验证。

最后,记住一句话:回测是用来排除坏策略的,不是用来证明好策略的。 而这一切的起点,就是靠谱的历史行情数据。希望这篇文章能帮你少走弯路。

参考文档:https://docs.itick.org/websocket/stocks

GitHub:https://github.com/itick-org/

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!