如何用 Python 对接实时股票 API 并构建预警系统?:实战分享

AI摘要
本文是一篇技术知识分享,详细介绍了如何使用Python构建一个完整的股票价格实时监控与自动预警系统。内容涵盖系统架构设计、技术栈选择、核心模块实现(包括数据获取、预警规则引擎、通知发送)以及部署运维建议,提供了完整的代码示例和配置说明。

在量化交易和个人投资管理中,实时监控股票价格并设置自动预警是提升决策效率的关键。今天,我将手把手教你如何使用 Python 对接实时股票 API,并构建一个功能完整的股票价格预警系统。
如何用 Python 对接实时股票 API 并构建预警系统

系统架构设计

我们的预警系统将包含以下核心模块:

  1. 数据获取模块 - 通过 WebSocket 连接实时股票 API

  2. 数据处理模块 - 解析和处理实时行情数据

  3. 预警规则引擎 - 根据预设条件触发预警

  4. 通知模块 - 通过多种渠道发送预警信息

技术栈选择

  • Python 3.8+ - 主要编程语言

  • WebSocket 客户端 - 实时数据接收

  • Pandas - 数据处理和分析

  • SQLite/Redis - 预警规则和状态存储

  • SMTP/Telegram API - 预警通知发送

第一步:配置开发环境


# requirements.txt

# 实时股票API客户端和数据处理库

websocket-client>=1.3.0

requests>=2.28.0

pandas>=1.5.0

numpy>=1.24.0

python-dotenv>=0.21.0

schedule>=1.1.0

redis>=4.5.0

# 可选的通知库

python-telegram-bot>=20.0.0

twilio>=8.0.0

第二步:实现基于iTick WebSocket的实时数据连接器


# tick_data_connector.py

import websocket

import json

import threading

import time

import logging

from datetime import datetime

import pandas as pd

from typing import Dict, List, Callable, Optional

class  ITickWebSocketClient:

 """

    iTick Stocks WebSocket 实时行情客户端

    """

 def  __init__(self, token: str, symbols: List[str]):

 self.token = token

 self.symbols = symbols  # 例如 ["AAPL$US", "TSLA$US"]

 self.ws_url = "wss://api.itick.org/stock"

 self.ws = None

 self.is_connected = False

 self.is_authenticated = False

 self.data_callbacks = []

 self.error_callbacks = []

 # 数据缓存(以 symbol 如 "AAPL$US" 为 key)

 self.price_cache: Dict[str, Dict] = {}

 self.last_update: Dict[str, datetime] = {}

        logging.basicConfig(

 level=logging.INFO,

 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'

        )

 self.logger = logging.getLogger(__name__)

 def  add_data_callback(self, callback: Callable):

 self.data_callbacks.append(callback)

 def  add_error_callback(self, callback: Callable):

 self.error_callbacks.append(callback)

 def  on_message(self, ws, message):

 try:

            data = json.loads(message)

 # 连接成功

 if data.get("msg") == "Connected Successfully":

 self.logger.info("WebSocket 连接成功")

 # 认证结果

 elif data.get("resAc") == "auth":

 if data.get("code") == 1:

 self.is_authenticated = True

 self.logger.info("认证成功")

 self._subscribe()

 else:

 self.logger.error("认证失败")

                    ws.close()

 # 订阅结果

 elif data.get("resAc") == "subscribe":

 if data.get("code") == 1:

 self.logger.info("订阅成功")

 else:

 self.logger.error(f"订阅失败: {data.get('msg')}")

 # 心跳响应

 elif data.get("resAc") == "pong":

 self.logger.debug("收到 pong")

 # 实际行情数据

 elif data.get("data"):

                market_data = data["data"]

                data_type = market_data.get("type", "")

                symbol = market_data.get("s") or market_data.get("s")  # tick/quote 使用 s

 if data_type == "tick":

