I'm using Scala, and the solution I came up with is:

- use a window function to find the row number of the last row with is_skill = true before a row whose parent_Id differs from the previous parent_Id
- self-join the dataframe to match the rows

Is this the expected output?

+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
|rownum|viewid|skillid|parentId|      post_timestamp|is_skill|column A|matchedParentId|isAEqual|
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
|     1|   251|      b|   xyz12|20190131 09:24:02...|    true|   abcde|           null|    true|
|     2|   251|      b|   abc34|20190131 10:24:02...|   false|   453aw|          false|   false|
|     3|   251|      b|   abc34|20190131 11:24:02...|   false|   abcde|           true|    true|
|     5|    94|      a|   yui67|20190131 09:06:57...|    true|    nnnn|          false|    true|
|     6|    94|      a|   qwe12|20190131 09:24:02...|   false|   2n21q|          false|   false|
|     7|    94|      a|   qwe12|20190131 10:06:57...|   false|   nnnnq|           true|   false|
|     8|    94|      a|   rty87|20190131 15:07:57...|    true|    1234|          false|    true|
|     9|    94|      a|   bnm22|20190131 16:28:05...|    true|    1234|          false|    true|
|    10|    94|      a|   bnm22|20190131 17:28:05...|    true|    6789|           true|    true|
|    11|    94|      b|   tyu12|20190131 09:24:02...|    true|    6789|           null|    true|
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+

Here is the code (note the wildcard import of org.apache.spark.sql.functions._, which is needed for lag, rank, when, and max):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1,  251, "b", "xyz12", "20190131 09:24:02.868", true,  "abcde"),
  (2,  251, "b", "abc34", "20190131 10:24:02.868", false, "453aw"),
  (3,  251, "b", "abc34", "20190131 11:24:02.868", false, "abcde"),
  (4,  94,  "a", "ghi23", "20190131 02:28:05.107", false, "bbbbb"),
  (5,  94,  "a", "yui67", "20190131 09:06:57.976", true,  "nnnn"),
  (6,  94,  "a", "qwe12", "20190131 09:24:02.868", false, "2n21q"),
  (7,  94,  "a", "qwe12", "20190131 10:06:57.976", false, "nnnnq"),
  (8,  94,  "a", "rty87", "20190131 15:07:57.976", true,  "1234"),
  (9,  94,  "a", "bnm22", "20190131 16:28:05.107", true,  "1234"),
  (10, 94,  "a", "bnm22", "20190131 17:28:05.107", true,  "6789"),
  (11, 94,  "b", "tyu12", "20190131 09:24:02.868", true,  "6789")
).toDF("rownum", "viewid", "skillid", "parentId", "post_timestamp", "is_skill", "column A")

val w = Window.partitionBy("viewid", "skillid").orderBy("post_timestamp")

val df2 = df
  // true when this row's parentId equals the previous row's parentId
  .withColumn("matchedParentId", lag($"parentId", 1).over(w).equalTo($"parentId"))
  .withColumn("rank", rank.over(w))
  .withColumn("is_skill_int", when($"is_skill", 1).otherwise(0))
  // running max of is_skill_int * rank = rank of the last is_skill = true row so far
  .withColumn("test", max($"is_skill_int" * $"rank").over(w))

val df3 = df2.as("df_left")
  // self-join: each right-side row is matched with the left-side row holding
  // the last is_skill = true position in the same (viewid, skillid) partition
  .join(df2.as("df_right"),
    $"df_left.viewid".equalTo($"df_right.viewid")
      .and($"df_left.skillid".equalTo($"df_right.skillid"))
      .and($"df_left.rank".equalTo($"df_right.test")))
  .withColumn("isAEqual", $"df_left.column A".equalTo($"df_right.column A"))
  .select("df_right.rownum", "df_right.viewid", "df_right.skillid", "df_right.parentId",
    "df_right.post_timestamp", "df_right.is_skill", "df_right.column A",
    "df_right.matchedParentId", "isAEqual")
  .orderBy("rownum")

df3.show
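As an aside, the self-join could probably be avoided: last(..., ignoreNulls = true) over the same window carries "column A" of the most recent is_skill = true row forward directly. Below is a minimal sketch of that idea, assuming the same df and w as above (wRows, lastSkillA, and df3Alt are names I made up). Unlike the join version, it keeps rows that come before the first is_skill = true row in a partition (rownum 4), with isAEqual null there.

// Sketch: compute isAEqual without the self-join (assumes df and w from above).
// The explicit ROWS frame pins the window to "all rows up to and including this one",
// so tied post_timestamp values don't pull in peer rows.
val wRows = w.rowsBetween(Window.unboundedPreceding, Window.currentRow)

// "column A" of the most recent is_skill = true row at or before the current row;
// when(is_skill, column A) is null on non-skill rows, and last(..., true) skips nulls
val lastSkillA = last(when($"is_skill", $"column A"), ignoreNulls = true).over(wRows)

val df3Alt = df
  .withColumn("matchedParentId", lag($"parentId", 1).over(w).equalTo($"parentId"))
  .withColumn("isAEqual", $"column A".equalTo(lastSkillA))
  .orderBy("rownum")

df3Alt.show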