红颜莎娜
如果使用queue.PriorityQueue,则可以使用通道对象作为优先级获得类似的行为:import threading, loggingimport random, string, timefrom queue import PriorityQueue, Emptyfrom contextlib import contextmanagerlogging.basicConfig(level=logging.NOTSET, format="%(threadName)s - %(message)s")class ChannelManager(object): next_priority = 0 def __init__(self): self.queue = PriorityQueue() self.channels = [] def put(self, channel, item, *args, **kwargs): self.queue.put((channel, item), *args, **kwargs) def get(self, *args, **kwargs): return self.queue.get(*args, **kwargs) @contextmanager def select(self, ordering=None, default=False): if default: try: channel, item = self.get(block=False) except Empty: channel = 'default' item = None else: channel, item = self.get() yield channel, item def new_channel(self, name): channel = Channel(name, self.next_priority, self) self.channels.append(channel) self.next_priority += 1 return channelclass Channel(object): def __init__(self, name, priority, manager): self.name = name self.priority = priority self.manager = manager def __str__(self): return self.name def __lt__(self, other): return self.priority < other.priority def put(self, item): self.manager.put(self, item)if __name__ == '__main__': num_channels = 3 num_producers = 4 num_items_per_producer = 2 num_consumers = 3 num_items_per_consumer = 3 manager = ChannelManager() channels = [manager.new_channel('Channel#{0}'.format(i)) for i in range(num_channels)] def producer_target(): for i in range(num_items_per_producer): time.sleep(random.random()) channel = random.choice(channels) message = random.choice(string.ascii_letters) logging.info('Putting {0} in {1}'.format(message, channel)) channel.put(message) producers = [threading.Thread(target=producer_target, name='Producer#{0}'.format(i)) for i in range(num_producers)] for producer in producers: producer.start() for producer in producers: producer.join() logging.info('Producers finished') def consumer_target(): for i in range(num_items_per_consumer): time.sleep(random.random()) with manager.select(default=True) as (channel, item): if channel: logging.info('Received {0} from {1}'.format(item, channel)) else: logging.info('No data received') consumers = [threading.Thread(target=consumer_target, name='Consumer#{0}'.format(i)) for i in range(num_consumers)] for consumer in consumers: consumer.start() for consumer in consumers: consumer.join() logging.info('Consumers finished')输出示例:Producer#0 - Putting x in Channel#2Producer#2 - Putting l in Channel#0Producer#2 - Putting A in Channel#2Producer#3 - Putting c in Channel#0Producer#3 - Putting z in Channel#1Producer#1 - Putting I in Channel#1Producer#1 - Putting L in Channel#1Producer#0 - Putting g in Channel#1MainThread - Producers finishedConsumer#1 - Received c from Channel#0Consumer#2 - Received l from Channel#0Consumer#0 - Received I from Channel#1Consumer#0 - Received L from Channel#1Consumer#2 - Received g from Channel#1Consumer#1 - Received z from Channel#1Consumer#0 - Received A from Channel#2Consumer#1 - Received x from Channel#2Consumer#2 - Received None from defaultMainThread - Consumers finished在这个例子中,ChannelManager只是一个包装器queue.PriorityQueue,将select方法实现contextmanager为使其看起来类似于selectGo中的语句。注意事项:定购在Go示例中,select如果有多个通道可用的数据,则在语句中写入通道的顺序确定将执行哪个通道的代码。在python示例中,顺序由分配给每个通道的优先级确定。但是,可以将优先级动态分配给每个通道(如示例中所示),因此可以使用更复杂的select方法来更改顺序,该方法将根据该方法的参数来分配新的优先级。同样,一旦上下文管理器完成,可以重新建立旧的顺序。封锁在Go示例中,select如果default存在案例,则该语句将阻塞。在python示例中,必须将boolean参数传递给该select方法,以在需要阻止/非阻止时使其清晰可见。在非阻塞情况下,上下文管理器返回的通道只是字符串,'default'因此在内部代码中很容易在with语句内部的代码中检测到此情况。线程:queue如示例中所示,模块中的对象已经为多生产者,多消费者的场景做好了准备。