猿问

连接一个 dask 数据框和一个 Pandas 数据框

我有一个df包含大约 2.5 亿行(来自 10Gb CSV 文件)的 dask 数据框 ( )。我有另一个ndf25,000 行的Pandas 数据框 ( )。我想通过将每个项目重复 10,000 次,将 Pandas 数据框的第一列添加到 dask 数据框。


这是我尝试过的代码。我已将问题缩小到较小的范围。


import dask.dataframe as dd

import pandas as pd

import numpy as np


pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv")

df = dd.read_csv("tempfile.csv")

ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))

df['Node'] = np.repeat(ndf[0], 10)

使用此代码,我最终遇到错误。


ValueError:并非所有分区都是已知的,无法对齐分区。请使用set_index来设置索引。


我可以执行 areset_index()后跟 aset_index()来制作df.known_divisions Truedask 数据框。但这是一项耗时的操作。有没有更好更快的方法来做我想做的事情?我可以使用熊猫本身来做到这一点吗?


最终目标是从ndf其中的任何相应行中找到df与某些条件匹配的行。


桃花长相依
浏览 198回答 2
2回答

holdtom

您的基本算法是“我希望将 的前 10 个值df['Node']设置为 的第一个值ndf,将接下来的 10 个值设置为 的下一个值ndf,依此类推”。这在 Dask 中很难,因为它不知道每个分区中有多少行:您正在从 CSV 读取,并且您在 X 字节中获得的行数取决于每个部分中的数据是什么样的. 其他格式为您提供更多信息...因此,您肯定需要两次遍历数据。您可以使用索引来找出划分并可能进行一些排序。在我看来,你能做的最简单的事情就是测量分割长度,然后得到每个开始的偏移量:lengths = df.map_partitions(len).compute()offsets = np.cumsum(lengths.values)offsets -= offsets[0]现在使用自定义延迟功能来处理零件@dask.delayeddef add_node(part, offset, ndf):    index = pd.Series(range(offset, offset + len(part)) // 10,                      index=part.index)  # 10 is the repeat factor    part['Node'] = index.map(ndf)    return partdf2 = dd.from_delayed([add_node(d, off, ndf)                        for d, off in zip(df.to_delayed(), offsets)])

牛魔王的故事

使用相同的工作流程,您可以divisions按照此处的建议手动设置import dask.dataframe as ddimport pandas as pdimport numpy as nppd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv", index=False)df = dd.read_csv("tempfile.csv")ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))df.divisions = (0, len(df)-1)df["Note"] = dd.from_array(np.repeat(ndf.values, 10))我不认为使用np.repeat是非常有效的,特别是对于大 df。
随时随地看视频慕课网APP

相关分类

Python
我要回答