具有复杂条件的Spark SQL窗口函数

这可能是最容易通过示例解释的。假设我有一个用户登录网站的DataFrame,例如:


scala> df.show(5)

+----------------+----------+

|       user_name|login_date|

+----------------+----------+

|SirChillingtonIV|2012-01-04|

|Booooooo99900098|2012-01-04|

|Booooooo99900098|2012-01-06|

|  OprahWinfreyJr|2012-01-10|

|SirChillingtonIV|2012-01-11|

+----------------+----------+

only showing top 5 rows

我想在此列添加一个列,指示他们何时成为网站上的活跃用户。但有一点需要注意:有一段时间用户被认为是活动的,在此期间之后,如果他们再次登录,他们的became_active日期会重置。假设这段时间是5天。然后从上表派生的所需表将是这样的:


+----------------+----------+-------------+

|       user_name|login_date|became_active|

+----------------+----------+-------------+

|SirChillingtonIV|2012-01-04|   2012-01-04|

|Booooooo99900098|2012-01-04|   2012-01-04|

|Booooooo99900098|2012-01-06|   2012-01-04|

|  OprahWinfreyJr|2012-01-10|   2012-01-10|

|SirChillingtonIV|2012-01-11|   2012-01-11|

+----------------+----------+-------------+

因此,特别是,SirChillingtonIV的became_active日期被重置,因为他们的第二次登录是在活动期过期之后,但是Booooooo99900098的became_active日期没有在他/她登录的第二次重置,因为它落在活动期间。


我最初的想法是使用窗口函数lag,然后使用lagged值填充became_active列; 例如,大致类似于:


import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.functions._


val window = Window.partitionBy("user_name").orderBy("login_date")

val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))

然后,规则填写became_active日期会是这样,如果tmp是null(即,如果它是第一次登录),或者如果login_date - tmp >= 5再became_active = login_date; 否则,转到下一个最近的值tmp并应用相同的规则。这表明了一种递归方法,我无法想象实现的方法。


我的问题:这是一种可行的方法,如果是这样的话,我怎么能“回头”看看早期的价值观,tmp直到我找到一个停止的地方?据我所知,我无法迭代Spark SQL的值Column。还有另一种方法来实现这个结果吗?


收到一只叮咚
浏览 756回答 2
2回答

呼啦一阵风

重构对方的回答 与工作Pyspark在Pyspark你可以像下面。create data framedf = sqlContext.createDataFrame([("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"), ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"), ("SirChillingtonIV", "2012-08-11")], ("user_name", "login_date"))上面的代码创建了一个如下所示的数据框+----------------+----------+|       user_name|login_date|+----------------+----------+|SirChillingtonIV|2012-01-04||Booooooo99900098|2012-01-04||Booooooo99900098|2012-01-06||  OprahWinfreyJr|2012-01-10||SirChillingtonIV|2012-01-11||SirChillingtonIV|2012-01-14||SirChillingtonIV|2012-08-11|+----------------+----------+现在我们要先发现它们之间的区别login_date是多于5几天。对于这个,如下所示。必要的进口from pyspark.sql import functions as ffrom pyspark.sql import Window# defining window partitions  login_window = Window.partitionBy("user_name").orderBy("login_date")session_window = Window.partitionBy("user_name", "session")session_df = df.withColumn("session", f.sum((f.coalesce(f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0)) > 5).cast("int")).over(login_window))当我们运行上面的代码行时,如果date_diff是,NULL则coalesce函数将替换NULL为0。+----------------+----------+-------+|       user_name|login_date|session|+----------------+----------+-------+|  OprahWinfreyJr|2012-01-10|      0||SirChillingtonIV|2012-01-04|      0||SirChillingtonIV|2012-01-11|      1||SirChillingtonIV|2012-01-14|      1||SirChillingtonIV|2012-08-11|      2||Booooooo99900098|2012-01-04|      0||Booooooo99900098|2012-01-06|      0|+----------------+----------+-------+# add became_active column by finding the `min login_date` for each window partitionBy `user_name` and `session` created in above stepfinal_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")+----------------+----------+-------------+|       user_name|login_date|became_active|+----------------+----------+-------------+|  OprahWinfreyJr|2012-01-10|   2012-01-10||SirChillingtonIV|2012-01-04|   2012-01-04||SirChillingtonIV|2012-01-11|   2012-01-11||SirChillingtonIV|2012-01-14|   2012-01-11||SirChillingtonIV|2012-08-11|   2012-08-11||Booooooo99900098|2012-01-04|   2012-01-04||Booooooo99900098|2012-01-06|   2012-01-04|+----------------+----------+-------------+
打开App,查看更多内容
随时随地看视频慕课网APP