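I'm trying to run this simple code with asyncio queues, but I keep getting exceptions, even nested ones.
I'd like some help getting queues in asyncio to work correctly: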

import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)

num_workers = 1
in_queue = asyncio.Queue()
out_queue = asyncio.Queue()
tasks = []


async def run():
    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'input_queue' and produces to 'output_queue':
    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    # tasks.append(asyncio.create_task(saver()))

    print('waiting for queues...')
    await in_queue.join()
    # await out_queue.join()
    print('all queues done')

    for task in tasks:
        task.cancel()
    print('waiting until all tasks cancelled')
    await asyncio.gather(*tasks, return_exceptions=True)
    print('done')


async def worker(name):
    while True:
        try:
            print(f"{name} started")
            num = await in_queue.get()
            print(f'{name} got {num}')
            await asyncio.sleep(0)
            # await out_queue.put(num)
        except Exception as e:
            print(f"{name} exception {e}")
        finally:
            print(f"{name} ended")
            in_queue.task_done()


async def saver():
    while True:
        try:
            print("saver started")
            num = await out_queue.get()
            print(f'saver got {num}')
            await asyncio.sleep(0)
            print("saver ended")
        except Exception as e:
            print(f"saver exception {e}")
        finally:
            out_queue.task_done()


asyncio.run(run(), debug=True)
print('Done!')

Output:
waiting for queues...
worker-0 started
worker-0 got 0
worker-0 ended
worker-0 started
worker-0 exception
worker-0 ended
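
This is the basic flow. What I want to do later is run more requests on more workers, where each worker moves a number from in_queue to out_queue, and the saver then consumes the numbers from out_queue.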
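
To make the target flow concrete, here is a minimal sketch of what I have in mind, not a confirmed fix. It rests on a few assumptions of mine: the queues are created inside the running coroutine rather than at module level (on Python versions before 3.10 a module-level `asyncio.Queue()` binds to a different event loop than the one `asyncio.run()` starts), `task_done()` is called only after a successful `get()`, and the saver simply appends to a list. The name `run_pipeline` and its parameters are hypothetical and used only for illustration.

```python
import asyncio


async def worker(name, in_queue, out_queue):
    # Move each number from in_queue to out_queue.
    while True:
        # get() sits outside the try block, so a cancellation while waiting
        # here never reaches task_done().
        num = await in_queue.get()
        try:
            await asyncio.sleep(0)  # placeholder for real work
            await out_queue.put(num)
        except Exception as e:
            print(f"{name} exception {e!r}")
        finally:
            # Exactly one task_done() per item actually taken from the queue.
            in_queue.task_done()


async def saver(out_queue, saved):
    # Consume numbers from out_queue; "saving" is just a list append here.
    while True:
        num = await out_queue.get()
        try:
            saved.append(num)
        finally:
            out_queue.task_done()


async def run_pipeline(requests, num_workers=1):
    # Assumption: create the queues inside the running event loop.
    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()
    saved = []

    for request in requests:
        await in_queue.put(request)

    tasks = [
        asyncio.create_task(worker(f"worker-{i}", in_queue, out_queue))
        for i in range(num_workers)
    ]
    tasks.append(asyncio.create_task(saver(out_queue, saved)))

    # Workers put into out_queue before calling in_queue.task_done(), so once
    # in_queue is drained every item is already counted in out_queue.
    await in_queue.join()
    await out_queue.join()

    # All tasks are now idle in get(); cancel them and wait for them to finish.
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return saved


results = asyncio.run(run_pipeline(range(5), num_workers=3))
print(sorted(results))
```

With several workers the order in which numbers reach the saver is not guaranteed, which is why the sketch sorts the results before printing them.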