5.1. 异步任务与并发控制决定了 Skills 的吞吐量
异步任务与并发控制决定了 Skills 的吞吐量
场景还原:
2026 年 5 月,一家电商公司的 Agent 上线了商品比价 Skill。这个 Skill 需要同时从 30 个供应商的 API 获取价格,然后汇总排序。第一版用 requests 串行调用,单次执行耗时超过 12 秒,用户根本无法接受。改用 Python 的 asyncio 后,同样的 30 个请求在 0.8 秒内全部返回,吞吐量提升了 15 倍。
本章将带你完成同样的改造:用异步 I/O、任务队列和并发控制,打造高吞吐量的 Skill,使其即使在数百个并发请求下依然保持毫秒级响应。
你需要什么
| 需求 | 说明 |
|---|---|
| Python 3.10+ | 内置 asyncio 模块,支持 asyncio.TaskGroup(3.11+) |
aiohttp |
异步 HTTP 客户端(pip install aiohttp) |
asyncpg |
异步 PostgreSQL 驱动(可选,演示连接池) |
| 基础 Skill 工程结构 | 已有同步版本的工具函数,可直接在此之上改造 |
| 约 30 分钟 | 完成三个核心子节的代码改造与测试 |
最终成果
你将拥有一个 并发安全的异步 Skill,它具备以下能力:
- 使用
async/await改写原有同步 I/O 调用,支持并行执行 - 通过
asyncio.Queue和Semaphore实现任务队列与并发上限控制,避免后端被打垮 - 管理数据库或 HTTP 连接池,防止资源耗尽
- 整体执行时间从“秒级”压缩到“百毫秒级”,单次 Skill 调用可承载数十至数百个子任务
为什么做这个: 在 Agent 系统中,Skill 经常被多个 Agent 或任务同时触发。如果 Skill 内部是串行、阻塞的,整个 Agent 循环都会被拖慢,直接影响用户体验和系统稳定性。
步骤 1 Python asyncio 与 Skills 脚本的融合
动作:将同步工具改写为 async 版本
假设你的 Skill 中有一个同步函数 fetch_price(supplier_id),它使用 requests 调用外部 API。
# skill_sync.py - 改造前的同步版本(阻塞)
import requests
def fetch_price(supplier_id: int) -> dict:
url = f"https://api.example.com/price/{supplier_id}"
resp = requests.get(url, timeout=5)
resp.raise_for_status()
return resp.json()
把它改成基于 aiohttp 的异步版本:
# skill_async.py - 改造后的异步版本
import aiohttp
import asyncio
async def fetch_price(session: aiohttp.ClientSession, supplier_id: int) -> dict:
url = f"https://api.example.com/price/{supplier_id}"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
resp.raise_for_status()
return await resp.json()
动作:并发调用多个 supplier
现在你的 Skill 主逻辑可以同时发起所有请求,利用 asyncio.gather 等待全部完成。
async def compare_prices(supplier_ids: list[int]) -> list[dict]:
async with aiohttp.ClientSession() as session:
tasks = [fetch_price(session, sid) for sid in supplier_ids]
return await asyncio.gather(*tasks)
# 入口
results = asyncio.run(compare_prices([101, 102, 103, ...]))
什么是 TaskGroup?
Python 3.11 引入了asyncio.TaskGroup,它能更好地结构化任务管理:一个任务失败,整个组会被取消。对于 Skill 中的关键并发操作,推荐用它替代裸的asyncio.gather。
async def compare_prices_robust(supplier_ids):
async with aiohttp.ClientSession() as session:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_price(session, sid)) for sid in supplier_ids]
# 此处所有任务都已经完成,tg 会自动等待并传播异常
return [t.result() for t in tasks]
预期结果
- 30 个供应商的请求在 1 秒内全部返回(取决于网络),主线程不会阻塞
- Skill 内部执行流清晰,异常会被正确抛出并处理
踩坑经验
注意:事件循环不能阻塞!
不要在协程内部调用time.sleep()或任何同步阻塞函数(如requests.get)。这会卡住整个事件循环,导致所有任务停止调度。如果你必须调用一段 CPU 密集型代码,请使用await asyncio.to_thread(your_blocking_func, *args)将其移到线程池中执行。
# 错误示例
async def bad_example():
time.sleep(1) # 阻塞整个事件循环
# 正确示例
async def good_example():
await asyncio.sleep(1)
# 或使用 to_thread 运行遗留同步代码
async def legacy_wrapper():
result = await asyncio.to_thread(legacy_blocking_io)
步骤 2 工作队列与批量处理模式
问题:无限并发会打垮下游
上一步我们一次性发起了所有请求。但当供应商数量从 30 变成 500,或者外部 API 有严格速率限制时,光靠 gather 是不够的。你需要控制并发数量。
动作:引入 asyncio.Queue 实现生产者-消费者
构建一个工作队列,限制同时运行的 worker 数量(比如 20 个)。
import asyncio
async def worker(queue: asyncio.Queue, session: aiohttp.ClientSession, results: list):
"""消费者:从队列取任务,执行,直到收到毒丸"""
while True:
item = await queue.get()
if item is None: # 毒丸信号,结束 worker
queue.task_done()
break
supplier_id = item
try:
price = await fetch_price(session, supplier_id)
results.append(price)
except Exception as e:
results.append({"error": str(e)})
finally:
queue.task_done()
async def compare_prices_batch(supplier_ids: list[int], max_concurrency: int = 20):
queue = asyncio.Queue()
for sid in supplier_ids:
queue.put_nowait(sid)
# 放入与 worker 数量相等的“毒丸”
for _ in range(max_concurrency):
queue.put_nowait(None)
results = []
async with aiohttp.ClientSession() as session:
workers = [
asyncio.create_task(worker(queue, session, results))
for _ in range(max_concurrency)
]
await queue.join() # 等待所有任务完成
# 确保所有 worker 正常退出
await asyncio.gather(*workers)
return results
动作(备选):用 Semaphore 控制并发数
如果你的任务生成简单,直接用 asyncio.Semaphore 限制并发协程数量会更简洁。
sem = asyncio.Semaphore(20) # 最多同时运行 20 个
async def fetch_with_limit(session, sid):
async with sem:
return await fetch_price(session, sid)
async def compare_prices_sem(supplier_ids):
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(session, sid) for sid in supplier_ids]
return await asyncio.gather(*tasks)
预期结果
- 无论任务列表多大,同时发出的 HTTP 请求永远不超过设定值
- 队列模式还能平滑处理突发流量,天然形成背压:没有 worker 空闲时,新任务就在队列中等待
关于 Celery 的说明
如果你的 Skill 运行在分布式 Agent 系统中,并且任务执行时间不可预测(如需要等待外部审批),可以考虑引入 Celery 作为专用的任务队列。但注意,Celery 是进程外队列,会引入序列化开销和管理复杂度。
对于大多数 IO 密集型 Skill,使用 asyncio.Queue + 协程就足够了,轻量且高效。
步骤 3 连接池与资源限制
问题:每个任务自己建连会耗尽出口端口
当并发量上来后,很多开发者会遇到 OSError: [Errno 99] Cannot assign requested address,这是因为没有复用 TCP 连接,短时间打开了过多临时端口。
动作:使用 aiohttp 的会话级连接池
在上面的例子中,我们创建了一个 aiohttp.ClientSession,它内部维护了连接池,默认会复用连接。但是你需要显式配置连接限制。
connector = aiohttp.TCPConnector(
limit=100, # 总连接池大小
limit_per_host=30, # 对同一个 host 的最大连接数
ttl_dns_cache=300 # DNS 缓存时间(秒)
)
async with aiohttp.ClientSession(connector=connector) as session:
# 所有请求共享这个连接池
...
经验法则:
limit不要超过系统 ulimit 允许的文件描述符数量的一半。limit_per_host应依据目标服务的并发能力设定,通常在 10~50 之间。
动作:管理数据库连接池(以 asyncpg 为例)
如果你的 Skill 要连接 PostgreSQL,强烈建议使用 asyncpg 的连接池。
import asyncpg
async def init_db_pool():
return await asyncpg.create_pool(
dsn="postgresql://user:pass@localhost/db",
min_size=5, # 启动时建立的最小连接数
max_size=20, # 最大连接数
max_queries=50000, # 单个连接最多执行多少查询后关闭(防止内存泄漏)
timeout=30 # 获取连接的超时时间
)
async def run_skill():
pool = await init_db_pool()
async with pool.acquire() as conn:
# 此处获得的连接已是复用状态,用完后自动放回池子
rows = await conn.fetch("SELECT ...")
预期结果
- HTTP 请求不再出现大量
TIME_WAIT或端口耗尽错误 - 数据库连接数稳定在配置范围,不会因为突发流量导致“too many connections”
- Skill 在执行完成后关闭池子,释放资源
踩坑经验
注意:忘记关闭资源
务必将Session、Pool作为最长生命周期对象来管理。通常在一个 Skill 调用内部创建async with上下文即可。如果错误地全局共享一个ClientSession而不关闭,会导致连接泄漏。更严重的是,如果在asyncio.run()结束后还有后台协程未完成,Python 会报RuntimeWarning: coroutine ... was never awaited。
回顾
你花了约 30 分钟完成了以下改造:
- 异步化改造:将 Skill 中的同步 I/O 调用替换为
async/await版本,并利用asyncio.gather或TaskGroup实现并发执行。 - 并发控制:通过
asyncio.Queue或Semaphore限制了 worker 数量,为下游服务提供了天然的流量塑形和过载保护。 - 资源池化:为 HTTP 客户端和数据库连接配置了合理的连接池上限,从根本上避免了端口耗尽和连接风暴。
现在你的 Skill 即使在数百个任务同时涌入时,也能平稳、高效地处理——吞吐量由受控的并发度决定,而不再被串行阻塞或者无限制的创建所拖垮。
行动清单
- [ ] 识别 Skill 中所有同步 I/O 点(HTTP 请求、磁盘读写、数据库查询),改为异步库实现
- [ ] 为每个并发调用点设置
Semaphore或使用Queue限制最大并发数,数值通过压测确定 - [ ] 检查
aiohttp.ClientSession的连接池配置,并确保所有会话都在async with块内使用 - [ ] 为数据库连接引入连接池(如
asyncpg.create_pool),设定max_size并监控使用率 - [ ] 在正式上线前,用
asyncio.run()一次性启动 Skill 主协程,并确保无未等待的任务
下一章 《外部 API 集成必须内置安全与可靠性》,我们将深入探讨如何安全地连接 REST、GraphQL 等外部服务,覆盖认证、加密、限流和熔断,让你的 Skill 在真实世界中更加健壮。
agent skills 入门到精通
关于 LearnKu