pytest 与会话范围的固定装置和 asyncio 相关的问题

我有多个测试文件,每个文件都有一个异步夹具,如下所示:



@pytest.fixture(scope="module")

def event_loop(request):

    loop = asyncio.get_event_loop_policy().new_event_loop()

    yield loop

    loop.close()



@pytest.fixture(scope="module")

async def some_fixture():

    return await make_fixture()

我正在使用 xdist 进行并行化。另外我有这个装饰器:


@toolz.curry

def throttle(limit, f):

    semaphore = asyncio.Semaphore(limit)


    @functools.wraps(f)

    async def wrapped(*args, **kwargs):

        async with semaphore:

            return await f(*args, **kwargs)


    return wrapped

我有一个函数使用它:


@throttle(10)

def f():

    ...

现在f正在从多个测试文件调用,并且我收到一个异常,告诉我无法使用不同事件循环中的信号量。


我尝试转向会话级事件循环装置:




@pytest.fixture(scope="session", autouse=True)

def event_loop(request):

    loop = asyncio.get_event_loop_policy().new_event_loop()

    yield loop

    loop.close()


但这只给了我:


ScopeMismatch:您尝试使用“模块”范围请求对象访问“函数”范围固定装置“event_loop”,涉及工厂


是否有可能让 xdist + 异步装置 + 信号量一起工作?


呼唤远方
浏览 108回答 1
1回答

胡子哥哥

最终使用以下方法让它工作conftest.py:import asyncioimport pytest@pytest.fixture(scope="session")def event_loop():    return asyncio.get_event_loop()
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python