使用 Ray 并行化大型程序的正确方法

我有一个相当大的 Python 程序(~800 行),它具有以下结构:

  • 设置说明,我在其中处理用户提供的输入文件并定义对程序执行具有全局性的变量/对象。

  • 主要功能,它利用前面的设置阶段并调用程序的主要附加功能。

  • 附加函数可以是主要的,因为它们被主函数直接调用,或者是次要的,因为它们只被主要的附加函数调用。

  • 我处理 main 函数结果的最后几行代码。

该程序是大规模并行的,因为主函数的每次执行都独立于前一个和下一个。因此,我使用 Ray 在集群中的多个工作节点上并行执行主要功能。操作系统是 CentOS Linux release 8.2.2004 (Core),集群执行 PBS Pro 19.2.4.20190830141245。我正在使用 Python 3.7.4、Ray 0.8.7 和 Redis 3.4.1。

我在 Python 脚本中有以下内容,foo主要功能在哪里:

@ray.remote(memory=2.5 * 1024 * 1024 * 1024)

def foo(locInd):

    # Main function


if __name__ == '__main__':

    ray.init(address='auto', redis_password=args.pw,

             driver_object_store_memory=10 * 1024 * 1024 * 1024)

    futures = [foo.remote(i) for i in zip(*np.asarray(indArr == 0).nonzero())]

    waitingIds = list(futures)

    while len(waitingIds) > 0:

        readyIds, waitingIds = ray.wait(

            waitingIds, num_returns=min([checkpoint, len(waitingIds)]))

        for r0, r1, r2, r3, r4, r5, r6, r7 in ray.get(readyIds):

            # Process results

            indArr[r0[::-1]] = 1

            nodesComplete += 1

    ray.shutdown()

以下是我用来启动 Ray 的说明


# Head node

/path/to/ray start --head --port=6379 \

--redis-password=$redis_password \

--memory $((120 * 1024 * 1024 * 1024)) \

--object-store-memory $((20 * 1024 * 1024 * 1024)) \

--redis-max-memory $((10 * 1024 * 1024 * 1024)) \

--num-cpus 48 --num-gpus 0


只要我处理足够小的数据集,一切都会按预期运行。尽管如此,执行会产生以下警告

  • 2020-08-17 17:16:44,289 警告 worker.py:1134 -- 警告:腌制时远程函数的__main__.foo大小为 220019409。它将存储在 Redis 中,这可能会导致内存问题。这可能意味着它的定义使用了一个大数组或其他对象。

  • 2020-08-17 17:17:10,281 WARNING worker.py:1134 -- 这个 worker 被要求执行一个它没有注册的函数。您可能需要重新启动 Ray。

关于我如何向 Ray 描述程序,我显然做错了什么。我有 Scipy Interpolator 对象,我认为它们是全局的,但是,正如在这个 GitHub线程中已经指出的那样,我应该调用ray.put它们。问题是我遇到了这些ValueError: buffer source array is read-only我不知道如何诊断的问题。另外,我不确定是否应该用主要功能装饰所有功能@ray.remote或只装饰主要功能。我想我可以@ray.remote(num_cpus=1)为所有附加功能做,因为它实际上只应该是并行执行的主要功能,但我不知道这是否有意义。



四季花海
浏览 168回答 1
1回答

慕田峪4524236

正如在问题中提到的,该程序对于足够小的数据集运行得很好(尽管它似乎绕过了 Ray 逻辑的几个方面),但它最终在大型数据集上崩溃了。仅使用 Ray 任务,我没有设法调用存储在 Object Store ( ValueError: buffer source array is read-only) 中的 Scipy Interpolator 对象,并且装饰所有函数没有意义,因为实际上只有主要函数应该同时执行(同时调用其他函数)。因此,我决定更改程序结构以使用 Ray Actors。设置说明现在是该__init__方法的一部分。特别是,Scipy Interpolator 对象在此方法中定义并设置为 的属性self,就像全局变量一样。大多数函数(包括 main 函数)已成为类方法,但通过 Numba 编译的函数除外。对于后者,它们仍然是用 装饰的独立函数@jit,但它们中的每一个现在在调用 jitted 函数的类中都有一个等效的包装方法。为了让我的程序并行执行我现在的主要方法,我依赖于 ActorPool。我创建了与可用 CPU 一样多的 actor,每个 actor 都执行 main 方法,成功调用方法和 Numba 编译的函数,同时还设法访问 Interpolator 对象。我只适用@ray.remote于定义的 Python 类。所有这些都转化为以下结构:@ray.remoteclass FooClass(object):    def __init__(self, initArgs):        # Initialisation    @staticmethod    def exampleStaticMethod(args):        # Processing        return    def exampleMethod(self, args):        # Processing        return    def exampleWrapperMethod(self, args):        return numbaCompiledFunction(args)    def mainMethod(self, poolMapArgs):        # Processing        return@jitdef numbaCompiledFunction(args):    # Processing    returnray.init(address='auto', redis_password=redPass)actors = []for actor in range(int(ray.cluster_resources()['CPU'])):    actors.append(FooClass.remote(initArgs))pool = ActorPool(actors)for unpackedTuple in pool.map_unordered(        lambda a, v: a.mainMethod.remote(v),        poolMapArgs):    # Processingray.shutdown()这在分布在 4 个节点上的 192 个 CPU 上成功运行,没有任何警告或错误。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python