猿问

在PySpark中的GroupedData上应用UDF(具有可运行的python示例)

我有在python数据帧中本地运行的以下python代码:


df_result = pd.DataFrame(df

                          .groupby('A')

                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))

我想在PySpark中运行它,但是在处理pyspark.sql.group.GroupedData对象时遇到了麻烦。


我尝试了以下方法:


sparkDF

 .groupby('A')

 .agg(myFunction(zip('B', 'C'), 'A')) 

哪个返回


KeyError: 'A'

我猜想是因为“ A”不再是一列,而且我找不到x.name的等效项。


接着


sparkDF

 .groupby('A')

 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 

 .toDF()

但出现以下错误:


AttributeError: 'GroupedData' object has no attribute 'map'

任何建议将不胜感激!


开心每一天1111
浏览 501回答 3
3回答

慕森王

我将超越答案。因此,您可以使用@pandas_udf在pyspark中实现类似pandas.groupby()。apply的逻辑,这是矢量化方法,并且比简单的udf更快。from pyspark.sql.functions import pandas_udf,PandasUDFTypedf3 = spark.createDataFrame([("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],("key", "value1", "value2"))from pyspark.sql.types import *schema = StructType([    StructField("key", StringType()),    StructField("avg_value1", DoubleType()),    StructField("avg_value2", DoubleType()),    StructField("sum_avg", DoubleType()),    StructField("sub_avg", DoubleType())])@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)def g(df):    gr = df['key'].iloc[0]    x = df.value1.mean()    y = df.value2.mean()    w = df.value1.mean() + df.value2.mean()    z = df.value1.mean() - df.value2.mean()    return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])df3.groupby("key").apply(g).show()您将获得以下结果:+---+----------+----------+-------+-------+|key|avg_value1|avg_value2|sum_avg|sub_avg|+---+----------+----------+-------+-------+|  b|       6.5|      -1.5|    5.0|    8.0||  a|       0.0|      21.0|   21.0|  -21.0|+---+----------+----------+-------+-------+因此,您可以在分组数据中的其他字段之间进行更多计算,并将它们以列表格式添加到数据框中。
随时随地看视频慕课网APP

相关分类

Python
我要回答