tick = self._parse_tick_data(market_data)

 self.price_cache[symbol] = tick

 self.last_update[symbol] = datetime.now()

 for callback in  self.data_callbacks:

                        callback(tick)

 # 如需处理 quote、depth,可在此扩展

 except  Exception  as e:

 self.logger.error(f"消息处理异常: {e}")

 def  _parse_tick_data(self, raw: Dict) -> Dict:

 """解析 iTick tick 数据"""

 return {

 'symbol': raw.get('s'),

 'price': raw.get('ld'),        # 最新成交价

 'volume': raw.get('v', 0),

 'timestamp': datetime.fromtimestamp(raw.get('t', 0) / 1000).isoformat(),

 'received_at': datetime.now().isoformat()

        }

 def  on_error(self, ws, error):

 self.logger.error(f"WebSocket 错误: {error}")

 for cb in  self.error_callbacks:

            cb(error)

 def  on_close(self, ws, close_status_code, close_msg):

 self.is_connected = False

 self.logger.info("WebSocket 连接关闭")

 def  on_open(self, ws):

 self.is_connected = True

 self.logger.info("WebSocket 已打开,等待认证...")

 def  _subscribe(self):

 """发送订阅消息"""

        subscribe_msg = {

 "ac": "subscribe",

 "params": ",".join(self.symbols),

 "types": "tick,quote,depth"  # 可根据需要添加 K 数据

        }

 self.ws.send(json.dumps(subscribe_msg))

 self.logger.info(f"已发送订阅: {subscribe_msg['params']}")

 def  _send_ping(self):

 """心跳线程"""

 while  self.is_connected:

            time.sleep(30)

 if  self.is_connected:

                ping_msg = {

 "ac": "ping",

 "params": str(int(time.time() * 1000))

                }

 self.ws.send(json.dumps(ping_msg))

 self.logger.debug("发送 ping")

 def  connect(self):

        headers = {"token": self.token}

 self.ws = websocket.WebSocketApp(

 self.ws_url,

 header=headers,

 on_open=self.on_open,

 on_message=self.on_message,

 on_error=self.on_error,

 on_close=self.on_close

        )

 # 主线程运行 WebSocket

        wst = threading.Thread(target=self.ws.run_forever)

wst.daemon = True

        wst.start()

 # 启动心跳线程

        ping_thread = threading.Thread(target=self._send_ping)

ping_thread.daemon = True

        ping_thread.start()

 # 等待连接建立

 for _ in  range(30):

 if  self.is_authenticated:

 break

            time.sleep(1)

 def  disconnect(self):

 if  self.ws:

 self.ws.close()

 self.is_connected = False

 def  get_current_price(self, symbol: str) -> Optional[float]:

 if symbol in  self.price_cache:

 return  self.price_cache[symbol].get('price')

 return  None

第三步:构建预警规则引擎


# alert_engine.py

import sqlite3

from datetime import datetime, timedelta

from typing import Dict, List, Any

import json

import logging

