手记

多个进程读写一个文件,怎样不出岔子

题图,多进程

案例故事:

我是一个快乐的菜鸟程序员,每天写着简单的代码。听说过多进程/多线程,但是一直觉得是很高深的技术,迟迟不敢涉足。慢慢地,我开始不满足于单进程代码的速度。很多时候,我的任务是检索一个数据库,对里面每一条数据进行简单处理并输出一些结果。对每一条数据的处理都是相互独立的,和其他数据完全没有关系。这时我开始思考或许把不同数据交给不同线程操作,会大大加快程序运行速度。

事实证明我是对的,而且利用Python的multiprocessing模块,并发地执行多个独立任务是个非常简单的事情。你要做的,只是将需要执行的任务做成函数,用一个multiprocessing.Pool就可以了。(当然multiprocessing.Process也可以实现并发。具体请参照这个模块的文档。如果有需求,我也可以写一点multiprocessing包的入门教程。嗯,如果有需求的话。。。)我非常开心,感觉自己已经是一头具备了多线程编程能力的程序猿了!并发编程简直就是小菜一碟!直到我的每一个进程都需要往同一个文件里写入数据。。。

一个简单的并发程序,各个进程/线程之间相互独立,无需信息交流,也无需同步。这样的程序往往很简单,因为每一个进程只需要执行自己的任务,并不需要意识到其它进程的存在。但好景不长,很快,我就开始面临进程通信的问题。比如,很多时候我们需要不同的进程对同一个文件进行写入,或者对同一个文件进行更新操作。这时候,如果每个进程太无视其他进程的操作,就有可能发生意想不到的事情。

本文以多个进程读写同一个文件为例,探索两种常见的并发编程的概念,一个是进程锁,一个是进程间通信。

进程锁

进程锁的逻辑概念

进程锁的概念很简单。有一些操作(比如写入文档),需要保证每次只能有一个进程在运行,直到其运行结束。这样以来,每个进程要执行这个操作的时候,先要查看是否有其他进程在执行改操作。如果有,就等待那个进程完成它的操作。如何实现这个想法呢?不难想到,一个进程间的全局变量就可以做到。为了简化问题,我们考虑一个整数类型的全局变量(我们管它叫“current_writing_proc”),每个进程都可以访问。如何用一个整数来给多进程写入同一个文件的任务做一个进程锁呢?我们可以做以下规定:

  1. 初始状态下,没有任何进程访问文件。此时current_writing_proc = 0.

  2. 当某个进程试图访问文件时,它先要检查current_writing_prod变量的值。如果该值为0, 则表示目前没有任何进程访问文件。如果其值不为0,则表示有程序正在占有文件,其必须等待该值重新变为0才可以访问文件。

  3. 假设一个进程等到了current_writing_proc == 0,即此时文件没有被任何进程访问的时候。那此进程可以开始它的操作:首先要将此变量值修改为自己的"process id" (在Python中,可以使用os.getpid函数获取当前进程的进程id),然后对文件进行操作。

  4. 当该进程完成操作后,要将current_writing_proc重新设为0,以使得其他进程可以获得权限来操作文件。

总的来说,该变量存储着当前拥有文件操作权限的进程ID号。此模型虽然简单,但是已经具备了进程锁的几大要素:等待进程锁(第2步),获取进程锁(第3步),释放进程锁(第4步)。

为何一般的全局变量不能用来作进程锁?

有同学可能会问,既然进程锁这么简单,为何multiprocessing包要专门实现Lock这个类?我自己定义一个整形全局变量不就好了?

可达鸭眉头一皱,发现事情并不简单

这里涉及到多进程的一个核心问题:当一个进程产生(fork)出多个进程时,可以认为,其所有变量都会被拷贝,每个进程拥有一个这个变量的副本。假设我们在父进程中定义一个全局变量current_writing_proc。紧接着,我们创建10个子进程,那么基本上,变量current_writing_proc会被复制出10个副本,每个子进程中拥有一个。既然是副本,那么进程A中修改该变量的值,将不会影响到进程B中的值。换句话说,进程B将无法通过这个变量得知进程A的任何信息。

这跟我们定义此变量的初衷是违背的。为了实现所有进程间共享一个变量的值,我们就要用到进程通信。当这个变量值被修改的时候,进程需要通过进程间的通信渠道,告知其他进程这个变量值被修改了。multiprocessing包里的Lock变量就是通过一个进程通信机制保证了每一个锁都是真正意义上的进程级别的全局变量,从而实现上面提到的逻辑。(当然,Lock里还有一些进程锁操作合法性的代码。有兴趣的同学不妨研究一下Python的multiprocessing的官方文档。)

注意:这里的讨论只适用于多进程,不适用于多线程。同一个进程的不同线程之间,可以方便的共享内存。这并不需要通过其他通信手段来完成。这也是为什么多线程编程往往比多进程编程更简单。若不是Python中有一个全局线程锁(Global Interpretator Lock), 多线程编程在Python世界里应该远比现在更受欢迎。

简单的进程锁文件写入代码

有了进程锁,我们展示一个简单的文档写入的代码。

from multiprocessing import Pool, Lockdef write_with_lock(lock, filename, s): # 在获取lock后,将字符串s的内容写入到文件filename中。
    lock.acquire() # 等待获取进程锁。
    # 执行文件操作。比如:
    open(filename, 'a').write(s) # 为了保证程序流畅运行,应使获取到释放进程锁中间的代码尽量简洁。
    lock.release() #完成操作后,释放进程锁
    # 执行其他不需要进程锁的代码
    def main():
    filename = "hello.txt"
    s = "<你需要写入的字符串>"
    lock = Lock()
    pool = Pool(processes=20) # 创建20个进程
    pool.starmap(write_with_lock, [[lock, filename, s]]*20) # 每个进程都执行write_with_lock函数。
    pool.close()
    pool.join()

