分享一个基于实时行情数据API的监控脚本 
                                                    
                        
                    
                    
  
                    
                    功能描述
- 订阅 AAPL 的 1 分钟 K 线;
- 提取最新的收盘价;
- 检测价格变动是否大于一定阈值(例如 1%);
- 推送价格更新到 Telegram。
2. 准备工作
- 获取 Infoway 的 API Key:infoway.io
- 创建 Telegram Bot:向 @BotFather 创建一个 bot,获取 TELEGRAM_BOT_TOKEN;
- 获取 Chat ID:与 bot 聊天,然后访问 https://api.telegram.org/bot<YOUR_BOT_TOKEN>/getUpdates获取你的chat_id。
3. 监控股价变动
import asyncio
import json
import websockets
import aiohttp
WS_URL = "wss://data.infoway.io/ws?business=stock&apikey=yourApiKey"
TELEGRAM_BOT_TOKEN = "yourTelegramBotToken"
TELEGRAM_CHAT_ID = "yourTelegramChatID"
# 这个脚本依赖实时行情数据,需要先在infoway的官网获取免费的API key
last_close_price = None
PRICE_CHANGE_THRESHOLD = 0.01  # 1%
async def send_telegram_message(message: str):
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=payload) as response:
            if response.status != 200:
                print("Failed to send Telegram message:", await response.text())
async def connect_and_monitor():
    global last_close_price
    async with websockets.connect(WS_URL) as websocket:
        init_message = {
            "code": 10004,
            "trace": "unique-trace-id-001",
            "data": {
                "arr": [{"type": 1, "codes": "AAPL"}]
            }
        }
        await websocket.send(json.dumps(init_message))
        async def send_ping():
            while True:
                await asyncio.sleep(30)
                ping_message = {"code": 10010, "trace": "ping-001"}
                await websocket.send(json.dumps(ping_message))
        ping_task = asyncio.create_task(send_ping())
        try:
            while True:
                raw_message = await websocket.recv()
                message = json.loads(raw_message)
                # 处理K线数据
                if message.get("code") == 10004 and "data" in message:
                    kline_data = message["data"].get("AAPL", [])
                    if kline_data:
                        latest = kline_data[-1]
                        close_price = latest[4]
                        if last_close_price:
                            change = abs(close_price - last_close_price) / last_close_price
                            if change >= PRICE_CHANGE_THRESHOLD:
                                msg = f"AAPL最新价格: {close_price:.2f},变动率: {change * 100:.2f}%"
                                print(msg)
                                await send_telegram_message(msg)
                        last_close_price = close_price
        except websockets.exceptions.ConnectionClosed:
            print("WebSocket连接关闭")
        finally:
            ping_task.cancel()
asyncio.run(connect_and_monitor())- latest[4]是收盘价,K线数据数组格式一般是- [时间戳, 开, 高, 低, 收, 成交量];
- PRICE_CHANGE_THRESHOLD可根据需要调整为其他百分比;
- 可扩展支持更多股票、不同周期等。
4. 监控个股RSI指标
我们也可以在监控脚本中加入RSI指标的判断,当股票的RSI大于70,或者小于30时发出预警,下面是代码示例:
import asyncio
import json
import websockets
import aiohttp
import numpy as np
from collections import deque
WS_URL = "wss://data.infoway.io/ws?business=stock&apikey=yourApiKey"
TELEGRAM_BOT_TOKEN = "yourTelegramBotToken"
TELEGRAM_CHAT_ID = "yourTelegramChatID"
# RSI 参数
RSI_PERIOD = 14
PRICE_HISTORY = deque(maxlen=RSI_PERIOD + 1)
async def send_telegram_message(message: str):
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=payload) as response:
            if response.status != 200:
                print("Failed to send Telegram message:", await response.text())
def calculate_rsi(prices: list) -> float:
    if len(prices) < RSI_PERIOD + 1:
        return None
    deltas = np.diff(prices)
    gains = np.where(deltas > 0, deltas, 0)
    losses = np.where(deltas < 0, -deltas, 0)
    avg_gain = np.mean(gains)
    avg_loss = np.mean(losses)
    if avg_loss == 0:
        return 100.0  # 避免除以0,说明价格只涨不跌
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
    return rsi
async def connect_and_monitor():
    async with websockets.connect(WS_URL) as websocket:
        init_message = {
            "code": 10004,
            "trace": "unique-trace-id-002",
            "data": {
                "arr": [{"type": 1, "codes": "AAPL"}]
            }
        }
        await websocket.send(json.dumps(init_message))
        async def send_ping():
            while True:
                await asyncio.sleep(30)
                ping_message = {"code": 10010, "trace": "ping-002"}
                await websocket.send(json.dumps(ping_message))
        ping_task = asyncio.create_task(send_ping())
        try:
            while True:
                raw_message = await websocket.recv()
                message = json.loads(raw_message)
                if message.get("code") == 10004 and "data" in message:
                    kline_data = message["data"].get("AAPL", [])
                    if kline_data:
                        latest = kline_data[-1]
                        close_price = latest[4]
                        # 收集价格用于计算 RSI
                        PRICE_HISTORY.append(close_price)
                        # RSI计算与推送
                        rsi = calculate_rsi(list(PRICE_HISTORY))
                        if rsi:
                            print(f"当前RSI: {rsi:.2f}")
                            if rsi > 70:
                                await send_telegram_message(f"AAPL超买警告⚠️:当前RSI = {rsi:.2f} (>70)")
                            elif rsi < 30:
                                await send_telegram_message(f"AAPL超卖警告📉:当前RSI = {rsi:.2f} (<30)")
        except websockets.exceptions.ConnectionClosed:
            print("WebSocket连接关闭")
        finally:
            ping_task.cancel()
asyncio.run(connect_and_monitor())- 一开始需要收集够 15 条价格数据(RSI_PERIOD+1)才开始计算 RSI;
- 你可以将 RSI_PERIOD设置为其他值,比如 6、9、21,根据策略需求调整;
- 为了避免重复推送相同方向的 RSI 信号,可以加入“信号状态”缓存,如有需要我可以帮你加。
本作品采用《CC 协议》,转载必须注明作者和本文链接
 
           Rockson 的个人博客
 Rockson 的个人博客
         
           
           关于 LearnKu
                关于 LearnKu
               
                     
                     
                     粤公网安备 44030502004330号
 粤公网安备 44030502004330号 
 
推荐文章: