
Erratic runtime exceptions when trying to access persistent data in multiprocessing.Pool worker processes

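Inspired by this solution, I am trying to set up a pool of multiprocessing worker processes in Python. The idea is to pass some data to the worker processes before they actually start working, and to reuse it afterwards. This is meant to minimize the amount of data that has to be packed/unpacked on every call into a worker process (i.e., to reduce inter-process communication overhead). My MCVE looks like this: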


import multiprocessing as mp

import numpy as np


def create_worker_context():
    global context  # create "global" context in worker process
    context = {}


def init_worker_context(worker_id, some_const_array, DIMS, DTYPE):
    context.update({
        'worker_id': worker_id,
        'some_const_array': some_const_array,
        'tmp': np.zeros((DIMS, DIMS), dtype=DTYPE),
        })  # store context information in global namespace of worker
    return True  # return True, verifying that the worker process received its data


class data_analysis:

    def __init__(self):
        self.DTYPE = 'float32'
        self.CPU_LEN = mp.cpu_count()
        self.DIMS = 100
        self.some_const_array = np.zeros((self.DIMS, self.DIMS), dtype=self.DTYPE)
        # Init multiprocessing pool
        self.cpu_pool = mp.Pool(processes=self.CPU_LEN, initializer=create_worker_context)  # create pool and context in workers
        pool_results = [
            self.cpu_pool.apply_async(
                init_worker_context,
                args=(core_id, self.some_const_array, self.DIMS, self.DTYPE)
            ) for core_id in range(self.CPU_LEN)
            ]  # pass information to workers' context
        result_batches = [result.get() for result in pool_results]  # check if they got the information
        if not all(result_batches):  # raise an error if things did not work
            raise SyntaxError('Workers could not be initialized ...')

    @staticmethod
    def process_batch(batch_data):
        context['tmp'][:, :] = context['some_const_array'] + batch_data  # some fancy computation in worker
        return context['tmp']  # return result



I am running the above with CPython 3.6.6.



I am confused. What is going on here?
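The traceback is not part of the post, so the following is only one plausible explanation, not a confirmed diagnosis. `apply_async` hands each task to whichever worker happens to be free, so submitting `CPU_LEN` calls to `init_worker_context` does not guarantee that every worker runs it exactly once. A worker that never received an init task would still have an empty `context`, and `process_batch` would then fail there with a `KeyError`, which could look erratic from run to run. A minimal sketch under that assumption is to hand the data to the pool's `initializer` through `initargs`, since the initializer is called in every worker when it starts. Deriving `worker_id` from `mp.current_process().name` and the sample data in `main()` are illustrative choices, not part of the original code.

```python
import multiprocessing as mp

import numpy as np

context = {}  # per-process storage; every worker fills its own copy


def init_worker_context(some_const_array, dims, dtype):
    """Pool initializer: called in each worker process when it starts."""
    context.update({
        # Assumption: the process name stands in for the original core_id
        'worker_id': mp.current_process().name,
        'some_const_array': some_const_array,
        'tmp': np.zeros((dims, dims), dtype=dtype),
    })


def process_batch(batch_data):
    # Reuse the preallocated buffer stored during initialization
    context['tmp'][:, :] = context['some_const_array'] + batch_data
    return context['tmp']  # pickled and sent back to the parent


def main():
    dims, dtype = 100, 'float32'
    const = np.ones((dims, dims), dtype=dtype)
    # Hypothetical batches, used only to exercise the pool
    batches = [np.full((dims, dims), i, dtype=dtype) for i in range(8)]
    with mp.Pool(
        processes=mp.cpu_count(),
        initializer=init_worker_context,
        initargs=(const, dims, dtype),
    ) as pool:
        results = pool.map(process_batch, batches)
    # Each result should equal const + batch, i.e. i + 1 everywhere
    print(all(np.all(r == i + 1) for i, r in enumerate(results)))


if __name__ == '__main__':
    main()
```

With this layout the constant array is still transferred only once per worker, and no separate round of init tasks is needed, so the verification step with `result.get()` can be dropped.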


心有法竹