有没有办法将具有值范围的列添加到 Spark Dataframe 中?

我有一个 Spark 数据框:df1 如下:


age = spark.createDataFrame(["10","11","13"], "string").toDF("age")

age.show()

+---+

|age|

+---+

| 10|

| 11|

| 13|

+---+

我需要向数据框中添加行号列以使其:


+---+------+

|age|col_id|

+---+------+

| 10|   1  |

| 11|   2  |

| 13|   3  |

+---+------+

我的数据框中的所有列都不包含唯一值。我尝试使用F.monotonically_increasing_id()),但它只是按递增顺序生成随机数。


>>> age = spark.createDataFrame(["10","11","13"], "string").toDF("age").withColumn("rowId1", F.monotonically_increasing_id())

>>> age

DataFrame[age: string, rowId1: bigint]

>>> age.show

<bound method DataFrame.show of DataFrame[age: string, rowId1: bigint]>

>>> age.show()

+---+-----------+

|age|     rowId1|

+---+-----------+

| 10|17179869184|

| 11|42949672960|

| 13|60129542144|

+---+-----------+

由于我没有任何包含唯一数据的列,因此我担心使用窗口函数和生成row_numbers。那么,有没有一种方法可以row_count在数据框中添加一列,该列给出:


+---+------+

|age|col_id|

+---+------+

| 10|   1  |

| 11|   2  |

| 13|   3  |

+---+------+

如果窗口功能是唯一的实现方法,我如何确保所有数据都位于单个分区下?或者如果有一种方法可以在不使用窗口函数的情况下实现相同的功能,那么如何实现它?任何帮助表示赞赏。


慕哥9229398
浏览 83回答 2
2回答

慕标琳琳

使用zipWithIndex。pyspark 与 Scala 不同。其他答案对性能不利 - 使用单个执行器。zipWithIndex是narrow transformation这样,它可以按partition.在这里,您可以进行相应的定制:from pyspark.sql.types import StructFieldfrom pyspark.sql.types import StructTypefrom pyspark.sql.types import StringType, LongTypeimport pyspark.sql.functions as Fdf1 = spark.createDataFrame([ ('abc'),('2'),('3'),('4'), ('abc'),('2'),('3'),('4'), ('abc'),('2'),('3'),('4') ], StringType())schema = StructType(df1.schema.fields[:] + [StructField("index", LongType(), True)])rdd = df1.rdd.zipWithIndex()rdd1 = rdd.map(lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))df1 = spark.createDataFrame(rdd1, schema)df1.show()返回:+-----+-----+|value|index|+-----+-----+|  abc|    0||    2|    1||    3|    2||    4|    3||  abc|    4||    2|    5||    3|    6||    4|    7||  abc|    8||    2|    9||    3|   10||    4|   11|+-----+-----+

宝慕林4294392

假设:这个答案基于以下假设: 的顺序col_id应取决于age列。如果假设不成立,则其他建议的解决方案是问题评论中提到的zipWithIndex。zipWithIndex可以在此答案中找到 的示例用法。建议的解决方案:您可以使用window带有空partitionBy和行号的 a 来获取预期的数字。from pyspark.sql.window import Windowfrom pyspark.sql import functions as FwindowSpec = Window.partitionBy().orderBy(F.col('age').asc())age = age.withColumn(&nbsp; &nbsp; 'col_id',&nbsp; &nbsp; F.row_number().over(windowSpec))
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python