PySpark:从数据帧创建字典?

我有以下格式的数据,这些数据是从 Hive 获取到数据帧中的:


date, stock, price

1388534400, GOOG, 50

1388534400, FB, 60

1388534400, MSFT, 55

1388620800, GOOG, 52

1388620800, FB, 61

1388620800, MSFT, 55

其中 date 是当天午夜的纪元,我们有可以追溯到 10 年左右的数据(8 亿多行)。我想得到一本字典如下:


{

'GOOG':

{

'1388534400': 50,

'1388620800': 52

}


'FB':

{

'1388534400': 60,

'1388620800': 61

}

}

一种天真的方法是获取唯一股票的列表,然后通过仅过滤掉每只股票的那些行来获取数据帧的子集,但这似乎过于天真且效率极低。这可以在 Spark 中轻松完成吗?我目前已经使用 PyHive 在本机 Python 中运行它,但由于数据量庞大,我宁愿在集群/Spark 上完成这项工作。


饮歌长啸
浏览 131回答 2
2回答

至尊宝的传说

在 spark 2.4 中,您可以map_from_arrays在汇总每只股票的值时使用它来构建日期值映射。那么create_map使用股票代码作为关键只是一个使用问题。此示例使用ChainMappython 3.4 来构建您所描述的最终 dict 结构。import jsonfrom collections import ChainMapfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *spark = SparkSession \    .builder \    .appName("example") \    .getOrCreate()df = spark.createDataFrame([    (1388534400, "GOOG", 50),    (1388534400, "FB", 60),    (1388534400, "MSFT", 55),    (1388620800, "GOOG", 52),    (1388620800, "FB", 61),    (1388620800, "MSFT", 55)]).toDF("date", "stock", "price")out = df.groupBy("stock") \        .agg(            map_from_arrays(                collect_list("date"), collect_list("price")).alias("values")) \        .select(create_map("stock", "values").alias("values")) \        .rdd.flatMap(lambda x: x) \        .collect()print(json.dumps(dict(ChainMap(*out)), indent=4, separators=(',', ': '), sort_keys=True))这使:{                                                                                   "FB": {        "1388534400": 60,        "1388620800": 61    },    "GOOG": {        "1388534400": 50,        "1388620800": 52    },    "MSFT": {        "1388534400": 55,        "1388620800": 55    }}但是,正如您所说,您有很多数据,您实际上可能并不想在内存中创建此字典,因此最好将其拆分并将相同的字典结构写入不同分区的文件中。让我们通过将日期截断到给定月份并为每个月和每个股票编写单独的文件来做到这一点:out = df.groupBy(trunc(expr("CAST(date as TIMESTAMP)"), "month").alias("month"), df["stock"]) \        .agg(            map_from_arrays(                collect_list("date"), collect_list("price")).alias("values")) \        .select("month", "stock", create_map("stock", "values").alias("values"))out.write.partitionBy("month", "stock").format("json").save("out/prices")这为您提供了如下结构:out└── prices    ├── _SUCCESS    └── month=2014-01-01        ├── stock=FB        │   └── part-00093-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json        ├── stock=GOOG        │   └── part-00014-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json        └── stock=MSFT            └── part-00152-3741bdc2-345a-488e-82da-53bb586cd23b.c000.jsonMSFT 文件如下所示:{"values":{"MSFT":{"1388534400":55,"1388620800":55}}}虽然“值”列名称可能不在您的字典结构中,但我希望这说明了您可以做什么。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python