如何在 Java 中转置 Apache Spark 数据集

我有一个Dataset<Row>要转置的 Apache Spark。从这里的一些主题中,我很清楚这可以通过分组-透视-聚合来完成。但是我没有按照我需要的方式得到它。我有以下输入表:


+-------+------+------+------+------+

| rho_0 | rho_1| rho_2|rho_3 | names|

+-------+------+------+------+------+

|  1    | 0.89 | 0.66 | 0.074|  rho |

|  1    | 0.89 | 0.66 | 0.074|absRho|

|  0    |  1   | 2    |  3   | lag  |

+-------+------+------+------+------+

我需要的是


+-------+------+------+

| rho   |absRho| lag  |

+-------+------+------+

|  1    | 1    |  0   |

|  0.89 | 0.89 |  1   |

|  0.66 | 0.66 |  2   |

|  0.074| 0.074|  3   |

+-------+------+------+

我试过类似的东西


Dataset<Row> transposed = coll.groupBy().pivot("names").min("rho_0");

但这不起作用。从输入中调用groupBy一系列列也不起作用。我找到了一个我不喜欢的解决方法:


Dataset<Row> transposed = coll.groupBy().pivot("names").min("rho_0")

for (int i = 1; i < nlags; i++) {

    transposed = transposed.union(coll.groupBy().pivot("names").min("rho_" + i));

}

但它真的很慢,并不意味着以这种方式实施。你有什么建议吗?提前致谢!


烙印99
浏览 74回答 1
1回答

MYYA

不幸的是,spark 中没有内置函数可以做到这一点。有一个使用的解决方案pivot,但您需要先“分解”数据框。它应该比基于联合的解决方案快得多。在 Scala 中,它会按如下方式进行。我在下面添加了一个 java 版本。// scalaval cols = df.columns&nbsp; .filter(_ != "names")&nbsp; .map(n => struct(lit(n) as "c", col(n) as "v"))val exploded_df = df.select(col("names"), explode(array(cols : _*)))// javaColumn[] cols = Arrays&nbsp; &nbsp; .stream(df.columns())&nbsp; &nbsp; .filter(x -> ! x.equals("names"))&nbsp; &nbsp; .map(n -> struct(lit(n).alias("c"), col(n).alias("v")))&nbsp; &nbsp; .toArray(Column[]::new);Dataset<Row> exploded_df = df.select(col("names"), explode(array(cols)));exploded_df.show();+------+-------------+| names|&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; col|+------+-------------+|&nbsp; &nbsp;rho|&nbsp; &nbsp; [rho_0,1]||&nbsp; &nbsp;rho| [rho_1,0.89]||&nbsp; &nbsp;rho| [rho_2,0.66]||&nbsp; &nbsp;rho|[rho_3,0.074]||absRho|&nbsp; &nbsp; [rho_0,1]||absRho| [rho_1,0.89]||absRho| [rho_2,0.66]||absRho|[rho_3,0.074]||&nbsp; &nbsp;lag|&nbsp; &nbsp; [rho_0,0]||&nbsp; &nbsp;lag|&nbsp; &nbsp; [rho_1,1]||&nbsp; &nbsp;lag|&nbsp; &nbsp; [rho_2,2]||&nbsp; &nbsp;lag|&nbsp; &nbsp; [rho_3,3]|+------+-------------+基本上,我构建了一个数组列,其中包含由列名及其值组成的结构。然后,我使用 explode 函数来展平这个数组。从那里,我们可以pivot像往常一样使用 ;-)// scala and javaexploded_df&nbsp; .groupBy(col("col.c"))&nbsp; .pivot("names")&nbsp; .agg(first(col("col.v")))&nbsp; .orderBy("c")&nbsp; .show();+-----+------+---+-----+|&nbsp; &nbsp; c|absRho|lag|&nbsp; rho|+-----+------+---+-----+|rho_0|&nbsp; &nbsp; &nbsp;1|&nbsp; 0|&nbsp; &nbsp; 1||rho_1|&nbsp; 0.89|&nbsp; 1| 0.89||rho_2|&nbsp; 0.66|&nbsp; 2| 0.66||rho_3| 0.074|&nbsp; 3|0.074|+-----+------+---+-----+
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java