手记

【必看】Python 的多进程编程(一)

原文出处

在并发编程的时候,多线程和多进程是经常会被使用的两种模式(此外还有协程等)。由于 CPython 的 GIL 限制(Jython 和 IronPython 没有 GIL,PyPy 在尝试去掉 GIL),只有获取了 GIL 的线程才能使用 CPU,所以除了需要处理一些可能会阻塞的 IO(读写文件、访问网络等)之外,基本没人会去使用 CPython 的多线程。因此,本文就来说说更有用的 Python 多进程编程。

注:

  1. 本文描述的环境为 Linux 操作系统(也适用于大部分 POSIX 系统)下的 CPython,可能不适用于 Windows
    操作系统或其他的 Python 实现。

  2. 为了避免歧义,下文用「主进程」或「当前进程」表示创建子进程的那个进程,而不使用「父进程」,除非明确指明了「xx 的父进程」。

与多线程编程时需要传递一个 callable 对象不同的是,多进程编程的时候,是将主进程复制到子进程,并不能直接要求子进程执行某个 callable 对象。

在 POSIX 系统中,这个复制操作是由 clone() 和 fork() 系统调用来完成的,一般主要使用后者。
如果 fork() 执行成功的话,会分别在主进程和子进程中返回子进程的 PID 和 0,然后执行代码就开始不同了。如果失败的话(内存不够、PID 达到上限等),子进程就不会创建,主进程会返回 -1,errno 会被设置为对应的错误码。
在 CPython 的实现中,os.fork() 主要是对 fork() 函数的封装,差别在于失败时抛出 OSError,该异常的 errno 属性是对应的错误码。

因此,多进程的 Python 代码大致会长成这样:

import os

try:
    pid = os.fork()
    if pid == 0:  # 子进程
        # 子进程的代码
    else:  # 主进程
        # 主进程的代码
except OSError:
    # 主进程处理 fork 失败的代码

如果想在子进程中执行一个 callable 的对象,直接在子进程的代码中调用它就行了。但是因为子进程是主进程的拷贝,所以并不是一个干净的环境,还是可能会出现一些问题的。

于是再来说明一下 fork() 函数有哪些注意事项吧:

  • 设置子进程自己的 PID,将其 PPID 设为主进程的 PID,而 PGID、SID、UID、GID、CWD 和 UMASK
    则保持和主进程一致。

  • 复制主进程的所有地址空间到子进程。也就是说,子进程可以直接使用主进程的变量,包括 import过的模块等。但是这种复制是写时拷贝(copy on write),也就是一旦你对它进行了改动,它和主进程中的变量就不是同一个了,因此表现出来的就像是值拷贝一样。

  • 复制主进程的环境变量到子进程。

  • 复制主进程的所有文件描述符(file descriptors)到子进程。文件描述符的状态在这些进程间是共享的,所以在一个进程中读写文件,会影响另一个进程的文件打开位置。但是在一个进程中关闭文件描述符,并不影响另一个进程对应的文件描述符。
  • 复制主进程的信号处理函数(由 signal.signal() 注册)、信号唤醒 fd(由signal.set_wakeup_fd()
    设置),是否重新启动被信号中断的系统调用(由signal.siginterrupt() 设置),以及阻塞哪些信号(由signal.pthread_sigmask() 设置)到子进程。但是,定时器(由signal.setitimer() 设置)只对当前进程有效,不复制到子进程。

  • 保留通过 atexit() 和 on_exit() 注册的函数。
  • 对于多线程程序而言,只有调用 fork() 的线程被复制到子进程,变成一个单线程的新进程。

  • 异步 IO 被取消。

一般来说,子进程会想要一个相对干净的环境,因此可以在 fork() 之后关闭不需要的 fd,取消注册不需要的信号处理函数,以及设置新的 SID 等。守护进程基本就是这样实现的。

