手记

Python并行编程(四):多线程同步之condition(条件变量)实现带有缓冲区的生产者-消费者模型

什么是Condtion?

所谓condition条件变量,即这种机制是在满足了特定的条件后,线程才可以访问相关的数据。 这种同步机制就是一个线程等待特定的条件,另一个线程通知它条件已经发生。一旦条件发生,该线程就会获取锁,从而独占共享资源的访问。
Condition包含以下部分:

  • c.acquire(*args):获取底层锁。此方法将调用底层锁上对应的acquire(*args)方法。
  • c.release():释放底层锁。此方法将调用底层锁上对应的release()方法
  • c.wait(timeout):等待直到获取通知或出现超时为止。此方法在调用线程已经获取锁之后调用。
    调用时,将释放底层锁,而且线程将进入睡眠状态,直到另一个线程在条件变量上执行notify()notify_all()方法将其唤醒为止。在线程被唤醒后,线程讲重新获取锁,方法也会返回。timeout是浮点数,单位为秒。
    如果超时,线程将被唤醒,重新获取锁,而控制将被返回。
  • c.notify(n):唤醒一个或多个等待此条件变量的线程。此方法只会在调用线程已经获取锁之后调用,
    而且如果没有正在等待的线程,它就什么也不做。
    n指定要唤醒的线程数量,默认为1.被唤醒的线程在它们重新获取锁之前不会从wait()调用返回。
  • c.notify_all():唤醒所有等待此条件的线程。

通俗的解释:

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的 acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则 wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复 这一过程,从而解决复杂的同步问题。
可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对 象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify 方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。
Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。

带有缓冲区的生产者-消费者模型

我们可以根据所谓的wait池构建一个带有缓冲区的生产者-消费者模型,即缓冲区好比一个仓库,生产者可以不断生产商品知道仓库装满,然后告知消费者消费,而消费者也可以判断仓库是否满了从而告知生产者继续生产商品:


import threading
import time

# 假设商品数量
goods = 0

condition = threading.Condition()


def consumer():
    global goods
    while True:
        condition.acquire()
        if goods <= 0:
            # 仓库空了,即特定条件满足了,通知生产者生产
            condition.notify()
            condition.wait()
        time.sleep(2)
        goods -= 1
        print('consume 1, left {}'.format(goods))
        time.sleep(2)

        condition.release()


def producer():
    global goods
    while True:
        condition.acquire()
        if goods >= 5:
            # 仓库满了,即特定条件满足了,通知消费者消费
            condition.notify()
            condition.wait()

        time.sleep(2)
        goods += 1
        print('produce 1, already {}'.format(goods))
        time.sleep(2)

        condition.release()


if __name__ == '__main__':
    thread_consumer = threading.Thread(target=consumer)
    thread_producer = threading.Thread(target=producer)

    thread_consumer.start()
    thread_producer.start()

    thread_consumer.join()
    thread_producer.join()

    print('consumer-producer example end.')

运行截图如下:

2人推荐
随时随地看视频
慕课网APP