
Adding an index column to an Apache Spark Dataset&lt;Row&gt; using Java

The question below has solutions for Scala and PySpark, but the solution it provides does not produce consecutive index values.


Spark Dataframe: How to add an index column (aka distributed data index)


I have an existing dataset in Apache Spark from which I want to select some rows by index. I plan to add an index column containing unique values starting from 1 and then fetch rows by the value of that column. I found the following way to add an index, which uses orderBy:


df.withColumn("index", functions.row_number().over(Window.orderBy("a column")));

I don't want to use orderBy. I need the indices in the same order the rows appear in the dataset. Any help?
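For example, once such a column exists, the selection I have in mind would look roughly like this (illustrative only; indexed stands for the dataset with the added column, and 3 is a placeholder value):

// Select the row whose position in the dataset is 3 (1-based).
Dataset&lt;Row&gt; picked = indexed.filter(functions.col("index").equalTo(3));
picked.show();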


手掌心
2 Answers

阿晨1998

As far as I can tell, you are trying to add an index with consecutive values to a dataframe. Unfortunately, there is no built-in function in Spark that does this. You can only add an increasing index with df.withColumn("index", monotonicallyIncreasingId()), and its values are not necessarily consecutive. Nonetheless, the RDD API has a zipWithIndex function that does exactly what you need. We can therefore define a function that converts the dataframe to an RDD, adds the index, and converts it back to a dataframe.

I'm no expert in Spark in Java (Scala is much more compact), so it may be possible to do better. Here is how I would do it:

public static Dataset&lt;Row&gt; zipWithIndex(Dataset&lt;Row&gt; df, String name) {
    JavaRDD&lt;Row&gt; rdd = df.javaRDD().zipWithIndex().map(t -&gt; {
        Row r = t._1;
        Long index = t._2 + 1;
        ArrayList&lt;Object&gt; list = new ArrayList&lt;&gt;();
        r.toSeq().iterator().foreach(x -&gt; list.add(x));
        list.add(index);
        return RowFactory.create(list);
    });
    StructType newSchema = df.schema()
            .add(new StructField(name, DataTypes.LongType, true, null));
    return df.sparkSession().createDataFrame(rdd, newSchema);
}

Here is how you would use it. Note what the built-in Spark function produces, in contrast to what our approach produces:

Dataset&lt;Row&gt; df = spark.range(5)
    .withColumn("index1", functions.monotonicallyIncreasingId());
Dataset&lt;Row&gt; result = zipWithIndex(df, "good_index");

// df
+---+-----------+
| id|     index1|
+---+-----------+
|  0|          0|
|  1| 8589934592|
|  2|17179869184|
|  3|25769803776|
|  4|25769803777|
+---+-----------+

// result
+---+-----------+----------+
| id|     index1|good_index|
+---+-----------+----------+
|  0|          0|         1|
|  1| 8589934592|         2|
|  2|17179869184|         3|
|  3|25769803776|         4|
|  4|25769803777|         5|
+---+-----------+----------+
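A side note on the jumps in index1 above: in Spark's current implementation, a monotonically increasing ID encodes the partition number in the upper 31 bits of the 64-bit value and the record position within the partition in the lower 33 bits, which is why the values grow but are not consecutive. A small decoding sketch (mine, not part of the answer) that recovers both parts from the values printed above:

// Decode a monotonically increasing ID into (partition, offset within partition).
long id = 25769803777L;               // last index1 value in the df table above
long partition = id &gt;&gt;&gt; 33;           // upper 31 bits: partition index -&gt; 3
long offset = id &amp; ((1L &lt;&lt; 33) - 1);  // lower 33 bits: row position    -&gt; 1
System.out.println(partition + ":" + offset); // prints 3:1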

UYOU

The answer above worked for me after a few tweaks. Compared with the original: the Scala Seq is walked with an explicit iterator instead of a lambda passed to foreach, RowFactory.create receives list.toArray() rather than the list itself, and the StructField gets Metadata.empty() instead of null. Below is a functional IntelliJ scratch file. I am on Spark 2.3.0:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;

class Scratch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("_LOCAL")
                .master("local")
                .getOrCreate();
        Dataset&lt;Row&gt; df = spark.range(5)
                .withColumn("index1", functions.monotonicallyIncreasingId());
        Dataset&lt;Row&gt; result = zipWithIndex(df, "good_index");
        result.show();
    }

    public static Dataset&lt;Row&gt; zipWithIndex(Dataset&lt;Row&gt; df, String name) {
        JavaRDD&lt;Row&gt; rdd = df.javaRDD().zipWithIndex().map(t -&gt; {
            Row r = t._1;
            Long index = t._2 + 1;
            ArrayList&lt;Object&gt; list = new ArrayList&lt;&gt;();
            scala.collection.Iterator&lt;Object&gt; iterator = r.toSeq().iterator();
            while (iterator.hasNext()) {
                Object value = iterator.next();
                assert value != null;
                list.add(value);
            }
            list.add(index);
            return RowFactory.create(list.toArray());
        });
        StructType newSchema = df.schema()
                .add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));
        return df.sparkSession().createDataFrame(rdd, newSchema);
    }
}

Output:

+---+------+----------+
| id|index1|good_index|
+---+------+----------+
|  0|     0|         1|
|  1|     1|         2|
|  2|     2|         3|
|  3|     3|         4|
|  4|     4|         5|
+---+------+----------+
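For completeness, a Dataset-only alternative that neither answer uses: monotonically_increasing_id (the non-deprecated name of monotonicallyIncreasingId) produces values that grow with the dataset's existing order, so ordering a window by that column and applying row_number yields consecutive 1-based values without imposing a semantic sort. This is a sketch rather than a drop-in replacement, with the caveat that a window without partitionBy moves all rows into a single partition:

import org.apache.spark.sql.expressions.Window;

// Consecutive 1-based index in dataset order, without the RDD round-trip.
Dataset&lt;Row&gt; indexed = df
        .withColumn("mono", functions.monotonically_increasing_id())
        .withColumn("index", functions.row_number().over(Window.orderBy("mono")))
        .drop("mono");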