队列上有多路复用

如何queue.Queue同时在多个对象上进行“选择” ?


Golang的频道具有所需的功能:


select {

case i1 = <-c1:

    print("received ", i1, " from c1\n")

case c2 <- i2:

    print("sent ", i2, " to c2\n")

case i3, ok := (<-c3):  // same as: i3, ok := <-c3

    if ok {

        print("received ", i3, " from c3\n")

    } else {

        print("c3 is closed\n")

    }

default:

    print("no communication\n")

}

其中第一个要解除阻塞的通道执行相应的块。我将如何在Python中实现这一目标?

更新0

根据tux21b的答案中给出的链接,所需的队列类型具有以下属性:

  • 多生产者/多消费者队列(MPMC)

  • 提供按生产者的FIFO / LIFO

  • 当队列为空/完整的消费者/生产者被阻止时

此外,渠道可能会被阻塞,生产者将阻塞,直到消费者取回该物品为止。我不确定Python的Queue是否可以做到这一点。


莫回无
浏览 230回答 3
3回答

PIPIONE

生产者-消费者队列有许多不同的实现方式,例如queue.Queue可用。它们通常具有许多不同的属性,例如Dmitry Vyukov在此出色的文章中列出的属性。如您所见,可能有超过1万种不同的组合。根据要求,用于此类队列的算法也相差很大。仅扩展现有队列算法以保证其他属性是不可能的,因为这通常需要不同的内部数据结构和不同的算法。Go的频道提供了相对较高的保证属性,因此这些频道可能适用于许多程序。最困难的要求之一是支持一次读取/阻塞多个通道(select语句),并且如果select语句中可以有多个分支可以继续进行,则要公平地选择一个通道,这样就不会留下任何消息。 。Python的queue.Queue不提供此功能,因此根本无法使用它来存档相同的行为。因此,如果要继续使用queue.Queue,则需要找到解决该问题的方法。但是,变通办法有其自身的缺点列表,并且较难维护。寻找另一个提供所需功能的生产者-消费者队列可能是一个更好的主意!无论如何,这是两个可能的解决方法:轮询while True:&nbsp; try:&nbsp; &nbsp; i1 = c1.get_nowait()&nbsp; &nbsp; print "received %s from c1" % i1&nbsp; except queue.Empty:&nbsp; &nbsp; pass&nbsp; try:&nbsp; &nbsp; i2 = c2.get_nowait()&nbsp; &nbsp; print "received %s from c2" % i2&nbsp; except queue.Empty:&nbsp; &nbsp; pass&nbsp; time.sleep(0.1)在轮询通道时,这可能会占用大量CPU周期,并且在有很多消息时可能会变慢。使用具有指数退避时间(而不是此处显示的恒定0.1秒)的time.sleep()可能会大大改善此版本。单个通知队列queue_id = notify.get()if queue_id == 1:&nbsp; i1 = c1.get()&nbsp; print "received %s from c1" % i1elif queue_id == 2:&nbsp; i2 = c2.get()&nbsp; print "received %s from c2" % i2使用此设置,您必须在发送到c1或c2之后将某些内容发送到通知队列。只要您只有一个这样的通知队列就足够了(即您没有多个“选择”,每个“选择”阻塞在您通道的不同子集上),这可能对您有用。另外,您也可以考虑使用Go。无论如何,Go的goroutines和并发支持比Python的有限线程功能强大得多。

红颜莎娜