有几个特殊的 fd 是需要特别注意的:0、1 和 2,它们分别代表 stdin、stdout 和 stderr。
如果不关闭的话,子进程就会使用主进程的标准输入输出。这有可能会带来麻烦,因此在子进程中,经常会将它们重定向到 /dev/null 或磁盘文件。
Python 的 sys 模块中定义了 stdin、stdout 和 stderr 这 3 个变量。如果只是想临时性地重定向它们,将其赋值为新的文件对象即可;如果要恢复的话,sys 模块中还定义了 stdinstdoutstderr 这 3 个变量,将其赋值回来即可:

try:
    with open('/tmp/stdout', 'wb') as stdout:
        sys.stdout = stdout
        print 'this will be written to file'
finally:
    sys.stdout = sys.__stdout__

但这样做只在 Python 中有效,一旦子进程跳出了 Python 解释器的范围,这个设置就失效了。
在 C 语言中,正确的做法是关闭该 fd,打开一个新的 fd,将其 dup2() 到原 fd,再关闭新的 fd。而在 Python 中,上述的几个文件变量的 close() 方法并不会关闭对应的 fd,而只是将这个文件对象标记为 closed,对应的 fd 设为 None。于是只能模拟 C 的实现了:

def redirect_fd(fd, path, mode):
    try:
        os.close(fd)
    except OSError:
        pass
    new_fd = os.open(path, mode)
    if new_fd != fd:
        os.dup2(new_fd, fd)
        os.close(new_fd)

