至尊宝的传说
在 spark 2.4 中,您可以map_from_arrays在汇总每只股票的值时使用它来构建日期值映射。那么create_map使用股票代码作为关键只是一个使用问题。此示例使用ChainMappython 3.4 来构建您所描述的最终 dict 结构。import jsonfrom collections import ChainMapfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *spark = SparkSession \ .builder \ .appName("example") \ .getOrCreate()df = spark.createDataFrame([ (1388534400, "GOOG", 50), (1388534400, "FB", 60), (1388534400, "MSFT", 55), (1388620800, "GOOG", 52), (1388620800, "FB", 61), (1388620800, "MSFT", 55)]).toDF("date", "stock", "price")out = df.groupBy("stock") \ .agg( map_from_arrays( collect_list("date"), collect_list("price")).alias("values")) \ .select(create_map("stock", "values").alias("values")) \ .rdd.flatMap(lambda x: x) \ .collect()print(json.dumps(dict(ChainMap(*out)), indent=4, separators=(',', ': '), sort_keys=True))这使:{ "FB": { "1388534400": 60, "1388620800": 61 }, "GOOG": { "1388534400": 50, "1388620800": 52 }, "MSFT": { "1388534400": 55, "1388620800": 55 }}但是,正如您所说,您有很多数据,您实际上可能并不想在内存中创建此字典,因此最好将其拆分并将相同的字典结构写入不同分区的文件中。让我们通过将日期截断到给定月份并为每个月和每个股票编写单独的文件来做到这一点:out = df.groupBy(trunc(expr("CAST(date as TIMESTAMP)"), "month").alias("month"), df["stock"]) \ .agg( map_from_arrays( collect_list("date"), collect_list("price")).alias("values")) \ .select("month", "stock", create_map("stock", "values").alias("values"))out.write.partitionBy("month", "stock").format("json").save("out/prices")这为您提供了如下结构:out└── prices ├── _SUCCESS └── month=2014-01-01 ├── stock=FB │ └── part-00093-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json ├── stock=GOOG │ └── part-00014-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json └── stock=MSFT └── part-00152-3741bdc2-345a-488e-82da-53bb586cd23b.c000.jsonMSFT 文件如下所示:{"values":{"MSFT":{"1388534400":55,"1388620800":55}}}虽然“值”列名称可能不在您的字典结构中,但我希望这说明了您可以做什么。