How do I add a new column to a Spark DataFrame (using PySpark)?

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.


I have tried the following without any success:


type(randomed_hours) # => list

# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

I also got an error using this:

my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))

So how do I add a new column (based on a Python vector) to an existing DataFrame with PySpark?


Smart猫小萌
1098 views · 3 answers

宝慕林4294392

You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in "How to add a constant column in a Spark DataFrame?"):

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

by transforming an existing column:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

by using a join:

from pyspark.sql.functions import col

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))
df_with_x6.show()

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

or generated with a function / UDF:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

Performance-wise, the built-in functions in pyspark.sql.functions, which map to Catalyst expressions, are usually preferred over Python user-defined functions.

If you want to add the content of an arbitrary RDD as a column, you can:

- add row numbers to the existing DataFrame,
- call zipWithIndex on the RDD and convert it to a DataFrame,
- join the two using the index as a join key, as sketched below.
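A minimal sketch of those three steps, assuming my_df_spark, randomed_hours, sqlContext and sc from the question; the row_idx helper column and the intermediate names are illustrative, not part of any API:

from pyspark.sql import Row

# 1. Add row numbers to the existing DataFrame via its underlying RDD.
indexed_rdd = my_df_spark.rdd.zipWithIndex().map(
    lambda pair: Row(row_idx=pair[1], **pair[0].asDict()))
indexed_df = sqlContext.createDataFrame(indexed_rdd)

# 2. Index the Python list the same way and convert it to a DataFrame.
hours_rdd = sc.parallelize(randomed_hours).zipWithIndex().map(
    lambda pair: Row(row_idx=pair[1], hours=pair[0]))
hours_df = sqlContext.createDataFrame(hours_rdd)

# 3. Join on the shared index and drop the helper column.
result = indexed_df.join(hours_df, "row_idx").drop("row_idx")

zipWithIndex numbers elements by their order within and across partitions, so the row numbers on both sides line up with the original element order.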
