猿问

在Python多处理中将Pool.map与共享内存数组结合

我有一个很大的(只读)数据数组,我想由多个进程并行处理。


我喜欢Pool.map函数,并希望使用它来并行计算该数据上的函数。


我看到可以使用Value或Array类在进程之间使用共享内存数据。但是,当我尝试使用它时,我得到一个RuntimeError:'使用Pool.map函数时,应该仅通过继承在进程之间共享SynchronizedString对象:


这是我正在尝试做的一个简化示例:


from sys import stdin

from multiprocessing import Pool, Array


def count_it( arr, key ):

  count = 0

  for c in arr:

    if c == key:

      count += 1

  return count


if __name__ == '__main__':

  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

  # want to share it using shared memory

  toShare = Array('c', testData)


  # this works

  print count_it( toShare, "a" )


  pool = Pool()


  # RuntimeError here

  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

谁能告诉我我在做什么错?


因此,我想做的是在进程池中创建新的共享内存分配数组后,将信息传递给这些进程。


aluckdog
浏览 1184回答 3
3回答

千万里不及你

当我刚看到赏金的时候再试一次;)基本上,我认为错误消息的含义是它的意思-多处理共享内存数组不能作为参数传递(通过酸洗)。序列化数据没有意义-关键是数据是共享内存。因此,您必须使共享数组成为全局数组。我认为像我的第一个答案一样,将其作为模块的属性比较整洁,但在示例中将其保留为全局变量也可以很好地工作。考虑到您不想在fork之前设置数据的观点,这是一个修改后的示例。如果您希望拥有多个共享数组(这就是为什么要将toShare作为参数传递的原因),则可以类似地创建共享数组的全局列表,然后将索引传递给count_it(将变为for c in toShare[i]:)。from sys import stdinfrom multiprocessing import Pool, Array, Processdef count_it( key ):  count = 0  for c in toShare:    if c == key:      count += 1  return countif __name__ == '__main__':  # allocate shared array - want lock=False in this case since we   # aren't writing to it and want to allow multiple processes to access  # at the same time - I think with lock=True there would be little or   # no speedup  maxLength = 50  toShare = Array('c', maxLength, lock=False)  # fork  pool = Pool()  # can set data after fork  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"  if len(testData) > maxLength:      raise ValueError, "Shared array too small to hold data"  toShare[:len(testData)] = testData  print pool.map( count_it, ["a", "b", "s", "d"] )[编辑:以上内容由于未使用fork而无法在Windows上运行。但是,下面的方法在Windows上仍然可以使用Pool,但仍然可以使用,因此我认为这与您想要的最接近:from sys import stdinfrom multiprocessing import Pool, Array, Processimport mymoduledef count_it( key ):  count = 0  for c in mymodule.toShare:    if c == key:      count += 1  return countdef initProcess(share):  mymodule.toShare = shareif __name__ == '__main__':  # allocate shared array - want lock=False in this case since we   # aren't writing to it and want to allow multiple processes to access  # at the same time - I think with lock=True there would be little or   # no speedup  maxLength = 50  toShare = Array('c', maxLength, lock=False)  # fork  pool = Pool(initializer=initProcess,initargs=(toShare,))  # can set data after fork  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"  if len(testData) > maxLength:      raise ValueError, "Shared array too small to hold data"  toShare[:len(testData)] = testData  print pool.map( count_it, ["a", "b", "s", "d"] )不知道为什么map不会腌制数组,而Process和Pool会腌制-我想也许它已经在Windows上的子进程初始化时转移了。请注意,尽管在派生之后仍然设置了数据。

蛊毒传说

如果数据是只读的,只需在从Pool派生之前将其设置为模块中的变量即可。然后,所有子进程都应该能够访问它,并且只要您不对其进行写操作,就不会被复制。import myglobals # anything (empty .py file)myglobals.data = []def count_it( key ):    count = 0    for c in myglobals.data:        if c == key:            count += 1    return countif __name__ == '__main__':myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"pool = Pool()print pool.map( count_it, ["a", "b", "s", "d"] )如果您确实想尝试使用Array,则可以尝试使用lock=False关键字参数(默认情况下为true)。
随时随地看视频慕课网APP

相关分类

Python
我要回答