文件操作的代理进程

有些时候,我们会创建很多进程来执行一些计算任务。每个进程都会偶尔对一个文件进行一些写入操作,跟花在计算上的时间相比,这些文件写入发生的频率并不大(比如每个进程执行自己的任务,只有发生某些错误时才会将该错误记录到某个日志文件中)。在这种情况下,可以考虑用一个进程来完成所有的文件读写操作。其他的任务进程,只需要将其需要写入的内容发送给这个负责文件读写的进程,由它代为操作未见就可以了。这样的进程,我们称它为文件操作代理进程,简称“代理进程”。其他的进程,我们称它们为“任务进程”。(注意:该命名只是方便在本文中讨论问题时容易区分,并不一定是标准的命名,请勿对名字当真。)

由于代理进程和任务进程之间要进行通信,这里我们先讲一下进程间通信队列。

进程间通信,消息队列

如何从进程A向进程B发送消息?想象在两个进程之间架设一根单向管道。进程A可以将消息通过管道发给进程B。在这个过程中,A最先发出去的消息肯定最先到达B,后发出去的消息后到达B。程序员们管这种“先到先得”的数据结构叫做队列

Python的multiprocessing.Queuemultiprocessing.SimpleQueue都是实现进程间消息队列的类。通过调用这两个类的putget函数,就可以向队列中发送数据,或者从队列中获取数据。具体例子见下一小节的演示。

利用代理进程的思想建立多进程文档读写模型

接下来,我们就利用代理进程的思想,来建立一个多进程任务中文档读写的逻辑模型。

  1. 建立代理进程。该进程函数有两个参数:文件写入队列writing_queue和目标文件名filename。该进程用一个死循环不断从队列writing_queue中获取消息。获取的数据存入变量s中。如果s是整数0,则表示收到进程结束信号。代理进程将跳出死循环,结束运行。如果s是个字符串,则代理进程打开文件filename,将s写入文件中。

  2. 建立任务进程。任务进程执行某个任务,在需要向文件写入信息时,将其需要写入的信息封装成字符串,投入到writing_queue队列中。

  3. 当所有任务进程都结束运行,并入住进程之后,主进程需要告知代理进程结束工作。按照步骤1中的规定,主进程只需往writing_queue队列中投入一个整数0,并且等待代理进程结束即可。

Python的代码实现

最后这个小节,介绍如何用Python实现上一节的模型。首先,我们来编写代理进程的任务代码。

def writing_proc(writing_queue, filename):
    while True: # 开启处理消息的死循环。直到接收到终止消息(数字0)方才跳出。
        s = get()        if isinstance(s, str):
            open(filename, 'a').write(s)        elif isinstance(s, int) and s == 0:            break
        else:            continue # 忽略掉错误格式的消息。

接下来是任务进程代码。

def task_proc(writing_queue):
    # 执行其自己的任务。当需要写入文件时:
    s = "需要写入的字符串"
    writing_queue.put(s)    # 任务的其他代码

可以看到,任务进程中写入文件的操作非常简单,只需将要写入的字符串put到写入队列中就可以了。真正的文件写入是上面的代理进程的工作。最后,给出主进程的代码:

 # don't forget to import Process, Manager (to use the Queue object) and Pool (if you like) from multiprocessing.
 def main():
    m = Manager() # 从这个类中获取Queue类。(见下面讨论)
    writing_queue = m.Queue()
    filename = "log.txt"
    p_write = Process(writing_proc, args=(writing_queue, filename))
    p_write.start()
    p_task_list = [] # 这里演示用Process而非Pool建立任务进程
    for i in range(20):
        p = Process(task_proc, args=(writing_queue,))
        p_task_list.append(p)
        p.start()    
    # 主进程开启代理进程和所有任务进程后,执行其自己的操作,然后等待任务进程结束
    for p in p_task_list:
        p.join()        
    # 现在任务进程全部结束。但代理进程还在死循环中。
    # 主进程需要告知代理进程结束运行。
    writing_queue.put(0)
    p_write.join() # 等待代理进程结束。
    
    # 现在所有子进程均已完成工作。主进程可以继续执行其他代码,或者退出程序。

上面代码中提到了要使用multiprocessing.Manager来创建Queue,而不是直接从multiprocess中引入Queue,即

from multiprocessing import Queue # 有可能引发错误。

直接使用这一行代码,会引发错误:Queue can only be used through inheritance. 错误提示表示,Python只允许不同的进程通过集成同一个基类(Process)的方式使用Queue。也就是说,我们需要给每一个任务都创建一个进程类,而非一个简单的进程函数。其实,这确实是一个很好的编码习惯。特别是编写比较大的程序时,给每一个任务定义一个进程类会让代码的组织更清晰,更便于管理和重用。但是这里,我们只做简单的演示,固没有定义任务类,而是使用任务函数。为了避免出现错误,我们就引入Manager类,然后从Manager对象中引入Queue类,见下面代码。

from multiprocessing import Manager

m = Manager()
queue = m.Queue()

总结

本文通过”多进程同时对同一个文件进行读写“这个问题,朴素讲解了进程锁、进程通信和进程间通信的概念,展示了用不同方式构建出的不同的并发执行模型。本人的经验是,当在进行并发编程时,在开始写代码之前,先设计一下程序的并发执行模型是很重要的。这个模型可以类似于上面几个小节中的列表,规定进程间通信的格式,以及规定不同进程对统一资源进行操作的先后步骤。把这个模型搞清楚之后,再去考虑写代码实现这个模型。



作者:爱科学的程序员小刘
链接:https://www.jianshu.com/p/7b121e1e4229


0人推荐
随时随地看视频
慕课网APP