猿问

通过计算具有整列的行的值,在 spark 数据框中创建一列

我有一个数据框:


|id|value|

| 0|    1|

| 1|    3|

| 2|    9|

我想在每一行上应用一个函数来创建一个新列。该函数必须将参数中的行的值和整列的值作为第二个参数来生成向量。


例如:列中的值与行值之和的向量:


def fu(myValue, myColumn):

    return [myValue + i for i in myColumn]

具有 :


|id|value|sums_in_column|

| 0|    1|    [2, 4, 10]|

| 1|    3|    [3, 6, 12]|

| 2|    9|  [10, 12, 18]|

我知道,我可以从一行传递一个或多个值来计算一个新列,withColumn并使用udf并行化执行。但我不明白如何将列作为参数传递?可能吗?


慕姐4208626
浏览 89回答 2
2回答

慕村9548890

您不能将整列数据传递给 UDF,因为 Spark 引擎将计算和数据拆分到多个服务器/执行程序中。如果您可以调整算法以处理列值的执行程序本地子集,则可以使用RDD.mapPartitions在完整的数据分区上执行单个函数。或者,如果您知道您的列数据可以适合您的执行程序内存,您可以首先DataFrame.collect()列数据并使用SparkContext.broadcast()将其复制到所有执行程序并使用对UDF中广播变量的引用.

牧羊人nacy

设置:>>> d = [{'id': 0, 'value': 1},{'id': 1, 'value': 3},{'id': 2, 'value': 9}]>>> df0 = spark.createDataFrame(d)>>> df0.show()+---+-----+| id|value|+---+-----+|  0|    1||  1|    3||  2|    9|+---+-----+第 1 步:使用collect_list()函数创建一个包含 column 中所有值的数组value,并将该数组作为列添加到初始数据帧中>>> from pyspark.sql.functions import *>>> arr = df0.agg(collect_list(df.value).alias('arr_column'))>>> df1 = df0.crossJoin(arr)>>> df1.show()+---+-----+-------------+| id|value|   arr_column|+---+-----+-------------+|  0|    1|    [1, 3, 9]||  1|    3|    [1, 3, 9]||  2|    9|    [1, 3, 9]|+---+-----+-------------+交叉连接本质上会将数组广播给所有执行程序,因此请注意要应用它的数据大小。(您可能还需要spark.sql.crossJoin.enabled=true在创建 Spark 上下文时显式设置,因为 Spark 不喜欢交叉连接正是出于这个原因。)第 2 步:将您的fu函数注册为 Spark UDF>>> from pyspark.sql.types import *>>> fu_udf = udf(fu, ArrayType(IntegerType()))Step3:使用这个UDF来增加数组元素>>> df3 = df1.withColumn('sums_in_column',fu_udf(df1.value,df1.arr_column))>>> df3.show()+---+-----+-------------+--------------+| id|value|   arr_column|sums_in_column|+---+-----+-------------+--------------+|  0|    1|    [1, 3, 9]|    [2, 4, 10]||  1|    3|    [1, 3, 9]|    [4, 6, 12]||  2|    9|    [1, 3, 9]|  [10, 12, 18]|+---+-----+-------------+--------------+
随时随地看视频慕课网APP

相关分类

Python
我要回答