我实现了一段代码,它从一个队列中获取一个元素,并将相同的对象从队列列表中放入每个队列中。问题是,当我运行特定测试时,出现ValueError: task_done() called too many times
异常。这个错误发生在测试代码中,而不是在被测试的代码中。
我正在asyncio.Queue
使用协程和编程。我将每Queue.get
一个都与一个准确的Queue.task_done
电话相匹配。我正在用pytest测试代码。
我正在使用以下库:
蟒蛇 3.7
pytest==3.10.0
pytest-asyncio==0.9.0
我有两个文件:middleware.py
包含我的类实现和test_middleware.py
实现pytest测试。
文件middlware.py
:
import asyncio
class DistributorMiddleware:
def __init__(self, in_queue, out_list_queue):
self._in = in_queue
self._out = out_list_queue
async def distribute(self):
while True:
ele = await self._in.get()
count=0
for queue in self._out:
await queue.put(ele)
count+=1
print(f'inserted ele in {count}')
queue.task_done()
if ele == None:
break
for queue in self._out:
await queue.join()
文件test_middleware.py:
import pytest
import asyncio
from asyncio import Queue
from middleware import DistributorMiddleware
import random
import os
@pytest.mark.asyncio
async def test_distribution(request, event_loop):
q_list = [ Queue() for _ in range(10) ]
_in = Queue()
distrib = DistributorMiddleware(_in, q_list)
event_loop.create_task(distrib.distribute())
num_ele = random.randint(1, 10)
ele_set = set()
for _ in range(num_ele):
ele = os.urandom(4)
ele_set.add(ele)
await _in.put(ele)
await _in.put(None)
达令说
相关分类