async Semaphore 装饰器

from functools import wraps
import asyncio
import  inspect
def async_sem(conn):
    def ret(fn):
        sem = asyncio.Semaphore(conn)

        @wraps(fn)
        async def _(*args, **kwargs):
            async with sem:
                await fn(*args, **kwargs)
        return _
    if inspect.iscoroutinefunction(conn):
        fn = conn
        conn = 20
        return ret(fn)
    return ret

测试

@async_sem
async def test():
    print("test")
    await asyncio.sleep(1)
    print("test don")


@async_sem(2)
async def test2():
    print("test2")
    await asyncio.sleep(0.5)
    print("test2 don")



async def main():
    await asyncio.gather(test(), test2())


asyncio.run(main())
謎麟
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!