猿问

如何在 PySpark 中创建 merge_asof 功能?

表A有许多列和一个日期列,表B有一个日期时间和一个值。两个表中的数据都是零星生成的,没有固定的间隔。桌子A很小,桌子B很大。


我需要在给定元素对应的条件下B加入AaA.datetime


B[B['datetime'] <= a]]['datetime'].max()

有几种方法可以做到这一点,但我想要最有效的方法。


选项1

将小数据集广播为 Pandas DataFrame。设置一个 Spark UDF,为每一行创建一个 pandas DataFrame,使用merge_asof.


选项 2

使用 Spark SQL 的广播连接功能:在以下条件下设置 theta 连接


B['datetime'] <= A['datetime']

然后消除所有多余的行。


选项 B 似乎很糟糕......但请让我知道第一种方法是否有效或者是否有另一种方法。


编辑:这是示例输入和预期输出:


A =

+---------+----------+

| Column1 | Datetime |

+---------+----------+

|    A    |2019-02-03|

|    B    |2019-03-14|

+---------+----------+


B =

+---------+----------+

|   Key   | Datetime |

+---------+----------+

|    0    |2019-01-01|

|    1    |2019-01-15|

|    2    |2019-02-01|

|    3    |2019-02-15|

|    4    |2019-03-01|

|    5    |2019-03-15|

+---------+----------+


custom_join(A,B) =

+---------+----------+

| Column1 |   Key    |

+---------+----------+

|    A    |     2    |

|    B    |     4    |

+---------+----------+


慕虎7371278
浏览 132回答 3
3回答

神不在的星期二

任何尝试在 pyspark 3.x 中执行此操作的人都可以使用pyspark.sql.PandasCogroupedOps.applyInPandas例如:&nbsp; from pyspark.sql import SparkSession, Row, DataFrame&nbsp; import pandas as pd&nbsp; spark = SparkSession.builder.master("local").getOrCreate()&nbsp; df1 = spark.createDataFrame(&nbsp; &nbsp; &nbsp; [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],&nbsp; &nbsp; &nbsp; ("time", "id", "v1"))&nbsp; df2 = spark.createDataFrame(&nbsp; &nbsp; &nbsp; [(20000101, 1, "x"), (20000101, 2, "y")],&nbsp; &nbsp; &nbsp; ("time", "id", "v2"))&nbsp; def asof_join(l, r):&nbsp; &nbsp; &nbsp; return pd.merge_asof(l, r, on="time", by="id")&nbsp; df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(&nbsp; &nbsp; &nbsp; asof_join, schema="time int, id int, v1 double, v2 string"&nbsp; ).show()&nbsp; >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>&nbsp; +--------+---+---+---+&nbsp; |&nbsp; &nbsp; time| id| v1| v2|&nbsp; +--------+---+---+---+&nbsp; |20000101|&nbsp; 1|1.0|&nbsp; x|&nbsp; |20000102|&nbsp; 1|3.0|&nbsp; x|&nbsp; |20000101|&nbsp; 2|2.0|&nbsp; y|&nbsp; |20000102|&nbsp; 2|4.0|&nbsp; y|&nbsp; +--------+---+---+---+

慕尼黑5688855

我怀疑它是否更快,但您可以通过使用union和last与window函数一起使用 Spark 来解决它。from pyspark.sql import functions as ffrom pyspark.sql.window import Windowdf1 = df1.withColumn('Key', f.lit(None))df2 = df2.withColumn('Column1', f.lit(None))df3 = df1.unionByName(df2)w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)df3.withColumn('Key', f.last('Key', True).over(w)).filter(~f.isnull('Column1')).show()这使+-------+----------+---+|Column1|&nbsp; Datetime|Key|+-------+----------+---+|&nbsp; &nbsp; &nbsp; A|2019-02-03|&nbsp; 2||&nbsp; &nbsp; &nbsp; B|2019-03-14|&nbsp; 4|+-------+----------+---+这是一个老问题,但可能对某些人仍然有用。

收到一只叮咚

想出了一个快速(但可能不是最有效)的方法来完成这个。我构建了一个辅助函数:def get_close_record(df, key_column, datetime_column, record_time):&nbsp; &nbsp; """&nbsp; &nbsp; Takes in ordered dataframe and returns the closest&nbsp;&nbsp; &nbsp; record that is higher than the datetime given.&nbsp; &nbsp; """&nbsp; &nbsp; filtered_df = df[df[datetime_column] >= record_time][0:1]&nbsp; &nbsp; [key] = filtered_df[key_column].values.tolist()&nbsp; &nbsp; return key我没有加入B,A而是设置了pandas_udf上述代码并在表的列上运行它,然后B使用主键运行并由聚合。groupByBA_keyB_keymax这种方法的问题是它需要在B.更好的解决方案:我开发了以下应该可以工作的辅助函数other_df['_0'] = other_df['Datetime']bdf = sc.broadcast(other_df)#merge asof udf@F.pandas_udf('long')def join_asof(v, other=bdf.value):&nbsp; &nbsp; f = pd.DataFrame(v)&nbsp; &nbsp; j = pd.merge_asof(f, other, on='_0', direction = 'forward')&nbsp; &nbsp; return j['Key']joined = df.withColumn('Key', join_asof(F.col('Datetime')))
随时随地看视频慕课网APP

相关分类

Python
我要回答