class  AlertEngine:

 """预警规则引擎"""

 def  __init__(self, db_path: str = "alerts.db"):

 self.db_path = db_path

 self.active_alerts = {}

 self.logger = logging.getLogger(__name__)

 self._init_database()

 def  _init_database(self):

 """初始化数据库"""

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

 # 创建预警规则表

        cursor.execute('''

            CREATE TABLE IF NOT EXISTS alert_rules (

                id INTEGER PRIMARY KEY AUTOINCREMENT,

                symbol TEXT NOT NULL,

                rule_type TEXT NOT NULL,

                condition TEXT NOT NULL,

                threshold REAL,

                value TEXT,

                is_active INTEGER DEFAULT 1,

                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

                last_triggered TIMESTAMP

            )

        ''')

 # 创建预警历史表

        cursor.execute('''

            CREATE TABLE IF NOT EXISTS alert_history (

                id INTEGER PRIMARY KEY AUTOINCREMENT,

                alert_rule_id INTEGER,

                symbol TEXT NOT NULL,

                trigger_price REAL,

                trigger_value TEXT,

                triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

                FOREIGN KEY (alert_rule_id) REFERENCES alert_rules (id)

            )

        ''')

        conn.commit()

        conn.close()

 # 从数据库加载活动预警

 self._load_active_alerts()

 def  _load_active_alerts(self):

 """从数据库加载活动预警规则"""

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        cursor.execute(

 "SELECT * FROM alert_rules WHERE is_active = 1"

        )

 for row in cursor.fetchall():

            rule_id = row[0]

 self.active_alerts[rule_id] = {

 'symbol': row[1],

 'rule_type': row[2],

 'condition': row[3],

 'threshold': row[4],

 'value': row[5],

 'last_triggered': row[7]

            }

        conn.close()

 self.logger.info(f"加载了 {len(self.active_alerts)} 个活动预警规则")

 def  add_alert_rule(self, rule_data: Dict) -> int:

 """添加预警规则"""

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        cursor.execute('''

            INSERT INTO alert_rules

            (symbol, rule_type, condition, threshold, value)

            VALUES (?, ?, ?, ?, ?)

        ''', (

            rule_data['symbol'],

            rule_data['rule_type'],

            rule_data['condition'],

            rule_data.get('threshold'),

            json.dumps(rule_data.get('value', {}))

        ))

        rule_id = cursor.lastrowid

        conn.commit()

        conn.close()

 # 添加到活动预警

 self.active_alerts[rule_id] = {

 'symbol': rule_data['symbol'],

 'rule_type': rule_data['rule_type'],

 'condition': rule_data['condition'],

 'threshold': rule_data.get('threshold'),

 'value': rule_data.get('value', {}),

 'last_triggered': None

        }

 self.logger.info(f"添加预警规则: {rule_data['symbol']} - {rule_data['rule_type']}")

 return rule_id

 def  check_price_alert(self, symbol: str, price: float) -> List[Dict]:

 """检查价格预警"""

        triggered_alerts = []

 for rule_id, rule in  self.active_alerts.items():

 if rule['symbol'] != symbol:

 continue

 if rule['rule_type'] == 'price':

triggered = self._evaluate_price_condition(

                    price,

                    rule['condition'],

                    rule['threshold']

                )

 if triggered:

 # 检查是否在冷却期内(避免频繁触发)

 if  self._is_in_cooldown(rule['last_triggered']):

 continue

                    alert_info = {

 'rule_id': rule_id,

 'symbol': symbol,

 'rule_type': 'price',

 'condition': rule['condition'],

 'threshold': rule['threshold'],

 'trigger_price': price,

 'message': self._generate_alert_message(

                            symbol, price, rule['condition'], rule['threshold']

                        )

                    }

                    triggered_alerts.append(alert_info)

 # 更新最后触发时间

 self._update_last_triggered(rule_id)

 return triggered_alerts

 def  _evaluate_price_condition(self, price: float,

                                 condition: str, threshold: float) -> bool:

 """评估价格条件"""

 if condition == 'above':

 return price > threshold

 elif condition == 'below':

 return price < threshold

 elif condition == 'cross_above':

 # 需要历史价格数据,这里简化为单次比较

 return price >= threshold

 elif condition == 'cross_below':

 return price <= threshold

 elif condition == 'percentage_change':

 # 需要历史价格计算百分比变化

 # 实际应用中需要更复杂的实现

 return  False

 return  False

 def  _is_in_cooldown(self, last_triggered, cooldown_minutes: int = 5) -> bool:

 """检查是否在冷却期内"""

 if  not last_triggered:

 return  False

 if  isinstance(last_triggered, str):

            last_triggered = datetime.fromisoformat(last_triggered)

        cooldown_end = last_triggered + timedelta(minutes=cooldown_minutes)

 return datetime.now() < cooldown_end

 def  _update_last_triggered(self, rule_id: int):

 """更新最后触发时间"""

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        cursor.execute('''

            UPDATE alert_rules

            SET last_triggered = CURRENT_TIMESTAMP

            WHERE id = ?

        ''', (rule_id,))

        conn.commit()

        conn.close()

 # 更新内存中的记录

 if rule_id in  self.active_alerts:

 self.active_alerts[rule_id]['last_triggered'] = datetime.now().isoformat()

 def  _generate_alert_message(self, symbol: str, price: float,

                               condition: str, threshold: float) -> str:

 """生成预警消息"""

 if condition == 'above':

 return  f"🚨 {symbol} 价格突破 {threshold},当前价格: {price:.2f}"

 elif condition == 'below':

 return  f"⚠️ {symbol} 价格跌破 {threshold},当前价格: {price:.2f}"

 else:

 return  f"📈 {symbol} 触发预警条件,当前价格: {price:.2f}"

