这可能是最容易通过示例解释的。假设我有一个用户登录网站的DataFrame,例如:
scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
我想在此列添加一个列,指示他们何时成为网站上的活跃用户。但有一点需要注意:有一段时间用户被认为是活动的,在此期间之后,如果他们再次登录,他们的became_active日期会重置。假设这段时间是5天。然后从上表派生的所需表将是这样的:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
因此,特别是,SirChillingtonIV的became_active日期被重置,因为他们的第二次登录是在活动期过期之后,但是Booooooo99900098的became_active日期没有在他/她登录的第二次重置,因为它落在活动期间。
我最初的想法是使用窗口函数lag,然后使用lagged值填充became_active列; 例如,大致类似于:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
然后,规则填写became_active日期会是这样,如果tmp是null(即,如果它是第一次登录),或者如果login_date - tmp >= 5再became_active = login_date; 否则,转到下一个最近的值tmp并应用相同的规则。这表明了一种递归方法,我无法想象实现的方法。
我的问题:这是一种可行的方法,如果是这样的话,我怎么能“回头”看看早期的价值观,tmp直到我找到一个停止的地方?据我所知,我无法迭代Spark SQL的值Column。还有另一种方法来实现这个结果吗?
呼啦一阵风
相关分类