Python - 在 asyncio 中取消任务?

我在下面为异步池编写了代码。在__aexit__任务完成后我取消了 _worker 任务。但是当我运行代码时,工作任务不会被取消并且代码会永远运行。这就是任务的样子:<Task pending coro=<AsyncPool._worker() running at \async_pool.py:17> wait_for=<Future cancelled>>。正在asyncio.wait_for取消但不是工作任务。


class AsyncPool:

    def __init__(self,coroutine,no_of_workers,timeout):

        self._loop           = asyncio.get_event_loop()

        self._queue          = asyncio.Queue()

        self._no_of_workers  = no_of_workers

        self._coroutine      = coroutine

        self._timeout        = timeout

        self._workers        = None


    async def _worker(self): 

        while True:

            try:

                ret = False

                queue_item           = await self._queue.get()

                ret = True

                result               = await asyncio.wait_for(self._coroutine(queue_item), timeout = self._timeout,loop= self._loop)

            except Exception as e:

                print(e)

            finally:

                if ret:

                    self._queue.task_done()



    async def push_to_queue(self,item):

        self._queue.put_nowait(item)

    

    async def __aenter__(self):

        assert self._workers == None

        self._workers = [asyncio.create_task(self._worker()) for _ in range(self._no_of_workers)]

        return self

    

    async def __aexit__(self,type,value,traceback):

        await self._queue.join()


        for worker in self._workers:

            worker.cancel()


        await asyncio.gather(*self._workers, loop=self._loop, return_exceptions =True)

要使用异步池:


async def something(item):

    print("got", item)

    await asyncio.sleep(item)

 

async def main():

    async with AsyncPool(something, 5, 2) as pool:

        for i in range(10):

            await pool.push_to_queue(i)

 

asyncio.run(main())

我终端的输出:

http://img1.mukewang.com/64916d100001db9d01370421.jpg

慕无忌1623718
浏览 237回答 3
3回答

侃侃尔雅

问题是您的except Exception例外条款也会捕获取消并忽略它。更令人困惑的是,print(e)在 a 的情况下只打印一个空行CancelledError,这是输出中空行的来源。(将其更改为print(type(e))显示发生了什么。)要更正此问题,请更改except Exception为更具体的内容,例如except asyncio.TimeoutError.&nbsp;Python 3.8 中不需要此更改,它asyncio.CancelledError不再派生自Exception,而是派生自BaseException,因此except Exception不会捕获它。

慕慕森

这似乎有效。这event是一个计数计时器,当它到期时它的cancels任务。import asynciofrom datetime import datetime as dtfrom datetime import timedelta as tdimport randomimport timeclass Program:&nbsp; &nbsp; def __init__(self):&nbsp; &nbsp; &nbsp; &nbsp; self.duration_in_seconds = 20&nbsp; &nbsp; &nbsp; &nbsp; self.program_start = dt.now()&nbsp; &nbsp; &nbsp; &nbsp; self.event_has_expired = False&nbsp; &nbsp; &nbsp; &nbsp; self.canceled_success = False&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; async def on_start(self):&nbsp; &nbsp; &nbsp; &nbsp; print("On Start Event Start! Applying Overrides!!!")&nbsp; &nbsp; &nbsp; &nbsp; await asyncio.sleep(random.randint(3, 9))&nbsp; &nbsp; async def on_end(self):&nbsp; &nbsp; &nbsp; &nbsp; print("On End Releasing All Overrides!")&nbsp; &nbsp; &nbsp; &nbsp; await asyncio.sleep(random.randint(3, 9))&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; async def get_sensor_readings(self):&nbsp; &nbsp; &nbsp; &nbsp; print("getting sensor readings!!!")&nbsp; &nbsp; &nbsp; &nbsp; await asyncio.sleep(random.randint(3, 9))&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; async def evauluate_data(self):&nbsp; &nbsp; &nbsp; &nbsp; print("checking data!!!")&nbsp; &nbsp; &nbsp; &nbsp; await asyncio.sleep(random.randint(3, 9))&nbsp; &nbsp;&nbsp; &nbsp; async def check_time(self):&nbsp; &nbsp; &nbsp; &nbsp; if (dt.now() - self.program_start > td(seconds = self.duration_in_seconds)):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.event_has_expired = True&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print("Event is DONE!!!")&nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print("Event is not done! ",dt.now() - self.program_start)&nbsp; &nbsp; async def main(self):&nbsp; &nbsp; &nbsp; &nbsp; # script starts, do only once self.on_start()&nbsp; &nbsp; &nbsp; &nbsp; await self.on_start()&nbsp; &nbsp; &nbsp; &nbsp; print("On Start Done!")&nbsp; &nbsp; &nbsp; &nbsp; while not self.canceled_success:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; readings = asyncio.ensure_future(self.get_sensor_readings())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; analysis = asyncio.ensure_future(self.evauluate_data())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; checker = asyncio.ensure_future(self.check_time())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if not self.event_has_expired:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; await readings&nbsp; &nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; await analysis&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; await checker&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # close other tasks before final shutdown&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; readings.cancel()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; analysis.cancel()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; checker.cancel()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.canceled_success = True&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print("cancelled hit!")&nbsp; &nbsp; &nbsp; &nbsp; # script ends, do only once self.on_end() when even is done&nbsp; &nbsp; &nbsp; &nbsp; await self.on_end()&nbsp; &nbsp; &nbsp; &nbsp; print('Done Deal!')async def main():&nbsp; &nbsp; program = Program()&nbsp; &nbsp; await program.main()

红颜莎娜

当您asyncio创建然后取消任务时,您仍然有需要“回收”的任务。所以你想要await worker它。然而,一旦你await取消了这样的任务,因为它永远不会给你返回预期的返回值,就会asyncio.CancelledError被提高,你需要在某个地方抓住它。由于这种行为,我认为您不应该为每个取消的任务gather执行它们await,因为它们应该立即返回:async def __aexit__(self,type,value,traceback):&nbsp; &nbsp; await self._queue.join()&nbsp; &nbsp; for worker in self._workers:&nbsp; &nbsp; &nbsp; &nbsp; worker.cancel()&nbsp; &nbsp; for worker in self._workers:&nbsp; &nbsp; &nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;await worker&nbsp; &nbsp; &nbsp; &nbsp; except asyncio.CancelledError:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;print("worker cancelled:", worker)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python