async Semaphore 装饰器
from functools import wraps
import asyncio
import inspect
def async_sem(conn):
def ret(fn):
sem = asyncio.Semaphore(conn)
@wraps(fn)
async def _(*args, **kwargs):
async with sem:
await fn(*args, **kwargs)
return _
if inspect.iscoroutinefunction(conn):
fn = conn
conn = 20
return ret(fn)
return ret
测试
@async_sem
async def test():
print("test")
await asyncio.sleep(1)
print("test don")
@async_sem(2)
async def test2():
print("test2")
await asyncio.sleep(0.5)
print("test2 don")
async def main():
await asyncio.gather(test(), test2())
asyncio.run(main())