5.1. 异步任务与并发控制决定了 Skills 的吞吐量

异步任务与并发控制决定了 Skills 的吞吐量

场景还原:
2026 年 5 月,一家电商公司的 Agent 上线了商品比价 Skill。这个 Skill 需要同时从 30 个供应商的 API 获取价格,然后汇总排序。第一版用 requests 串行调用,单次执行耗时超过 12 秒,用户根本无法接受。改用 Python 的 asyncio 后,同样的 30 个请求在 0.8 秒内全部返回,吞吐量提升了 15 倍。
本章将带你完成同样的改造:用异步 I/O、任务队列和并发控制,打造高吞吐量的 Skill,使其即使在数百个并发请求下依然保持毫秒级响应。

你需要什么

需求 说明
Python 3.10+ 内置 asyncio 模块,支持 asyncio.TaskGroup(3.11+)
aiohttp 异步 HTTP 客户端(pip install aiohttp
asyncpg 异步 PostgreSQL 驱动(可选,演示连接池)
基础 Skill 工程结构 已有同步版本的工具函数,可直接在此之上改造
约 30 分钟 完成三个核心子节的代码改造与测试

最终成果

你将拥有一个 并发安全的异步 Skill,它具备以下能力:

  • 使用 async/await 改写原有同步 I/O 调用,支持并行执行
  • 通过 asyncio.QueueSemaphore 实现任务队列并发上限控制,避免后端被打垮
  • 管理数据库或 HTTP 连接池,防止资源耗尽
  • 整体执行时间从“秒级”压缩到“百毫秒级”,单次 Skill 调用可承载数十至数百个子任务

为什么做这个: 在 Agent 系统中,Skill 经常被多个 Agent 或任务同时触发。如果 Skill 内部是串行、阻塞的,整个 Agent 循环都会被拖慢,直接影响用户体验和系统稳定性。


步骤 1 Python asyncio 与 Skills 脚本的融合

动作:将同步工具改写为 async 版本

假设你的 Skill 中有一个同步函数 fetch_price(supplier_id),它使用 requests 调用外部 API。

# skill_sync.py - 改造前的同步版本(阻塞)
import requests

def fetch_price(supplier_id: int) -> dict:
    url = f"https://api.example.com/price/{supplier_id}"
    resp = requests.get(url, timeout=5)
    resp.raise_for_status()
    return resp.json()

把它改成基于 aiohttp 的异步版本:

# skill_async.py - 改造后的异步版本
import aiohttp
import asyncio

async def fetch_price(session: aiohttp.ClientSession, supplier_id: int) -> dict:
    url = f"https://api.example.com/price/{supplier_id}"
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
        resp.raise_for_status()
        return await resp.json()

动作:并发调用多个 supplier

现在你的 Skill 主逻辑可以同时发起所有请求,利用 asyncio.gather 等待全部完成。

async def compare_prices(supplier_ids: list[int]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_price(session, sid) for sid in supplier_ids]
        return await asyncio.gather(*tasks)

# 入口
results = asyncio.run(compare_prices([101, 102, 103, ...]))

什么是 TaskGroup?
Python 3.11 引入了 asyncio.TaskGroup,它能更好地结构化任务管理:一个任务失败,整个组会被取消。对于 Skill 中的关键并发操作,推荐用它替代裸的 asyncio.gather

async def compare_prices_robust(supplier_ids):
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_price(session, sid)) for sid in supplier_ids]
        # 此处所有任务都已经完成,tg 会自动等待并传播异常
    return [t.result() for t in tasks]

预期结果

  • 30 个供应商的请求在 1 秒内全部返回(取决于网络),主线程不会阻塞
  • Skill 内部执行流清晰,异常会被正确抛出并处理

踩坑经验

注意:事件循环不能阻塞!
不要在协程内部调用 time.sleep() 或任何同步阻塞函数(如 requests.get)。这会卡住整个事件循环,导致所有任务停止调度。如果你必须调用一段 CPU 密集型代码,请使用 await asyncio.to_thread(your_blocking_func, *args) 将其移到线程池中执行。

# 错误示例
async def bad_example():
    time.sleep(1)   # 阻塞整个事件循环

# 正确示例
async def good_example():
    await asyncio.sleep(1)

# 或使用 to_thread 运行遗留同步代码
async def legacy_wrapper():
    result = await asyncio.to_thread(legacy_blocking_io)

步骤 2 工作队列与批量处理模式

问题:无限并发会打垮下游

上一步我们一次性发起了所有请求。但当供应商数量从 30 变成 500,或者外部 API 有严格速率限制时,光靠 gather 是不够的。你需要控制并发数量

动作:引入 asyncio.Queue 实现生产者-消费者

构建一个工作队列,限制同时运行的 worker 数量(比如 20 个)。

import asyncio

async def worker(queue: asyncio.Queue, session: aiohttp.ClientSession, results: list):
    """消费者:从队列取任务,执行,直到收到毒丸"""
    while True:
        item = await queue.get()
        if item is None:          # 毒丸信号,结束 worker
            queue.task_done()
            break
        supplier_id = item
        try:
            price = await fetch_price(session, supplier_id)
            results.append(price)
        except Exception as e:
            results.append({"error": str(e)})
        finally:
            queue.task_done()

