猿问

列 A 和 B 之间的流差由列 C 和 D 聚合

如何流式传输到表中:


按列 C 和 D 聚合的列 A 和 B 之间的差值。


+-------------+-------------------+--+-

| Column_A|Column_B |Column_C|Column_D|

+-------------+-------------------+--+-

|52       |67       |boy     |car     |

|44       |25       |girl    |bike    |

|98       |85       |boy     |car     |

|52       |41       |girl    |car     |

+-------------+-------------------+--+-

这是我的尝试,但它不起作用:


difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C")

differenceStream = difference.writeStream\

  .queryName("diff_aggr")\

  .format("memory").outputMode("append")\

  .start()


我收到此错误:“GroupedData”对象没有属性“writeStream”


潇潇雨雨
浏览 83回答 1
1回答

森林海

根据您希望如何聚合分组数据 - 您可以执行例如先决条件(如果您尚未设置它们):from pyspark.sql import functions as F from pyspark.sql.functions import *为:sumdifference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.sum(F.col("Difference")).alias("Difference"))为:maxdifference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.max(F.col("Difference")).alias("Difference"))然后:differenceStream = difference.writeStream\  .queryName("diff_aggr")\  .format("memory").outputMode("append")\  .start()关键是 - 如果你这样做,你也需要通过聚合来减少。如果你想把你的值排序在一起,试试groupBydf.sort(...)
随时随地看视频慕课网APP

相关分类

Python
我要回答