将 python asyncio 与 pyee 事件发射器相结合

我正在尝试使用pyee 库AsyncIOEventEmitter中的,但没有成功。由于某种原因,发出的事件“Hi”永远不会到达完成 asyncio 的未来。我也没有在网上找到合适的例子。此外,我尝试提供当前事件并为 ,使用新的事件循环,但两者产生相同的结果。async_handlerAsyncIOEventEmitter


有人可以帮我吗?下面的示例单元测试:


import asyncio

import logging

import pytest

from pyee import AsyncIOEventEmitter


LOG = logging.getLogger(__name__)


@pytest.mark.asyncio

async def test_setup(event_loop):

    LOG.info("1 - start")

    event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())


    # Create a new Future object.

    future_result = event_loop.create_future()

    LOG.info("2 - emit event")

    event_emitter.emit("event", "Hi")


    @event_emitter.on("event")

    async def async_handler(message):

        LOG.info(">>> %s", message)

        future_result.set_result(message)

        return future_result


    # Wait until *future_result* has a result and print it.

    LOG.info(await future_result)


慕桂英3389331
浏览 372回答 1
1回答

胡说叔叔

好的,明白了,该async_handler方法必须在测试的早期定义...这现在有效:"""Event emitter playground"""import asyncioimport loggingimport pytestfrom pyee import AsyncIOEventEmitterLOG = logging.getLogger(__name__)@pytest.mark.asyncioasync def test_setup(event_loop):    """Receive event from emitter and complete future!"""    LOG.info("1 - start")    event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())    @event_emitter.on("event")    def async_handler(message):        LOG.info(">>> %s", message)        future_result.set_result(message)    future_result = event_loop.create_future()    LOG.info("2 - emit event")    event_emitter.emit("event", "Hi")    LOG.info(await future_result)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python