Spark:在数据帧中反向直到满足条件

我有以下格式的数据框:


+----------+-------+----------+---------+-----------------------+---------+---------+

|rownum    |viewid |skillid   |parentId |post_timestamp         |is_skill |column A |

+----------+-------+----------+---------+-----------------------+---------+---------+

|1         |251    |b         |xyz12    |2019-01-31 09:24:02.868|true     |abcde    |

|2         |251    |b         |abc34    |2019-01-31 10:24:02.868|false    |453aw    |

|3         |251    |b         |abc34    |2019-01-31 11:24:02.868|false    |abcde    |

|4         |94     |a         |ghi23    |2019-01-31 02:28:05.107|false    |bbbbb    |

|5         |94     |a         |yui67    |2019-01-31 09:06:57.976|true     |nnnn     |

|6         |94     |a         |qwe12    |2019-01-31 09:24:02.868|false    |2n21q    |

|7         |94     |a         |qwe12    |2019-01-31 10:06:57.976|false    |nnnnq    |

|8         |94     |a         |rty87    |2019-01-31 15:07:57.976|true     |1234     |

|9         |94     |a         |bnm22    |2019-01-31 16:28:05.107|true     |1234     |

|10        |94     |a         |bnm22    |2019-01-31 17:28:05.107|true     |6789     |

|11        |94     |b         |tyu12    |2019-01-31 09:24:02.868|true     |6789     |

+----------+-------+----------+---------+-----------------------+---------+---------+

对于一组viewidand skillid,如果当前行parentId不等于前一行parentId ,则在该组中找到最新的技能 ID 值为 true 的行,并检查当前行的 columnA 值不等于该行的 columnA 值。


Column matchedParentId = df.col("parentId").$eq$eq$eq(functions.lag("parentId",1);```


Now how can I go back to the dataframe until skillId is true? I guess going back would be doable as the dataframe is ordered by timestamp.



函数式编程
浏览 142回答 2
2回答

温温酱

我使用 Scala,但我想出的解决方案是- 在 parent_Id 不等于前一个 parent_Id 的行之前,使用窗口函数查找 is_skill = true 的最后一行的行号 - 自连接数据框以匹配行期望的输出如下?+------+------+-------+--------+--------------------+--------+--------+---------------+--------+|rownum|viewid|skillid|parentId|      post_timestamp|is_skill|column A|matchedParentId|isAEqual|+------+------+-------+--------+--------------------+--------+--------+---------------+--------+|     1|   251|      b|   xyz12|20190131 09:24:02...|    true|   abcde|           null|    true||     2|   251|      b|   abc34|20190131 10:24:02...|   false|   453aw|          false|   false||     3|   251|      b|   abc34|20190131 11:24:02...|   false|   abcde|           true|    true||     5|    94|      a|   yui67|20190131 09:06:57...|    true|    nnnn|          false|    true||     6|    94|      a|   qwe12|20190131 09:24:02...|   false|   2n21q|          false|   false||     7|    94|      a|   qwe12|20190131 10:06:57...|   false|   nnnnq|           true|   false||     8|    94|      a|   rty87|20190131 15:07:57...|    true|    1234|          false|    true||     9|    94|      a|   bnm22|20190131 16:28:05...|    true|    1234|          false|    true||    10|    94|      a|   bnm22|20190131 17:28:05...|    true|    6789|           true|    true||    11|    94|      b|   tyu12|20190131 09:24:02...|    true|    6789|           null|    true|+------+------+-------+--------+--------------------+--------+--------+---------------+--------+这是代码:import org.apache.spark.sql.expressions.Windowimport org.apache.spark.sql.functionsimport spark.implicits._val df = Seq((1,251 ,"b" ,"xyz12" ,"20190131 09:24:02.868",true  ,"abcde"),             (2 ,251 ,"b" ,"abc34" ,"20190131 10:24:02.868", false ,"453aw"),             (3 ,251 ,"b" ,"abc34" ,"20190131 11:24:02.868", false ,"abcde"),             (4 ,94  ,"a" ,"ghi23" ,"20190131 02:28:05.107", false ,"bbbbb"),             (5 ,94  ,"a" ,"yui67" ,"20190131 09:06:57.976", true  ,"nnnn"),             (6 ,94  ,"a" ,"qwe12" ,"20190131 09:24:02.868", false ,"2n21q"),             (7 ,94  ,"a" ,"qwe12" ,"20190131 10:06:57.976", false ,"nnnnq"),             (8 ,94  ,"a" ,"rty87" ,"20190131 15:07:57.976", true  ,"1234"),             (9 ,94  ,"a" ,"bnm22" ,"20190131 16:28:05.107", true  ,"1234"),             (10  ,94  ,"a" ,"bnm22" ,"20190131 17:28:05.107",true  ,"6789"),             (11  ,94  ,"b" ,"tyu12" ,"20190131 09:24:02.868",true  ,"6789")).             toDF("rownum", "viewid", "skillid", "parentId" , "post_timestamp", "is_skill", "column A")val w = Window.partitionBy("viewid", "skillid").orderBy("post_timestamp")val df2 = df.withColumn("matchedParentId", lag($"parentId", 1).over(w).equalTo($"parentId")).             withColumn("rank", rank.over(w)).withColumn("is_skill_int", when($"is_skill", 1).otherwise(0)).             withColumn("test", max($"is_skill_int" * $"rank").over(w))val df3 = df2.as("df_left").             join(df2.as("df_right"), $"df_left.viewid".equalTo($"df_right.viewid").                                  and($"df_left.skillid".equalTo($"df_right.skillid")).                                  and($"df_left.rank".equalTo($"df_right.test"))).             withColumn("isAEqual", $"df_left.column A".equalTo($"df_right.column A")).             select("df_right.rownum", "df_right.viewid", "df_right.skillid", "df_right.parentId", "df_right.post_timestamp", "df_right.is_skill", "df_right.column A", "df_right.matchedParentId", "isAEqual").             orderBy("rownum")df3.show

开满天机

这是方法,我会推荐groupby (viewid,skillid),并将分组记录收集为列表实现udf,取列表,可以遍历和实现逻辑根据 udf 返回值选择记录(可能是时间戳)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java