猿问

Spark SQL:在数组值上使用 collect_set?

我有一个聚合的 DataFrame,其中有一列使用collect_set. 我现在需要再次聚合此 DataFrame,并再次应用于collect_set该列的值。问题是我需要应用collect_Set集合的值——而且我看到的唯一方法是分解聚合的 DataFrame。有没有更好的办法?


例子:


初始数据框:


country   | continent   | attributes

-------------------------------------

Canada    | America     | A

Belgium   | Europe      | Z

USA       | America     | A

Canada    | America     | B

France    | Europe      | Y

France    | Europe      | X

聚合数据帧(我收到作为输入的那个) - 聚合country:


country   | continent   | attributes

-------------------------------------

Canada    | America     | A, B

Belgium   | Europe      | Z

USA       | America     | A

France    | Europe      | Y, X

我想要的输出 - 聚合continent:


continent   | attributes

-------------------------------------

America     | A, B

Europe      | X, Y, Z


慕桂英3389331
浏览 81回答 1
1回答

哈士奇WWW

由于此时您只能拥有少量行,因此您只需按原样收集属性并将结果展平(Spark >= 2.4)import org.apache.spark.sql.functions.{collect_set, flatten, array_distinct}val byState = Seq(&nbsp; ("Canada", "America", Seq("A", "B")),&nbsp; ("Belgium", "Europe", Seq("Z")),&nbsp; ("USA", "America", Seq("A")),&nbsp; ("France", "Europe", Seq("Y", "X"))).toDF("country", "continent", "attributes")byState&nbsp; .groupBy("continent")&nbsp; .agg(array_distinct(flatten(collect_set($"attributes"))) as "attributes")&nbsp; .show+---------+----------+|continent|attributes|+---------+----------+|&nbsp; &nbsp;Europe| [Y, X, Z]||&nbsp; America|&nbsp; &nbsp; [A, B]|+---------+----------+在一般情况下,事情更难处理,并且在许多情况下,如果您期望大型列表,每个组有许多重复项和许多值,则最佳解决方案*是从头开始重新计算结果,即input.groupBy($"continent").agg(collect_set($"attributes") as "attributes")一种可能的替代方法是使用Aggregatorimport org.apache.spark.sql.expressions.Aggregatorimport org.apache.spark.sql.catalyst.encoders.ExpressionEncoderimport org.apache.spark.sql.{Encoder, Encoders}import scala.collection.mutable.{Set => MSet}class MergeSets[T, U](f: T => Seq[U])(implicit enc: Encoder[Seq[U]]) extends&nbsp;&nbsp; &nbsp; &nbsp;Aggregator[T, MSet[U], Seq[U]] with Serializable {&nbsp; def zero = MSet.empty[U]&nbsp; def reduce(acc: MSet[U], x: T) = {&nbsp; &nbsp; for { v <- f(x) } acc.add(v)&nbsp; &nbsp; acc&nbsp; }&nbsp; def merge(acc1: MSet[U], acc2: MSet[U]) = {&nbsp; &nbsp; acc1 ++= acc2&nbsp; }&nbsp; def finish(acc: MSet[U]) = acc.toSeq&nbsp; def bufferEncoder: Encoder[MSet[U]] = Encoders.kryo[MSet[U]]&nbsp; def outputEncoder: Encoder[Seq[U]] = enc}并按如下方式应用case class CountryAggregate(&nbsp; country: String, continent: String, attributes: Seq[String])byState&nbsp; .as[CountryAggregate]&nbsp; .groupByKey(_.continent)&nbsp; .agg(new MergeSets[CountryAggregate, String](_.attributes).toColumn)&nbsp; .toDF("continent", "attributes")&nbsp; .show+---------+----------+|continent|attributes|+---------+----------+|&nbsp; &nbsp;Europe| [X, Y, Z]||&nbsp; America|&nbsp; &nbsp; [B, A]|+---------+----------+但这显然不是 Java 友好的选择。另请参阅如何在 groupBy 之后将值聚合到集合中?(类似,但没有唯一性约束)。* 这是因为explode可能非常昂贵,尤其是在旧 Spark 版本中,与访问 SQL 集合的外部表示相同。
随时随地看视频慕课网APP

相关分类

Java
我要回答