实例属性不会使用多处理持久化

我遇到了实例不保留对属性的更改,甚至不保留创建的新属性的问题。我想我已经将范围缩小到我的脚本利用了多处理这一事实,并且我认为当脚本返回到主线程时,不会“记住”发生在单独进程线程中的实例的更改。


基本上,我有几组数据需要并行处理。数据存储为属性,并通过类中的几种方法进行更改。在处理结束时,我希望返回到主线程并连接来自每个对象实例的数据。但是,如上所述,当我尝试在并行处理位完成后使用数据访问实例属性时,那里什么也没有。就好像在多处理位期间制定的任何更改都被“遗忘”了。


有没有明显的解决方案来解决这个问题?或者我是否需要重建我的代码来返回处理过的数据,而不仅仅是将其更改/存储为实例属性?我想另一种解决方案是序列化数据,然后在必要时重新读取它,而不仅仅是将其保存在内存中。


这里可能值得注意的是,我使用的是pathos模块而不是 python 的multiprocessing模块。我遇到了一些关于酸洗的错误,类似于这里:Python multiprocessing PicklingError: Can't pickle <type 'function'>。我的代码分为几个模块,如上所述,数据处理方法包含在一个类中。


对不起,文字墙。


编辑这是我的代码:


import importlib

import pandas as pd

from pathos.helpers import mp

from provider import Provider


# list of data providers ... length is arbitrary

operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']



# create provider objects for each operating provider

provider_obj_list = []

for name in operating_providers:

    loc     = 'providers.%s' % name

    module  = importlib.import_module(loc)

    provider_obj = Provider(module)

    provider_obj_list.append(provider_obj)


processes = []

for instance in provider_obj_list:

    process = mp.Process(target = instance.data_processing_func)

    process.daemon = True

    process.start()

    processes.append(process)


for process in processes:

    process.join()


# now that data_processing_func is complete for each set of data, 

# stack all the data

stack = pd.concat((instance.data for instance in provider_obj_list))

我有许多模块(它们的名称在 中列出operating_providers)包含特定于它们的数据源的属性。这些模块被迭代导入并传递给 Provider 类的新实例,我在单独的模块 ( provider) 中创建了该实例。我将每个 Provider 实例附加到一个列表 ( provider_obj_list),然后迭代地创建调用实例方法的单独进程instance.data_processing_func。这个函数做一些数据处理(每个实例访问完全不同的数据文件),并在此过程中创建新的实例属性,当并行处理完成时我需要访问这些属性。


我尝试使用多线程而不是多处理——在这种情况下,我的实例属性保持不变,这就是我想要的。但是,我不确定为什么会发生这种情况——我必须研究线程与多处理之间的差异。


谢谢你的帮助!


largeQ
浏览 170回答 1
1回答

紫衣仙女

这是一些示例代码,展示了如何执行我在评论中概述的内容。我无法测试它,因为我没有provider或没有pathos安装,但它应该让你对我的建议有一个很好的了解。import importlibfrom pathos.helpers import mpfrom provider import Providerdef process_data(loc):&nbsp; &nbsp; module&nbsp; = importlib.import_module(loc)&nbsp; &nbsp; provider_obj = Provider(module)&nbsp; &nbsp; provider_obj.data_processing_func()if __name__ == '__main__':&nbsp; &nbsp; # list of data providers ... length is arbitrary&nbsp; &nbsp; operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']&nbsp; &nbsp; # create list of provider locations for each operating provider&nbsp; &nbsp; provider_loc_list = []&nbsp; &nbsp; for name in operating_providers:&nbsp; &nbsp; &nbsp; &nbsp; loc = 'providers.%s' % name&nbsp; &nbsp; &nbsp; &nbsp; provider_loc_list.append(loc)&nbsp; &nbsp; processes = []&nbsp; &nbsp; for loc in provider_loc_list:&nbsp; &nbsp; &nbsp; &nbsp; process = mp.Process(target=process_data, args=(loc,))&nbsp; &nbsp; &nbsp; &nbsp; process.daemon = True&nbsp; &nbsp; &nbsp; &nbsp; process.start()&nbsp; &nbsp; &nbsp; &nbsp; processes.append(process)&nbsp; &nbsp; for process in processes:&nbsp; &nbsp; &nbsp; &nbsp; process.join()
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python