Python多处理和共享计数器

我在多处理模块上遇到了麻烦。我正在使用具有其map方法的工作人员池从大量文件中加载数据,对于每个文件,我都使用自定义函数来分析数据。每次处理文件时,我都希望更新一个计数器,以便跟踪要处理的文件数量。这是示例代码:


def analyze_data( args ):

    # do something 

    counter += 1

    print counter



if __name__ == '__main__':


    list_of_files = os.listdir(some_directory)


    global counter

    counter = 0


    p = Pool()

    p.map(analyze_data, list_of_files)

我找不到解决方案。


精慕HU
浏览 694回答 3
3回答

慕标5832272

问题在于该counter变量未在您的进程之间共享:每个单独的进程都在创建它自己的本地实例并对其进行递增。有关可用于在进程之间共享状态的某些技术,请参阅文档的本部分。在您的情况下,您可能希望Value在工作人员之间共享一个实例这是示例的工作版本(带有一些虚拟输入数据)。请注意,它使用的是全局值,在实践中我会尽量避免使用这些值:from multiprocessing import Pool, Valuefrom time import sleepcounter = Nonedef init(args):    ''' store the counter for later use '''    global counter    counter = argsdef analyze_data(args):    ''' increment the global counter, do something with the input '''    global counter    # += operation is not atomic, so we need to get a lock:    with counter.get_lock():        counter.value += 1    print counter.value    return args * 10if __name__ == '__main__':    #inputs = os.listdir(some_directory)    #    # initialize a cross-process counter and the input lists    #    counter = Value('i', 0)    inputs = [1, 2, 3, 4]    #    # create the pool of workers, ensuring each one receives the counter     # as it starts.     #    p = Pool(initializer = init, initargs = (counter, ))    i = p.map_async(analyze_data, inputs, chunksize = 1)    i.wait()    print i.get()

www说

没有竞争条件错误的计数器类:class Counter(object):    def __init__(self):        self.val = multiprocessing.Value('i', 0)    def increment(self, n=1):        with self.val.get_lock():            self.val.value += n    @property    def value(self):        return self.val.value
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python