在 Dask 中,如何根据全局(而不是工作线程)资源约束来限制任务的调度?

我有一个使用 Dask 编写的大型数据提取作业,其中每个任务将从数十个数据库的大量表中查询一个表。对于每个数据库实例,我想限制一次连接的任务数量(即限制)。例如,我可能有 100 个任务连接到数据库 A,100 个任务连接到数据库 B,100 个任务连接到数据库 C,等等,并且我想确保在任何给定时间连接到任何数据库的任务不超过 20 个。

我发现 Dask 提供了基于工作线程资源(CPU、MEM、GPU 等)的约束,但是数据库资源是“全局”的,因此对于任何 Dask 工作线程来说都不是特定的。Dask 是否提供任何方法来对任务并发性的此类约束进行建模?


繁星coding
浏览 82回答 1
1回答

幕布斯6054654

Dask 提供分布式信号量,可以限制对数据库等资源的并发访问。例子import timefrom dask.distributed import Client, Semaphoreclient = Client(...)def do_task(x, sem):    with sem:        time.sleep(5)        return x# allow no more than 5 tasks to run concurrentlysem = Semaphore(max_leases=5, name="Limiter")# submit jobs that use the semaphorefutures = client.map(do_task, range(20), sem=sem)# collect resultsresults = client.gather(futures)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python