如果使用queue.PriorityQueue,则可以使用通道对象作为优先级获得类似的行为:import threading, loggingimport random, string, timefrom queue import PriorityQueue, Emptyfrom contextlib import contextmanagerlogging.basicConfig(level=logging.NOTSET,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; format="%(threadName)s - %(message)s")class ChannelManager(object):&nbsp; &nbsp; next_priority = 0&nbsp; &nbsp; def __init__(self):&nbsp; &nbsp; &nbsp; &nbsp; self.queue = PriorityQueue()&nbsp; &nbsp; &nbsp; &nbsp; self.channels = []&nbsp; &nbsp; def put(self, channel, item, *args, **kwargs):&nbsp; &nbsp; &nbsp; &nbsp; self.queue.put((channel, item), *args, **kwargs)&nbsp; &nbsp; def get(self, *args, **kwargs):&nbsp; &nbsp; &nbsp; &nbsp; return self.queue.get(*args, **kwargs)&nbsp; &nbsp; @contextmanager&nbsp; &nbsp; def select(self, ordering=None, default=False):&nbsp; &nbsp; &nbsp; &nbsp; if default:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; channel, item = self.get(block=False)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; except Empty:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; channel = 'default'&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; item = None&nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; channel, item = self.get()&nbsp; &nbsp; &nbsp; &nbsp; yield channel, item&nbsp; &nbsp; def new_channel(self, name):&nbsp; &nbsp; &nbsp; &nbsp; channel = Channel(name, self.next_priority, self)&nbsp; &nbsp; &nbsp; &nbsp; self.channels.append(channel)&nbsp; &nbsp; &nbsp; &nbsp; self.next_priority += 1&nbsp; &nbsp; &nbsp; &nbsp; return channelclass Channel(object):&nbsp; &nbsp; def __init__(self, name, priority, manager):&nbsp; &nbsp; &nbsp; &nbsp; self.name = name&nbsp; &nbsp; &nbsp; &nbsp; self.priority = priority&nbsp; &nbsp; &nbsp; &nbsp; self.manager = manager&nbsp; &nbsp; def __str__(self):&nbsp; &nbsp; &nbsp; &nbsp; return self.name&nbsp; &nbsp; def __lt__(self, other):&nbsp; &nbsp; &nbsp; &nbsp; return self.priority < other.priority&nbsp; &nbsp; def put(self, item):&nbsp; &nbsp; &nbsp; &nbsp; self.manager.put(self, item)if __name__ == '__main__':&nbsp; &nbsp; num_channels = 3&nbsp; &nbsp; num_producers = 4&nbsp; &nbsp; num_items_per_producer = 2&nbsp; &nbsp; num_consumers = 3&nbsp; &nbsp; num_items_per_consumer = 3&nbsp; &nbsp; manager = ChannelManager()&nbsp; &nbsp; channels = [manager.new_channel('Channel#{0}'.format(i))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for i in range(num_channels)]&nbsp; &nbsp; def producer_target():&nbsp; &nbsp; &nbsp; &nbsp; for i in range(num_items_per_producer):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.sleep(random.random())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; channel = random.choice(channels)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; message = random.choice(string.ascii_letters)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logging.info('Putting {0} in {1}'.format(message, channel))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; channel.put(message)&nbsp; &nbsp; producers = [threading.Thread(target=producer_target,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name='Producer#{0}'.format(i))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;for i in range(num_producers)]&nbsp; &nbsp; for producer in producers:&nbsp; &nbsp; &nbsp; &nbsp; producer.start()&nbsp; &nbsp; for producer in producers:&nbsp; &nbsp; &nbsp; &nbsp; producer.join()&nbsp; &nbsp; logging.info('Producers finished')&nbsp; &nbsp; def consumer_target():&nbsp; &nbsp; &nbsp; &nbsp; for i in range(num_items_per_consumer):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.sleep(random.random())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; with manager.select(default=True) as (channel, item):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if channel:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logging.info('Received {0} from {1}'.format(item, channel))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logging.info('No data received')&nbsp; &nbsp; consumers = [threading.Thread(target=consumer_target,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; name='Consumer#{0}'.format(i))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;for i in range(num_consumers)]&nbsp; &nbsp; for consumer in consumers:&nbsp; &nbsp; &nbsp; &nbsp; consumer.start()&nbsp; &nbsp; for consumer in consumers:&nbsp; &nbsp; &nbsp; &nbsp; consumer.join()&nbsp; &nbsp; logging.info('Consumers finished')输出示例:Producer#0 - Putting x in Channel#2Producer#2 - Putting l in Channel#0Producer#2 - Putting A in Channel#2Producer#3 - Putting c in Channel#0Producer#3 - Putting z in Channel#1Producer#1 - Putting I in Channel#1Producer#1 - Putting L in Channel#1Producer#0 - Putting g in Channel#1MainThread - Producers finishedConsumer#1 - Received c from Channel#0Consumer#2 - Received l from Channel#0Consumer#0 - Received I from Channel#1Consumer#0 - Received L from Channel#1Consumer#2 - Received g from Channel#1Consumer#1 - Received z from Channel#1Consumer#0 - Received A from Channel#2Consumer#1 - Received x from Channel#2Consumer#2 - Received None from defaultMainThread - Consumers finished在这个例子中,ChannelManager只是一个包装器queue.PriorityQueue,将select方法实现contextmanager为使其看起来类似于selectGo中的语句。注意事项:定购在Go示例中,select如果有多个通道可用的数据,则在语句中写入通道的顺序确定将执行哪个通道的代码。在python示例中,顺序由分配给每个通道的优先级确定。但是,可以将优先级动态分配给每个通道(如示例中所示),因此可以使用更复杂的select方法来更改顺序,该方法将根据该方法的参数来分配新的优先级。同样,一旦上下文管理器完成,可以重新建立旧的顺序。封锁在Go示例中,select如果default存在案例,则该语句将阻塞。在python示例中,必须将boolean参数传递给该select方法,以在需要阻止/非阻止时使其清晰可见。在非阻塞情况下,上下文管理器返回的通道只是字符串,'default'因此在内部代码中很容易在with语句内部的代码中检测到此情况。线程:queue如示例中所示,模块中的对象已经为多生产者,多消费者的场景做好了准备。

SMILET

该pychan项目复制在Python围棋频道,包括复用。它实现了与Go相同的算法,因此符合您所有需要的属性:多个生产者和消费者可以通过Chan进行交流。当生产者和消费者都准备就绪时,他们对生产者和消费者按到达顺序得到服务(FIFO)空(满)队列将阻止使用者(生产者)。您的示例如下所示:c1 = Chan(); c2 = Chan(); c3 = Chan()try:&nbsp; &nbsp; chan, value = chanselect([c1, c3], [(c2, i2)])&nbsp; &nbsp; if chan == c1:&nbsp; &nbsp; &nbsp; &nbsp; print("Received %r from c1" % value)&nbsp; &nbsp; elif chan == c2:&nbsp; &nbsp; &nbsp; &nbsp; print("Sent %r to c2" % i2)&nbsp; &nbsp; else:&nbsp; # c3&nbsp; &nbsp; &nbsp; &nbsp; print("Received %r from c3" % value)except ChanClosed as ex:&nbsp; &nbsp; if ex.which == c3:&nbsp; &nbsp; &nbsp; &nbsp; print("c3 is closed")&nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; raise(完全公开:我写了这个库)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go