手记

Python线程锁的实现


Python 线程锁的实现

Lock 的实现

锁只有两种状态,锁定或者未锁定

Lock = _allocate_lock

_allocate_lock = thread.allocate_lock

thread.allocate_lock 是用C代码实现的,代码位置 Python/thread_pthread.h

假设我们的系统支持 POSIX semaphores

首先看下 sem_init 的原型

#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);

pshared决定了这个信号量是在进程中共享还是在线程中共享。

  • pshared 为 非零值,那么不同进程中都可以共享

  • pshared 为 零值,那么在当前进程的线程中共享。

https://svn.python.org/projects/python/trunk/Python/thread_pthread.h

PyThread_type_lockPyThread_allocate_lock(void){
    ...    /* 申请内存 */
    lock = (sem_t *)malloc(sizeof(sem_t));    if (lock) {        /*
        初始化
        value 为1,表明这个锁是 unlocked,被该进程的所有线程共享
        */
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");
        ....
    }
    ...
}

Acquire

// waitflag 默认为 trueintPyThread_acquire_lock(PyThread_type_lock lock, int waitflag){    int success;    sem_t *thelock = (sem_t *)lock;    int status, error = 0;

    dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));    do {        if (waitflag)            //默认执行到这里
            status = fix_status(sem_wait(thelock));        else
            status = fix_status(sem_trywait(thelock));
    } while (status == EINTR); /* Retry if interrupted by a signal */

    if (waitflag) {
        CHECK_STATUS("sem_wait");
    } else if (status != EAGAIN) {
        CHECK_STATUS("sem_trywait");
    }

    success = (status == 0) ? 1 : 0;

    dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));    return success;
}

Release

voidPyThread_release_lock(PyThread_type_lock lock){    sem_t *thelock = (sem_t *)lock;    int status, error = 0;

    dprintf(("PyThread_release_lock(%p) called\n", lock));    // sem_post 是关键,释放锁
    status = sem_post(thelock);
    CHECK_STATUS("sem_post");
}

RLock 的实现

RLock表示的是 reentrant lock,如果该锁已经被获取,那么acquire 可以被同一个线程(进程)多次无阻塞调用。但是 release 必须被匹配的使用。

下面可以看到 RLock 不过是一个浅包装

def RLock(*args, **kwargs):
    return _RLock(*args, **kwargs)

RLock 内部保存了一个普通的锁(thread.allocate_lock 生成),同时保存了 这个锁的 owner,

class _RLock():
    def __init__(self):
        # 内部使用的 一个锁
        self.__block = _allocate_lock()        # __owner 用来保存 acquire 成功时的线程 id
        self.__owner = None
        # acquire被重复调用的次数
        self.__count = 0

python3 的实现

python3 会判断系统是否支持 reentrant lock,如果支持则用系统的,否则用 python 代码实现一个。


下面我们将看到,如何只用一个 Lock来实现其他的同步机制, Condition, Event, Semaphore等

Condition 的实现

多个线程可以用 condition 来等待同一个事件的发生,当一个事件发生后,所有等待的线程都可以得到通知。

一个 Condition 总是和一个锁关联在一起的。可以传递一个锁,也可以由 构造函数自己创建一个。

先看下如何使用

import loggingimport randomimport threadingimport time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s',)

queue = []def consumer(cv, q):
    logging.debug('Consumer thread started ...')    while True:        with cv:            while not q:
                logging.debug("Nothing in queue, consumer is waiting")
                cv.wait()
            num = q.pop(0)
            logging.debug("Consumed %s", num)
            time.sleep(random.randint(1,3))def producer(cv, q):
    logging.debug('Producer thread started ...')    while True:        with cv:
            nums = range(5)
            num = random.choice(nums)
            q.append(num)
            logging.debug("Produced %s", num)
            cv.notify_all()if __name__ == '__main__':
    condition = threading.Condition()    for i in range(10):
        threading.Thread(name='consumer%s' % i, target=consumer, args=(condition, queue)).start()
    pd = threading.Thread(name='producer', target=producer, args=(condition, queue))
    pd.start()

下面看如何实现

class _Condition:
    def __init__(self, lock=None, verbose=None):
        # 必须关联一个 Lock,如果没有的话,则自己创建一个 RLock
        if lock is None:
            lock = RLock()
        self.__lock = lock        # 可以在 Condition上调用 acquire() and release() 方法,实际是调用的是内部锁的方法
        self.acquire = lock.acquire
        self.release = lock.release        # 如果锁定义了 _release_save _acquire_restore _is_owned 方法,那么使用之,否则用自己定义的
        #......
        # 这个很重要,保存了等待在这个Condition上的信息
        self.__waiters = []

下面看 wait方法,为了篇幅,省略了部分代码

    def wait(self, timeout=None):
        # 必须先成功调用acquire方法,才能调用wait
        if not self._is_owned():            raise RuntimeError("cannot wait on un-acquired lock")        # 生成一个锁,并调用 acquire,使得它处于 locked 状态
        # 这个锁代表一个waiter
        waiter = _allocate_lock()
        waiter.acquire()        # 保存起来
        self.__waiters.append(waiter)
        saved_state = self._release_save()        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:                # 再次调用 acquire 方法,等待锁被释放
                waiter.acquire()                if __debug__:
                    self._note("%s.wait(): got it", self)            else:                # 。。。。。。
        finally:            # 必须恢复锁原来的状态,这个方法很重要
            self._acquire_restore(saved_state)

再看下 notify方法

    def notify(self, n=1):
        # 同样,必须得调用 acquire成功,才可以调用本方法
        if not self._is_owned():            raise RuntimeError("cannot notify on un-acquired lock")

        __waiters = self.__waiters
        waiters = __waiters[:n]        if not waiters:            if __debug__:
                self._note("%s.notify(): no waiters", self)            return
        self._note("%s.notify(): notifying %d waiter%s", self, n,
                   n!=1 and "s" or "")        for waiter in waiters:            # 调用 锁上的 release 方法,使得等待者可以继续
            waiter.release()            try:
                __waiters.remove(waiter)            except ValueError:                pass



作者:zhaoxg_cat
链接:https://www.jianshu.com/p/e84b2d201b40


0人推荐
随时随地看视频
慕课网APP