猿问

Queue.asyncio ValueError: task_done() 调用了太多次

我实现了一段代码,它从一个队列中获取一个元素,并将相同的对象从队列列表中放入每个队列中。问题是,当我运行特定测试时,出现ValueError: task_done() called too many times异常。这个错误发生在测试代码中,而不是在被测试的代码中。

我正在asyncio.Queue使用协程和编程。我将每Queue.get一个都与一个准确的Queue.task_done电话相匹配。我正在用pytest测试代码。

我正在使用以下库:

  • 蟒蛇 3.7

  • pytest==3.10.0

  • pytest-asyncio==0.9.0

我有两个文件:middleware.py包含我的类实现和test_middleware.py实现pytest测试。

文件middlware.py

import asyncio


class DistributorMiddleware:


    def __init__(self, in_queue, out_list_queue):

        self._in = in_queue

        self._out = out_list_queue


    async def distribute(self):


        while True:

            ele = await self._in.get()

            count=0

            for queue in self._out:

                await queue.put(ele)

                count+=1

                print(f'inserted ele in {count}')

            queue.task_done()

            if ele == None:

                break

        for queue in self._out:

            await queue.join()

文件test_middleware.py:


import pytest

import asyncio                

from asyncio import Queue

from middleware import DistributorMiddleware

import random

import os



@pytest.mark.asyncio                                                                                     

async def test_distribution(request, event_loop):                                                        

    q_list = [ Queue() for _ in range(10) ]                                                              

    _in = Queue()

    distrib = DistributorMiddleware(_in, q_list)                                                         

    event_loop.create_task(distrib.distribute())                                                         

    num_ele = random.randint(1, 10)

    ele_set = set()

    for _ in range(num_ele):                                                                             

        ele = os.urandom(4)                                                                              

        ele_set.add(ele)

        await _in.put(ele)

    await _in.put(None)                                                                                  

qq_笑_17
浏览 786回答 1
1回答

达令说

您的代码中有错误。实际上,queue.task_done()应该只在从队列中取出元素时调用,而不是在将它们放入队列时调用。但是您的中间件类正在它刚刚使用的队列上调用它.put(),用于self._out列表中的最后一个队列;queue.task_done()从DistributorMiddleware.distribute()以下位置删除呼叫:async def distribute(self):    while True:        ele = await self._in.get()        count=0        for queue in self._out:            await queue.put(ele)            count+=1            print(f'inserted ele in {count}')        queue.task_done()        # ^^^^^ you didn't take anything from the queue here!当您删除该行时,您的测试就通过了。您在测试中看到异常的原因是因为只有这样队列才知道task_done()被调用得太频繁了。该queue.task_done()呼叫DistributorMiddleware.distribute()减1,未完成的任务计数器,但只有当该计数器下降到低于零能的异常进行检测。只有当最后一个任务从 中的队列中取出时,您才会到达那个点test_distribution(),此时未完成的任务计数器至少提前一步达到 0。也许那是为了改为调用self._in.task_done()?您只是在该while循环中从该队列中获取了一个元素:async def distribute(self):    while True:        ele = await self._in.get()        # getting an element from self._in        count=0        for queue in self._out:            await queue.put(ele)            count+=1            print(f'inserted ele in {count}')        self._in.task_done()        # done with ele, so decrement the self._in unfinished tasks counter
随时随地看视频慕课网APP

相关分类

Python
我要回答