Python 线程锁的实现
Lock 的实现
锁只有两种状态,锁定或者未锁定
Lock = _allocate_lock _allocate_lock = thread.allocate_lock
thread.allocate_lock
是用C代码实现的,代码位置 Python/thread_pthread.h
假设我们的系统支持 POSIX
semaphores
首先看下 sem_init
的原型
#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);
pshared
决定了这个信号量是在进程中共享还是在线程中共享。
pshared
为 非零值,那么不同进程中都可以共享pshared
为 零值,那么在当前进程的线程中共享。
https://svn.python.org/projects/python/trunk/Python/thread_pthread.h
PyThread_type_lockPyThread_allocate_lock(void){ ... /* 申请内存 */ lock = (sem_t *)malloc(sizeof(sem_t)); if (lock) { /* 初始化 value 为1,表明这个锁是 unlocked,被该进程的所有线程共享 */ status = sem_init(lock,0,1); CHECK_STATUS("sem_init"); .... } ... }
Acquire
// waitflag 默认为 trueintPyThread_acquire_lock(PyThread_type_lock lock, int waitflag){ int success; sem_t *thelock = (sem_t *)lock; int status, error = 0; dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag)); do { if (waitflag) //默认执行到这里 status = fix_status(sem_wait(thelock)); else status = fix_status(sem_trywait(thelock)); } while (status == EINTR); /* Retry if interrupted by a signal */ if (waitflag) { CHECK_STATUS("sem_wait"); } else if (status != EAGAIN) { CHECK_STATUS("sem_trywait"); } success = (status == 0) ? 1 : 0; dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success)); return success; }
Release
voidPyThread_release_lock(PyThread_type_lock lock){ sem_t *thelock = (sem_t *)lock; int status, error = 0; dprintf(("PyThread_release_lock(%p) called\n", lock)); // sem_post 是关键,释放锁 status = sem_post(thelock); CHECK_STATUS("sem_post"); }
RLock 的实现
RLock表示的是 reentrant lock,如果该锁已经被获取,那么acquire
可以被同一个线程(进程)多次无阻塞调用。但是 release
必须被匹配的使用。
下面可以看到 RLock 不过是一个浅包装
def RLock(*args, **kwargs): return _RLock(*args, **kwargs)
RLock 内部保存了一个普通的锁(thread.allocate_lock 生成),同时保存了 这个锁的 owner,
class _RLock(): def __init__(self): # 内部使用的 一个锁 self.__block = _allocate_lock() # __owner 用来保存 acquire 成功时的线程 id self.__owner = None # acquire被重复调用的次数 self.__count = 0
python3 的实现
python3 会判断系统是否支持 reentrant lock,如果支持则用系统的,否则用 python 代码实现一个。
下面我们将看到,如何只用一个 Lock来实现其他的同步机制, Condition, Event, Semaphore等
Condition 的实现
多个线程可以用 condition 来等待同一个事件的发生,当一个事件发生后,所有等待的线程都可以得到通知。
一个 Condition 总是和一个锁关联在一起的。可以传递一个锁,也可以由 构造函数自己创建一个。
先看下如何使用
import loggingimport randomimport threadingimport time logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-9s) %(message)s',) queue = []def consumer(cv, q): logging.debug('Consumer thread started ...') while True: with cv: while not q: logging.debug("Nothing in queue, consumer is waiting") cv.wait() num = q.pop(0) logging.debug("Consumed %s", num) time.sleep(random.randint(1,3))def producer(cv, q): logging.debug('Producer thread started ...') while True: with cv: nums = range(5) num = random.choice(nums) q.append(num) logging.debug("Produced %s", num) cv.notify_all()if __name__ == '__main__': condition = threading.Condition() for i in range(10): threading.Thread(name='consumer%s' % i, target=consumer, args=(condition, queue)).start() pd = threading.Thread(name='producer', target=producer, args=(condition, queue)) pd.start()
下面看如何实现
class _Condition: def __init__(self, lock=None, verbose=None): # 必须关联一个 Lock,如果没有的话,则自己创建一个 RLock if lock is None: lock = RLock() self.__lock = lock # 可以在 Condition上调用 acquire() and release() 方法,实际是调用的是内部锁的方法 self.acquire = lock.acquire self.release = lock.release # 如果锁定义了 _release_save _acquire_restore _is_owned 方法,那么使用之,否则用自己定义的 #...... # 这个很重要,保存了等待在这个Condition上的信息 self.__waiters = []
下面看 wait方法,为了篇幅,省略了部分代码
def wait(self, timeout=None): # 必须先成功调用acquire方法,才能调用wait if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") # 生成一个锁,并调用 acquire,使得它处于 locked 状态 # 这个锁代表一个waiter waiter = _allocate_lock() waiter.acquire() # 保存起来 self.__waiters.append(waiter) saved_state = self._release_save() try: # restore state no matter what (e.g., KeyboardInterrupt) if timeout is None: # 再次调用 acquire 方法,等待锁被释放 waiter.acquire() if __debug__: self._note("%s.wait(): got it", self) else: # 。。。。。。 finally: # 必须恢复锁原来的状态,这个方法很重要 self._acquire_restore(saved_state)
再看下 notify方法
def notify(self, n=1): # 同样,必须得调用 acquire成功,才可以调用本方法 if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") __waiters = self.__waiters waiters = __waiters[:n] if not waiters: if __debug__: self._note("%s.notify(): no waiters", self) return self._note("%s.notify(): notifying %d waiter%s", self, n, n!=1 and "s" or "") for waiter in waiters: # 调用 锁上的 release 方法,使得等待者可以继续 waiter.release() try: __waiters.remove(waiter) except ValueError: pass
作者:zhaoxg_cat
链接:https://www.jianshu.com/p/e84b2d201b40