Dask - 是否可以通过自定义函数使用每个工作线程中的所有线程?

就我而言,我在 S3 中有多个文件和一个自定义函数,该函数读取每个文件并使用所有线程处理它。为了简化示例,我只生成一个数据帧df,并且假设我的函数是tsfresh.extract_features使用多重处理。


生成数据

import pandas as pd

from tsfresh import extract_features

from tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \

load_robot_execution_failures

download_robot_execution_failures()

ts, y = load_robot_execution_failures()

df = []

for i in range(5):

    tts = ts.copy()

    tts["id"] += 88 * i

    df.append(tts)

    

df = pd.concat(df, ignore_index=True)

功能

def fun(df, n_jobs):

    extracted_features = extract_features(df,

                                      column_id="id",

                                      column_sort="time",

                                      n_jobs=n_jobs)

import dask

from dask.distributed import Client, progress

from dask import compute, delayed

from dask_cloudprovider import FargateCluster


my_vpc = # your vpc

my_subnets = # your subnets


cpu = 2 

ram = 4

cluster = FargateCluster(n_workers=1,

                         image='rpanai/feats-worker:2020-08-24',

                         vpc=my_vpc,

                         subnets=my_subnets,

                         worker_cpu=int(cpu * 1024),

                         worker_mem=int(ram * 1024),

                         cloudwatch_logs_group="my_log_group",

                         task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],

                         scheduler_timeout='20 minutes'

                        )



cluster.adapt(minimum=1,

              maximum=4)

client = Client(cluster)

client

使用所有工作线程(失败)

to_process = [delayed(fun)(df, cpu) for i in range(10)]

out = compute(to_process)

AssertionError: daemonic processes are not allowed to have children

仅使用一个线程(OK)

在这种情况下,它工作正常,但我浪费资源。


to_process = [delayed(fun)(df, 0) for i in range(10)]

out = compute(to_process)

问题

我知道对于这个特定的功能,我最终可以使用多线程和其他一些技巧编写一个自定义分配器,但我想分配一个工作,让每个工作人员都可以利用所有资源,而不必担心太多。


开心每一天1111
浏览 109回答 1
1回答

元芳怎么了

我可以帮助回答您的具体问题tsfresh,但 iftsfresh只是一个简单的玩具示例,可能不是您想要的。对于tsfresh,您通常不会混合使用tsfreshdask 和 dask 的多重处理,而是让 dask 执行所有处理。这意味着,您从一个单一的开始dask.DataFrame(在您的测试用例中,您可以将 pandas 数据帧转换为 dask 数据帧 - 对于您的读取用例,您可以直接从S3 docu读取),然后在 dask 数据帧中分发特征提取(特征提取的好处是,它在每个时间序列上独立工作。因此我们可以为每个时间序列生成一个作业)。我不确定这是否有助于解决您更普遍的问题。在我看来,你(在大多数情况下)不想混合dask的分布函数和“本地”多核计算,而只是让dask处理一切。因为如果您位于 dask 集群上,您甚至可能不知道每台机器上有多少个核心(或者每个作业可能只获得一个核心)。这意味着,如果您的作业可以分发 N 次,并且每个作业将启动 M 个子作业,您只需将“N x M”作业交给 dask 并让它计算其余部分(包括数据局部性)。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python