redirect_fd(0, os.devnull, os.O_RDONLY)
redirect_fd(1, '/tmp/stdout', os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
redirect_fd(2, '/tmp/stderr', os.O_WRONLY | os.O_CREAT | os.O_TRUNC)

另一种常见的做法是打开一个管道(PIPE),将其作为子进程的输入输出,使得主进程可以很容易地利用管道与子进程通信:

r, w = os.pipe()
pid = os.fork()
if pid == 0:
    os.close(w)  # 如果有必要的话,可以重定向到标准输入输出
    print os.read(r, 1024)
else:
    os.close(r)
    os.write(w, 'test')

这种做法非常常见,Linux 的 popen() 函数也是据此实现的,Python 也提供了基于此原理的 os.popen() 函数和 subprocess 模块。

除了标准输入输出外,还需要特别注意有没有使用一些网络库,例如 Requests 的 session 需要 close() 才会回收连接池,logging 模块中的 handlers 需要 close() 才会关闭 fd。也有一种粗暴的做法,就是将所有 fd
一股脑全关闭:
MAX_FD = os.sysconf("SC_OPEN_MAX") # 或者 resource.getrlimit(resource.RLIMIT_NOFILE)[0]
os.closerange(3, MAX_FD)

再来说下信号处理。

信号(signal)是操作系统提供的一种进程间异步通知的方式。信号可以由 kill() 系统调用(Python 将其封装成了 os.kill() 函数)产生,也可能由内核根据事件触发。

内核给进程发出信号后,该进程会中断正常的执行流,转向执行该信号的处理函数(signal handler);执行完毕后,又回到正常的执行流。要注意的是,所有非原子的指令(可以理解为单条机器指令)都可以被信号中断,因此在执行信号处理函数时,也可能会收到新的信号,从而再次中断该进程,执行新的信号处理函数。

而在 Python 环境下,信号处理函数是被封装过的,并不会被直接执行。Python 解释器在接收到信号后,只是将其处理函数放到一个队列里,并标记有函数等待执行就返回了。当 Python 解释器开始执行下一条字节码指令时,会检查有没有需要执行的函数,然后看是不是在主线程中。是的话,就执行信号处理函数;否则就进行线程切换,然后继续之前的检查。这也就意味着在 Python 环境下,信号只能在 Python 解释器的原子指令(可以理解为单条 Python 字节码指令)间中断,所以 C 函数的调用、获取列表的 slice 这种可能会执行很久的操作也不会被自定义的信号处理函数中断,并且只有主线程才会执行信号处理函数。

每个线程有个自己的信号掩码(signal mask)。信号产生后,内核会设置对应进程(或线程)的该信号位有信号了。如果进程暂时不想处理这个信号的话,可以通过 sigprocmask() 或 pthread_sigmask() 函数设置自己的信号掩码(signal mask)。内核检查到这个信号掩码被设置了,就会阻塞(block)该信号,而将其置于 pending 状态。此外,当正在执行信号处理函数时,内核会自动阻塞掉该信号,以避免重复调用,除非这个信号的 sigaction.sa_mask 设置了 SA_NODEFER 位。当进程解除阻塞信号时,处于 pending 状态的信号会继续传达给该进程。Python 3.3 起提供了 signal.pthread_sigmask() 函数来封装这个系统调用。

进程可以通过 sigaction() 或 signal() 系统调用来设置信号的处理函数,Python 中则将其封装成了 signal.signal() 函数。
信号处理函数应该写成可重入的(reentrant,也就是可以在执行的过程中被任意中断,重新执行它不会有任何问题)。假如进程在执行时被信号中断了,它原本依赖的一些数据无征兆地被信号处理函数改写了,这个进程就可能就会出现各种未定义的 bug。
要避免这个问题的话,你有两种选择:

  1. 在信号处理函数中,只调用 async-signal-safe 的函数,且对于全局变量(含静态变量)的操作是可重入的。

  2. 在整个程序中,一旦调用非 async-signal-safe的函数,或者操作可能被信号处理函数使用的全局变量时,先阻塞信号(即不会有信号处理函数被调用)。

一般来说,你肯定会选择前者,否则你需要写很多防御性的代码。

更安全的做法是在信号处理函数中不调用任何函数,只改动 volatile std::sig_atomic_t 类型的全局变量。其中,volatile 会告诉编译器这个变量可能在任意时间被当前作用域之外的代码改动,不能对其做某些优化(比如编译器已经将这个变量的值放入寄存器了,编译器会假定寄存器中的值和内存中的值是一样的,它并不能从当前的代码中预知这个值可能已经被信号处理函数或其他线程修改过了);sig_atomic_t 则是 C++ 定义的一个 typedef,一般会用 int 来实现,主要是能保证对这个变量的操作是原子性的(例如在 32 位的系统上操作一个 64 位的整数,可能会分成 2 条指令来执行)。

虽然 Python 的信号处理函数会被延迟调用,但要求基本也是差不多的。而且 Python 并没有办法定义 volatile 的全局变量,使用锁也不一定是安全的(因为信号处理函数是在主线程执行的,而被中断的线程可能就是主线程),因此最好不要操作全局变量。如果想输出点什么的话,使用 logging 模块是不安全的;实在需要的话,os.write()、socket.send() 和 socket.sendto() 是安全的,但是要自行处理被信号中断而导致数据没有完整写入的情况。

除了自定义信号处理函数外,还有两个常量可以使用:SIG_IGN 和 SIG_DFL,前者表示忽略该信号,后者表示使用系统默认的行为。

信号的默认处理行为各不相同,有些会被忽略(如 SIGCHLD),有些会导致进程停止(如 SIGSTOP),有的可以使进程恢复运行(如 SIGCONT),有些会终止进程(如 SIGTERM)等。大多数的信号都可以通过设置信号处理函数来捕获,但是 SIGSTOP 和 SIGKILL 是不能被捕获、忽略和阻塞的。

需要注意的是,系统调用也是可能被信号中断的,这可能会导致一些不必要的麻烦。Linux 允许在调用 sigaction() 时,设置 sa_flags 的 SA_RESTART 位;这样当被该信号中断时,执行完信号处理函数后,会自动重试这个系统调用。但这个重试机制只对部分系统调用有效,具体可以看 signal 手册的《Interruption of system calls and library functions by signal handlers》部分。
此外,Linux 还提供了 siginterrupt() 函数来修改这个设置。尽管该函数已经被摒弃了,但因为 Python 在调用 signal.signal() 时不设置 SA_RESTART,所以只好提供一个 signal.siginterrupt() 函数来曲线救国:

signal.signal(signal.SIGALRM, handle_alarm)
signal.siginterrupt(signal.SIGALRM, False)

当然,捕捉系统调用的异常,看 errno 是否为 EINTR,再自行重试也是可以的(下文会展示其代码)。

有了以上知识,就已经能正确处理子进程了。但是这样的子进程只能执行 Python 代码,并不是很够用。

为此 Linux 提供了 exec() 这一系列的函数,用于执行一个可执行文件。这个家族一共有 6 个函数,差别在于是否传递环境变量,是否使用 PATH 环境变量来定位可执行文件,以及参数是用数组还是不定参数。Python 则将其封装成了 8 个 os.exec() 函数。
成功执行该类函数后,当前进程的镜像(image)将被替换为新的进程的镜像,继承原进程的 UID 和 GID 等,需要注意的主要有:
打开的文件描述符不会被刷新。

  • 设置的信号处理函数恢复为 SIG_DFL,但被设为 SIG_IGN 的保持不变。信号的阻塞也保持不变。
  • 定时器被取消。

  • 共享内存被取消映射。

  • 通过 atexit() 和 on_exit() 注册的函数被取消。

  • 异步 IO 被取消。

除当前线程外,该进程的其他线程都被销毁。

值得一提的是,os.execv() 的第二个参数 args 列表的第 0 号元素和 C 语言的 main() 函数一样,表示进程的执行文件名,但大部分程序其实会忽略它。例如执行 ls /,这两种写法都可以:

os.execv('/bin/ls', ['/bin/ls', '/'])
os.execv('/bin/ls', ['balabala', '/'])

对于较长的命令,解析参数会比较麻烦,可以用 shlex.split() 函数来分割。
如果要省事的话,也可以直接使用 sh -c 来执行:

os.execv('/bin/sh', ['/bin/sh', '-c', 'ls / && sleep 1 && ls /tmp'])

弊端就是多起了一个 sh 进程,而且如果这个 sh 进程意外挂掉的话,是拿不到它的子进程的退出状态的,甚至可能丧失了对其的控制(因为不知道子进程的 PID)。

另外,可以用 fcntl 设置文件描述符的 close-on-exec 标志,这样在执行 exec*() 家族的系统调用后,该文件描述符会被自动关闭:

flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

因为 fork() + exec() 是个很常用的模式,Linux 将其封装成了 posix_spawn() 函数,此外,Linux 的 system() 函数、daemon() 函数和 Python 的 subprocess 模块都是用这个模式实现的。

再来看如何处理子进程的退出。

子进程执行结束时,会进入僵尸(zombie)状态,它的父进程会收到 SIGCHLD 信号。僵尸状态的子进程其实已经执行结束了,所以用 kill -SIGKILL 也是杀不掉的,只能由它的收割者(reaper)通过 waitpid()、wait() 和 waitid() 系统调用来获取子进程的退出状态并清理。这个收割者一般是该进程的父进程,如果这个父进程先退出了,它的父进程会被操作系统重设为 1 号进程(Linux 下为 init 进程)。

在 Python 中,这些系统调用被封装成了 os.wait() 和 os.waitpid()。

其中,os.wait() 不接受参数,等待直到任意一个子进程结束。os.waitpid() 接受 (pid, options) 两个参数。它的 pid 参数如果大于 0,就等待该 pid 对应的子进程退出;如果是 0,就等待该进程组(相同 PGID)的任意子进程结束;如果是 -1,就等待任意子进程结束;如果小于 -1,则等待 PGID 为 -pid 的任意子进程结束。它的 options 参数可以是 0 或者以下常量的或值:WNOHANG 为如果没有结束的子进程也立刻返回,WUNTRACED 为如果有停止状态(可以使用 ctrl + z 来发送 SIGSTOP 信号)的子进程也返回,WCONTINUED 为如果有停止状态的子进程恢复运行了也返回。

它们的返回值是一个由 (pid, status) 组成的元组。其中,pid 是子进程的 PID,如果设置了 WNOHANG 且没有子进程退出的话,会是 0。而 status 是子进程的退出状态,是一个 16 位的无符号整数,高 8 位是退出码,低 8 位是杀死它的信号。可以用 exit_code, kill_signal = divmod(exit_status, 256) 来获取它们,os 模块也提供了一些函数来检查进程是否正常退出等。
它们调用出错时会抛出 OSError,其 errno 属性为错误码。主要的错误码有:ECHILD 表示没有子进程,或者 pid 对应的不是当前进程的子进程,或者 SIGCHLD 信号的处理函数被设为了 SIG_IGN;EINTR 表示等待的过程中被其他信号中断了。一般来说可以忽略 EINTR 并重试:

def wait_pid(pid, options):
    while True:
        try:
            return os.waitpid(pid, options)
        except OSError as e:
            if e.errno == errno.EINTR:
                continue
            raise

不过 Python 3 扩展了 OSError 的子类,提供了 InterruptedError,捕捉起来会更方便些。而从 Python 3.5 开始,根据 PEP 475,所有系统调用都会被自动重试,不需要捕捉了。

值得一提的是,一般情况下,只有某个进程的父进程才能拿到它的退出状态。如果这个进程的父进程已经退出了,也就无法得知其退出码了。

如果非要获取的话,可以使用 ptrace() 系统调用来成为这个进程的 tracer,然后就能通过 waitpid() 来获取退出码了。最常使用的 request 常量是 PTRACE_ATTACH 和 PTRACE_SEIZE,前者会向 tracee 进程发送 SIGSTOP 信号,可以手动向其发送 SIGCONT 信号来恢复运行。

如果 ptrace() 调用失败的话,会返回 -1,并设置 errno。主要错误码有:ESRCH 表示进程不存在,EPERM 表示没有权限(例如 1 号进程和已经被 traced 过的进程等)。

Python 下有个叫 python-ptrace 的第三方库封装了这个调用。

此外,Linux 下的 prctl() 系统调用从 3.4 版开始提供了 PR_SET_CHILD_SUBREAPER 这个 option,如果设为 1 的话,会将当前进程的收割者设为离它最近的祖先进程(即如果它的父进程退出了,但爷进程还存活,则设为爷进程)。

Python 下也有个叫 python-prctl 的第三方库封装了这个调用。

进程退出的方式也是挺多的,除了被信号杀死外,在 main() 函数中返回,或者调用 exit() 函数和 _exit() 系统调用都可以退出进程。其中,exit() 函数会调用通过 atexit() 和 on_exit() 注册的函数,刷新和关闭所有打开的 stdio 流,删除用 tmpfile() 创建的文件,调用 C++ 等语言的析构函数;后者则只是关闭所有打开的 fd。它们都接收一个 int status 参数,它的低 8 位(0 ~ 255)将作为进程的退出码。

Python 提供了 builtin.exit()、sys.exit() 和 os._exit() 这 3 个函数来退出。其中,os._exit() 是对 _exit() 的简单封装,用来立即退出进程;builtin.exit() 和 sys.exit() 都是抛出一个 SystemExit 异常,如果没有被捕捉的话,就会退出当前线程(多线程程序一般使用它来退出)。

Python 的文档中提到建议只在子进程中使用 os._exit() 函数。我在用 Python 的 unittest 模块时发现它会捕捉 SystemExit 异常,并将其当成一个失败的用例,而非退出进程。这就导致了子进程将继续执行后面的单元测试,输出也因此变得乱七八糟。所以我建议至少单元测试中的子进程都用 os._exit() 来退出。

此外,还可以用 abort() 函数来进行非正常的退出。它会先解除阻塞 SIGABRT 信号,然后给当前进程发送 SIGABRT 信号。如果当前进程设置了 SIGABRT 信号的处理函数,除非该信号处理函数没有返回(例如使用 longjmp() 跳转走了),否则在执行完处理函数后,或者发现处理函数设为了 SIG_IGN 后,abort() 函数会将其设为 SIG_DFL,然后再次发送 SIGABRT 信号,以确保进程退出。

它的表现行为与 _exit() 很相似,但它会刷新和关闭所有打开的 stdio 流,并且 SIGABRT 信号的默认处理行为会产生 core dump。
Python 同样也提供了 os.abort() 来封装这个函数,但是并不会调用通过 signal.signal() 注册的 SIGABRT 信号处理函数。

未完待续Python 的多进程编程(二)

3人推荐
随时随地看视频
慕课网APP