猿问

有没有办法在 UDF 中添加新列(在 java spark 中)

我有一个 spark 数据集的列(在 java 中),我希望该列的所有值成为新列的列名(新列可以用常量值填充)。


For example I have:

+------------+

|    Column  | 

+------------+

| a          | 

| b          |

| c          |

+------------+


And I want: 

+------+----+----+---+

|Column| a  |  b | c |

+------+----+----+---+

| a    | 0  | 0  |0  |

| b    | 0  | 0  |0  |

| c    | 0  | 0  |0  |

+------+----+----+---+

我试过的是:


public class test{


    static SparkSession spark = SparkSession.builder().appName("Java")

            .config("spark.master", "local").getOrCreate();

    static Dataset<Row> dataset = spark.emptyDataFrame();


    public Dataset<Row> test(Dataset<Row> ds, SparkSession spark) {

        SQLContext sqlContext = new SQLContext(spark);

        sqlContext.udf().register("add", add, DataTypes.createArrayType(DataTypes.StringType));

        ds = ds.withColumn("substrings", functions.callUDF("add", ds.col("Column")));

        return ds;

    }


    private static UDF1 addSubstrings = new UDF1<String, String[]>() {

        public String[] call(String str) throws Exception {

            dataset = dataset.withColumn(str, functions.lit(0));

            String[] a = {"placeholder"};

            return a;

        }

    };

}

我的问题是,有时我得到正确的结果,有时却没有(未添加列)。我真的不明白为什么。我正在寻找一种将数据集传递给 UDF 的方法,但我不知道该怎么做。


目前我正在通过使用列的 collectAsList() 来解决它,然后迭代 Arraylist 从而添加新列。但这真的很低效,因为我的数据太多了。


繁星淼淼
浏览 170回答 3
3回答

动漫人物

对于这个用例,您可以使用pivot:ds .withColumn("pivot_column", $"first_column") .groupBy($"first_column") .pivot("pivot_column") .count如果你想要更好的性能,你可能想在 pivot 中提供可能的值,比如pivot("pivot_column", Seq("a", "b", "c"))我用于count聚合,但你可以进行任何你想要的聚合。From+------------+|first_column| +------------+| a          | | b          || c          |+------------+To+------------+---+---+---+|first_column| a | b | c |+------------+---+---+---+| a          | 1 | 0 | 0 || b          | 0 | 1 | 0 || c          | 0 | 0 | 1 |+------------+---+---+---+

慕桂英546537

如果的值Column是最小的/更少,请尝试下面的代码。df.show+------+|Column|+------+|&nbsp; &nbsp; &nbsp;A||&nbsp; &nbsp; &nbsp;B||&nbsp; &nbsp; &nbsp;C|+------+// If you have multiple columns are exist, select only required columnval names = df.select($"Column").as[String].collect&nbsp;val df1 = names.foldLeft(df)((df,n) => df.withColumn(n, lit(0)))df1.show()+------+---+---+---+|Column|&nbsp; A|&nbsp; B|&nbsp; C|+------+---+---+---+|&nbsp; &nbsp; &nbsp;A|&nbsp; 0|&nbsp; 0|&nbsp; 0||&nbsp; &nbsp; &nbsp;B|&nbsp; 0|&nbsp; 0|&nbsp; 0||&nbsp; &nbsp; &nbsp;C|&nbsp; 0|&nbsp; 0|&nbsp; 0|+------+---+---+---+

蓝山帝景

我认为 Spark 的本质(更准确地说,它的并行性)不允许您使用 UDF 实现您的目标。执行查询时,Spark 将数据分发给执行器,每个执行器都有自己的行块。每个行块都有自己的列的可能值列表Column。因此,每个执行者都会尝试添加自己的列列表,这与其他执行者所做的不同。因此,当驱动程序尝试合并来自不同执行程序的结果集时,它会失败(或者执行程序可能会失败)。collectAsList确实解决了你的问题,虽然效率很低。此外,您可以猜测列数并发明一些函数(适合您的实际数据)来将列获得的值映射Column到这些数字 - 这样您就可以使每个执行程序的列集保持相等。该解决方案不是很通用,但可以解决某些情况。即,您会得到像这样的列:<c01, c02, c03, ..., cNN>。
随时随地看视频慕课网APP

相关分类

Java
我要回答