第四步:实现通知发送器


# notifier.py

import smtplib

from email.mime.text import MIMEText

from email.mime.multipart import MIMEMultipart

import logging

from typing import Dict

class  AlertNotifier:

 """预警通知器"""

 def  __init__(self, config: Dict):

 self.config = config

 self.logger = logging.getLogger(__name__)

 def  send_email(self, to_email: str, subject: str, body: str) -> bool:

 """发送邮件通知"""

 try:

            msg = MIMEMultipart()

            msg['From'] = self.config.get('email_from')

            msg['To'] = to_email

            msg['Subject'] = subject

msg.attach(MIMEText(body, 'plain'))

 with smtplib.SMTP(

 self.config.get('smtp_server'),

 self.config.get('smtp_port', 587)

) as server:

                server.starttls()

                server.login(

 self.config.get('email_user'),

 self.config.get('email_password')

                )

                server.send_message(msg)

 self.logger.info(f"邮件发送成功: {to_email}")

 return  True

 except  Exception  as e:

 self.logger.error(f"邮件发送失败: {e}")

 return  False

 def  send_telegram(self, chat_id: str, message: str) -> bool:

 """发送Telegram通知(简化版)"""

 try:

 # 实际应用中需要使用python-telegram-bot库

 # 这里使用requests模拟

 import requests

bot_token = self.config.get('telegram_bot_token')

 if  not bot_token:

 self.logger.warning("未配置Telegram Bot Token")

 return  False

url = f"https://api.telegram.org/bot{bot_token}/sendMessage"

            payload = {

 'chat_id': chat_id,

 'text': message,

 'parse_mode': 'HTML'

            }

response = requests.post(url, json=payload)

 if response.status_code == 200:

 self.logger.info(f"Telegram消息发送成功: {chat_id}")

 return  True

 else:

 self.logger.error(f"Telegram发送失败: {response.text}")

 return  False

 except  ImportError:

 self.logger.warning("未安装requests库,无法发送Telegram消息")

 return  False

 except  Exception  as e:

 self.logger.error(f"Telegram发送错误: {e}")

 return  False

 def  send_console_notification(self, alert_info: Dict):

 """控制台通知(开发调试用)"""

 print("\n" + "="*50)

 print("🚨 股票预警触发 🚨")

 print(f"股票: {alert_info['symbol']}")

 print(f"价格: {alert_info['trigger_price']:.2f}")

 print(f"条件: {alert_info['condition']}  {alert_info['threshold']}")

 print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

 print("="*50 + "\n")

第五步:整合主应用程序


# main.py

import json

import time

import signal

import sys

from datetime import datetime

from typing import List

import logging

from tick_data_connector import TickDataAPIClient

from alert_engine import AlertEngine

from notifier import AlertNotifier

class  StockAlertSystem:

 """股票预警系统主程序"""

 def  __init__(self, config_path: str = "config.json"):

 # 加载配置

 with  open(config_path, 'r') as f:

 self.config = json.load(f)

 # 设置日志

 self._setup_logging()

 self.logger = logging.getLogger(__name__)

 # 初始化组件

 self.api_client = TickDataAPIClient(

 api_key=self.config['itick']['token'],

 symbols=self.config['monitor_symbols']

        )

 self.alert_engine = AlertEngine(

 db_path=self.config.get('database_path', 'alerts.db')

        )

 self.notifier = AlertNotifier(self.config['notifications'])

 # 添加预定义预警规则

 self._setup_default_alerts()

 # 运行标志

 self.running = False

 def  _setup_logging(self):

 """配置日志系统"""