async def compare_prices_batch(supplier_ids: list[int], max_concurrency: int = 20):
    queue = asyncio.Queue()
    for sid in supplier_ids:
        queue.put_nowait(sid)
    # 放入与 worker 数量相等的“毒丸”
    for _ in range(max_concurrency):
        queue.put_nowait(None)

    results = []
    async with aiohttp.ClientSession() as session:
        workers = [
            asyncio.create_task(worker(queue, session, results))
            for _ in range(max_concurrency)
        ]
        await queue.join()        # 等待所有任务完成
        # 确保所有 worker 正常退出
        await asyncio.gather(*workers)
    return results

动作(备选):用 Semaphore 控制并发数

如果你的任务生成简单,直接用 asyncio.Semaphore 限制并发协程数量会更简洁。

sem = asyncio.Semaphore(20)   # 最多同时运行 20 个

async def fetch_with_limit(session, sid):
    async with sem:
        return await fetch_price(session, sid)

async def compare_prices_sem(supplier_ids):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, sid) for sid in supplier_ids]
        return await asyncio.gather(*tasks)

预期结果

  • 无论任务列表多大,同时发出的 HTTP 请求永远不超过设定值
  • 队列模式还能平滑处理突发流量,天然形成背压:没有 worker 空闲时,新任务就在队列中等待

关于 Celery 的说明

如果你的 Skill 运行在分布式 Agent 系统中,并且任务执行时间不可预测(如需要等待外部审批),可以考虑引入 Celery 作为专用的任务队列。但注意,Celery 是进程外队列,会引入序列化开销和管理复杂度。
对于大多数 IO 密集型 Skill,使用 asyncio.Queue + 协程就足够了,轻量且高效。


步骤 3 连接池与资源限制

问题:每个任务自己建连会耗尽出口端口

当并发量上来后,很多开发者会遇到 OSError: [Errno 99] Cannot assign requested address,这是因为没有复用 TCP 连接,短时间打开了过多临时端口。

动作:使用 aiohttp 的会话级连接池

在上面的例子中,我们创建了一个 aiohttp.ClientSession,它内部维护了连接池,默认会复用连接。但是你需要显式配置连接限制。

connector = aiohttp.TCPConnector(
    limit=100,                # 总连接池大小
    limit_per_host=30,        # 对同一个 host 的最大连接数
    ttl_dns_cache=300          # DNS 缓存时间(秒)
)
async with aiohttp.ClientSession(connector=connector) as session:
    # 所有请求共享这个连接池
    ...

经验法则: limit 不要超过系统 ulimit 允许的文件描述符数量的一半。limit_per_host 应依据目标服务的并发能力设定,通常在 10~50 之间。

动作:管理数据库连接池(以 asyncpg 为例)

如果你的 Skill 要连接 PostgreSQL,强烈建议使用 asyncpg 的连接池。

import asyncpg

async def init_db_pool():
    return await asyncpg.create_pool(
        dsn="postgresql://user:pass@localhost/db",
        min_size=5,          # 启动时建立的最小连接数
        max_size=20,         # 最大连接数
        max_queries=50000,   # 单个连接最多执行多少查询后关闭(防止内存泄漏)
        timeout=30           # 获取连接的超时时间
    )

async def run_skill():
    pool = await init_db_pool()
    async with pool.acquire() as conn:
        # 此处获得的连接已是复用状态,用完后自动放回池子
        rows = await conn.fetch("SELECT ...")

预期结果

  • HTTP 请求不再出现大量 TIME_WAIT 或端口耗尽错误
  • 数据库连接数稳定在配置范围,不会因为突发流量导致“too many connections”
  • Skill 在执行完成后关闭池子,释放资源

踩坑经验

注意:忘记关闭资源
务必将 SessionPool 作为最长生命周期对象来管理。通常在一个 Skill 调用内部创建 async with 上下文即可。如果错误地全局共享一个 ClientSession 而不关闭,会导致连接泄漏。更严重的是,如果在 asyncio.run() 结束后还有后台协程未完成,Python 会报 RuntimeWarning: coroutine ... was never awaited


回顾

你花了约 30 分钟完成了以下改造:

  1. 异步化改造:将 Skill 中的同步 I/O 调用替换为 async/await 版本,并利用 asyncio.gatherTaskGroup 实现并发执行。
  2. 并发控制:通过 asyncio.QueueSemaphore 限制了 worker 数量,为下游服务提供了天然的流量塑形和过载保护。
  3. 资源池化:为 HTTP 客户端和数据库连接配置了合理的连接池上限,从根本上避免了端口耗尽和连接风暴。

现在你的 Skill 即使在数百个任务同时涌入时,也能平稳、高效地处理——吞吐量由受控的并发度决定,而不再被串行阻塞或者无限制的创建所拖垮。

行动清单

  • [ ] 识别 Skill 中所有同步 I/O 点(HTTP 请求、磁盘读写、数据库查询),改为异步库实现
  • [ ] 为每个并发调用点设置 Semaphore 或使用 Queue 限制最大并发数,数值通过压测确定
  • [ ] 检查 aiohttp.ClientSession 的连接池配置,并确保所有会话都在 async with 块内使用
  • [ ] 为数据库连接引入连接池(如 asyncpg.create_pool),设定 max_size 并监控使用率
  • [ ] 在正式上线前,用 asyncio.run() 一次性启动 Skill 主协程,并确保无未等待的任务

下一章 《外部 API 集成必须内置安全与可靠性》,我们将深入探讨如何安全地连接 REST、GraphQL 等外部服务,覆盖认证、加密、限流和熔断,让你的 Skill 在真实世界中更加健壮。

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~