In my case I have several files in S3 and a custom function that reads each of them and processes it using all available threads. To keep the example simple I just generate one dataframe, df, and assume that my function is tsfresh.extract_features, which uses multiprocessing.
Generate data
import pandas as pd
from tsfresh import extract_features
from tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \
load_robot_execution_failures
download_robot_execution_failures()
ts, y = load_robot_execution_failures()
df = []
for i in range(5):
    # Replicate the example time series with shifted ids to build a larger frame.
    tts = ts.copy()
    tts["id"] += 88 * i
    df.append(tts)
df = pd.concat(df, ignore_index=True)
Function
def fun(df, n_jobs):
    # Extract tsfresh features for every id, using n_jobs processes.
    extracted_features = extract_features(df,
                                          column_id="id",
                                          column_sort="time",
                                          n_jobs=n_jobs)
    return extracted_features
Cluster
import dask
from dask.distributed import Client, progress
from dask import compute, delayed
from dask_cloudprovider import FargateCluster
my_vpc = # your vpc
my_subnets = # your subnets
cpu = 2
ram = 4
cluster = FargateCluster(n_workers=1,
                         image='rpanai/feats-worker:2020-08-24',
                         vpc=my_vpc,
                         subnets=my_subnets,
                         worker_cpu=int(cpu * 1024),
                         worker_mem=int(ram * 1024),
                         cloudwatch_logs_group="my_log_group",
                         task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
                         scheduler_timeout='20 minutes')
cluster.adapt(minimum=1, maximum=4)
client = Client(cluster)
client
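As a side note, before submitting work I usually check that at least one Fargate worker has actually joined the adaptive cluster; a minimal sketch using the client created above:

# Block until at least one worker has registered with the scheduler,
# then inspect the resources the scheduler reports per worker.
client.wait_for_workers(1)
client.scheduler_info()["workers"]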
Use all workers (fails)
to_process = [delayed(fun)(df, cpu) for i in range(10)]
out = compute(to_process)
AssertionError: daemonic processes are not allowed to have children
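For context, the assertion comes from the standard library: each task runs inside a daemonized Dask worker process, and tsfresh's default multiprocessing distributor then tries to start child processes, which daemonic processes are not allowed to do. One workaround that is sometimes suggested (an assumption on my side, not something I have verified on this cluster) is to tell Dask not to daemonize its worker processes; a rough sketch:

import dask

# Assumption: this setting has to be in effect where the workers start
# (e.g. exported as DASK_DISTRIBUTED__WORKER__DAEMON=False in the worker
# image/environment of a remote cluster such as Fargate), not only on
# the client side.
dask.config.set({"distributed.worker.daemon": False})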
Use only one thread (OK)
In this case it works fine, but I am wasting resources.
to_process = [delayed(fun)(df, 0) for i in range(10)]
out = compute(to_process)
Question
I know that for this specific function I could eventually write a custom distributor using multithreading and a few other tricks, but I would like to distribute a job where every worker can take advantage of all its resources without having to worry too much.
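To make concrete the kind of function-specific trick I would rather avoid: tsfresh ships a Dask distributor that hands the per-id chunks to an existing cluster instead of spawning local processes. A rough sketch, assuming the scheduler address of the cluster created above (untested here):

from tsfresh.utilities.distribution import ClusterDaskDistributor

# Let tsfresh fan its per-id chunks out to the Dask scheduler instead of
# using a local multiprocessing pool inside each worker.
distributor = ClusterDaskDistributor(address=cluster.scheduler_address)
extracted_features = extract_features(df,
                                      column_id="id",
                                      column_sort="time",
                                      distributor=distributor)

This works only because tsfresh happens to expose such a hook; I am looking for a general way to give each delayed task a full worker's resources.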