log_config = self.config.get('logging', {})

log_level = getattr(logging, log_config.get('level', 'INFO'))

        logging.basicConfig(

 level=log_level,

 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

 handlers=[

                logging.FileHandler(log_config.get('file', 'stock_alert.log')),

                logging.StreamHandler()

            ]

        )

 def  _setup_default_alerts(self):

 """设置默认预警规则"""

default_alerts = self.config.get('default_alerts', [])

 for alert_rule in default_alerts:

 self.alert_engine.add_alert_rule(alert_rule)

 def  handle_tick_data(self, tick_data: Dict):

 """处理实时tick数据"""

        symbol = tick_data['symbol']

        price = tick_data['price']

 # 检查是否触发预警

triggered_alerts = self.alert_engine.check_price_alert(symbol, price)

 # 发送通知

 for alert in triggered_alerts:

 self._send_alert_notifications(alert)

 def  _send_alert_notifications(self, alert_info: Dict):

 """发送预警通知"""

        message = alert_info['message']

 # 控制台通知(开发用)

 self.notifier.send_console_notification(alert_info)

 # 邮件通知

 if  self.config['notifications'].get('enable_email', False):

 for email in  self.config['notifications'].get('email_recipients', []):

 self.notifier.send_email(

                    email,

 f"股票预警: {alert_info['symbol']}",

                    message

                )

 # Telegram通知

 if  self.config['notifications'].get('enable_telegram', False):

 for chat_id in  self.config['notifications'].get('telegram_chat_ids', []):

 self.notifier.send_telegram(chat_id, message)

 # 记录到日志

 self.logger.info(f"预警触发: {message}")

 def  run(self):

 """运行主程序"""

 self.running = True

 # 设置信号处理

signal.signal(signal.SIGINT, self.shutdown)

signal.signal(signal.SIGTERM, self.shutdown)

 # 注册数据回调

 self.api_client.add_data_callback(self.handle_tick_data)

 # 连接API

 self.logger.info("正在连接Tick API...")

 self.api_client.connect()

 # 主循环

 self.logger.info("股票预警系统已启动")

 try:

 while  self.running:

 # 检查系统状态

 self._monitor_system_health()

                time.sleep(1)

 except  KeyboardInterrupt:

 self.shutdown()

 except  Exception  as e:

 self.logger.error(f"系统运行错误: {e}")

 self.shutdown()

 def  _monitor_system_health(self):

 """监控系统健康状态"""

 # 这里可以添加更多的健康检查逻辑

 # 例如:检查API连接状态、数据库连接、磁盘空间等

 # 示例:检查最近数据更新时间

 for symbol in  self.config['monitor_symbols']:

last_update = self.api_client.last_update.get(symbol)

 if last_update:

                time_diff = (datetime.now() - last_update).total_seconds()

 if time_diff > 60:  # 超过60秒没有数据

 self.logger.warning(f"{symbol} 数据更新延迟: {time_diff:.0f}秒")

 def  shutdown(self, signum=None, frame=None):

 """关闭系统"""

 self.logger.info("正在关闭系统...")

 self.running = False

 # 断开API连接

 self.api_client.disconnect()

 self.logger.info("系统已关闭")

        sys.exit(0)

if  __name__ == "__main__":

 # 创建配置文件(示例)

    sample_config = {

 "itick": {

 "token": "your_iTick_api_key_here",

        },

 "monitor_symbols": ["AAPL$US", "TSLA$US", "GOOGL$US", "MSFT$US"],

 "default_alerts": [

            {

 "symbol": "AAPL$US",

 "rule_type": "price",

 "condition": "above",

 "threshold": 230.0,

 "value": {}

            },

            {

 "symbol": "TSLA$US",

 "rule_type": "price",

 "condition": "below",

 "threshold": 300.0,

 "value": {}

            }

        ],

 "notifications": {

 "enable_email": False,

 "email_user": "your_email@gmail.com",

 "email_password": "your_app_password",

 "smtp_server": "smtp.gmail.com",

 "smtp_port": 587,

 "email_from": "your_email@gmail.com",

 "email_recipients": ["recipient@example.com"],

 "enable_telegram": False,

 "telegram_bot_token": "your_bot_token",

 "telegram_chat_ids": ["your_chat_id"]

        },

 "logging": {

 "level": "INFO",

 "file": "stock_alert.log"

        },

 "database_path": "alerts.db"

    }

 # 保存示例配置

 with  open("config_sample.json", "w") as f:

