代码实现
看完了设计模式的原理,我们下面来试着用代码来实现一下,
由于这个设计模式非常简单,这个代码并不长只有几行:
from queue import Queue
from threading import Thread
def producer(que):
data = 0
while True:
data += 1
que.put(data)
def consumer(que):
while True:
data = que.get()
print(data)
que = Queue()
t1 = Thread(target=consumer, args=(que, ))
t2 = Thread(target=producer, args=(que, ))
t1.start()
t2.start()
我们运行一下就会发现它是可行的,并且由于队列先进先出的限制,可以保证了consumer线程读取到的内容的顺序和producer生产的顺序是一致的。
如果我们运行一下这个代码会发现它是不会结束的,因为consumer和producer当中都用到了while True构建的死循环,假设我们希望可以控制程序的结束,应该怎么办?
其实也很简单,我们也可以利用队列。我们创建一个特殊的信号量,约定好当consumer接受到这个特殊值的时候就停止程序。这样当我们要结束程序的时候,我们只需要把这个信号量加入队列即可。
singal = object()
def producer(que):
data = 0
while data < 20:
data += 1
que.put(data)
que.put(singal)
def consumer(que):
while True:
data = que.get()
if data is singal:
# 继续插入singal
que.put(singal)
break
print(data)
这里有一个细节是我们在consumer当中,当读取到singal的时候,在跳出循环之前我们又把singal放回了队列。原因也很简单,因为有时候consumer线程不止一个,这个singal上游只放置了一个,只会被一个线程读取进来,其他线程并不会知道已经获得了singal的消息,所以还是会继续执行。
而当consumer关闭之前放入singal就可以保证每一个consumer在关闭的之前都会再传递一个结束的信号给其他未关闭的consumer读取。这样一个一个的传递,就可以保证所有consumer都关闭。
这里还有一个小细节,虽然利用队列可以解决生产者和消费者通信的问题,但是上游的生产者并不知道下游的消费者是否已经执行完成了。假如我们想要知道,应该怎么办?
Python的设计者们也考虑到了这个问题,所以他们在Queue这个类当中加入了task_done和join方法。利用task_done,消费者可以通知queue这一个任务已经执行完成了。而通过调用join,可以等待所有的consumer完成。
from queue import Queue
from threading import Thread
def producer(que):
data = 0
while data < 20:
data += 1
que.put(data)
def consumer(que):
while True:
data = que.get()
print(data)
que.task_done()
que = Queue()
t1 = Thread(target=consumer, args=(que, ))
t2 = Thread(target=producer, args=(que, ))
t1.start()
t2.start()
que.join()
除了使用task_done之外,我们还可以在que传递的消息当中加入一个Event,这样我们还可以继续感知到每一个Event执行的情况。
优先队列与其他设置
我们之前在介绍一些分布式调度系统的时候曾经说到过,在调度系统当中,调度者会用一个优先队列来管理所有的任务。当有机器空闲的时候,会有限调度那些优先级高的任务。
其实这个调度系统也是基于我们刚才介绍的生产消费者模型开发的,只不过将调度队列从普通队列换成了优先队列而已。所以如果我们也希望我们的consumer能够根据任务的优先级来改变执行顺序的话,也可以使用优先队列来进行管理任务。
关于优先队列的实现我们已经很熟悉了,但是有一个问题是我们需要实现挂起等待的阻塞功能。这个我们自己实现是比较麻烦的,但好在我们可以通过调用相关的库来实现。比如threading中的Condition,Condition是一个条件变量可以通知其他线程,也可以实现挂起等待。
from threading import Thread, Condition
class PriorityQueue:
def __init__(self):
self._queue = []
self._cv = Condition()
def put(self, item, priority):
with self._cv:
heapq.heappush(self._queue, (-priority, self._count, item))
# 通知下游,唤醒wait状态的线程
self._cv.notify()
def get(self):
with self._cv:
# 如果对列为空则挂起
while len(self._queue) == 0:
self._cv.wait()
# 否则返回优先级最大的
return heapq.heappop(self._queue)[-1]
最后介绍一下Queue的其他设置,比如我们可以通过size参数设置队列的大小,由于这是一个阻塞式队列,所以如果我们设置了队列的大小,那么当队列被装满的时候,往其中插入数据的操作也会被阻塞。此时producer线程会被挂起,一直到队列不再满为止。
当然我们也可以通过block参数将队列的操作设置成非阻塞。比如que.get(block=False),那么当队列为空的时候,将会抛出一个队列为空的异常。同样,que.put(data, block=False)时也一样会得到一个队列已满的异常。
总结
比如kafka等消息系统,以及yarn等调度系统等等,几乎只要是涉及到多线程上下游通信的,往往都会用到。也正因此它的使用场景太广了,所以它经常在各种面试当中出现,也可以认为是工程师必须知道的几种基础设计模式之一
另外,队列也是一个在设计模式以及使用场景当中经常出现的数据结构。从侧面也说明了,为什么算法和数据结构非常重要,许多大公司喜欢问一些算法题,也是因为有实际的使用场景,并且的的确确能锻炼工程师的思维能力。经常有同学问我算法和数据结构的使用案例,这就是一个很好的例子。