json.dump(sample_config, f, indent=2)

 print("示例配置文件已生成: config_sample.json")

 print("请复制并修改为 config.json,然后运行系统")

 # 实际运行代码(取消注释以下行)

 # system = StockAlertSystem("config.json")

 # system.run()

第六步:扩展功能和优化建议

1. 添加更多预警规则类型


# 技术指标预警

def  add_technical_alert(self, symbol: str, indicator: str,

                       condition: str, value: float):

 """添加技术指标预警"""

 pass # 实现MACD、RSI、布林带等指标预警

# 成交量异常预警

def  add_volume_alert(self, symbol: str, volume_multiplier: float = 2.0):

 """添加成交量异常预警"""

 pass

2. 实现数据持久化和分析


def  save_tick_data_to_database(self, tick_data: Dict):

 """保存tick数据到数据库"""

    conn = sqlite3.connect("tick_data.db")

    cursor = conn.cursor()

    cursor.execute('''

        INSERT INTO tick_data

        (symbol, price, volume, timestamp, bid, ask, bid_size, ask_size)

        VALUES (?, ?, ?, ?, ?, ?, ?, ?)

    ''', (

        tick_data['symbol'],

        tick_data['price'],

        tick_data['volume'],

        tick_data['timestamp'],

        tick_data['bid'],

        tick_data['ask'],

        tick_data['bid_size'],

        tick_data['ask_size']

    ))

    conn.commit()

    conn.close()

3. 添加 Web 界面(使用 Flask 或 FastAPI)


# web_interface.py

from flask import Flask, render_template, request, jsonify

app = Flask(__name__)

@app.route('/')

def  dashboard():

 """监控仪表板"""

 return render_template('dashboard.html')

@app.route('/api/alerts', methods=['GET'])

def  get_alerts():

 """获取预警列表API"""

 # 从数据库查询预警

 return jsonify({"alerts": []})

@app.route('/api/add_alert', methods=['POST'])

def  add_alert():

 """添加预警规则API"""

    rule_data = request.json

 # 添加到预警引擎

 return jsonify({"success": True})

部署和运维建议

  1. 服务器选择:使用云服务器(AWS EC2、阿里云 ECS)确保 24 小时运行

  2. 进程管理:使用 Supervisor 或 systemd 管理进程

  3. 日志监控:配置日志轮转和监控告警

  4. 备份策略:定期备份数据库和配置文件

  5. 安全考虑

    • API 密钥使用环境变量存储

    • 数据库加密

    • 定期更新依赖库

故障排除和调试


# debug_utils.py

def  check_api_connection():

 """检查API连接状态"""

 pass

def  simulate_tick_data():

 """模拟tick数据用于测试"""

 pass

def  validate_alert_rules():

 """验证预警规则配置"""

 pass

总结

通过本教程,你已经学会了:

  1. 如何使用 Python 连接实时股票 API

  2. 构建一个可扩展的预警规则引擎

  3. 实现多种通知方式(邮件、Telegram 等)

  4. 设计一个完整的股票监控系统架构

下一步改进方向

  1. 性能优化:使用异步 IO 提高并发处理能力

  2. 机器学习集成:添加基于机器学习的智能预警

  3. 多市场支持:扩展支持加密货币、外汇等市场

  4. 移动应用:开发手机 App 实时接收预警

参考文档:https://blog.itick.org/quant-trading/stock-api-integration-with-telegram-alerts

GitHub 项目地址https://github.com/itick-org/

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!