Spark SQL 分区依据、窗口、排序依据、计数

假设我有一个包含杂志订阅信息的数据框:


subscription_id    user_id       created_at       expiration_date

 12384               1           2018-08-10        2018-12-10

 83294               1           2018-06-03        2018-10-03

 98234               1           2018-04-08        2018-08-08

 24903               2           2018-05-08        2018-07-08

 32843               2           2018-03-25        2018-05-25

 09283               2           2018-04-07        2018-06-07

现在我想添加一个列,说明在当前订阅开始之前用户有多少以前的订阅已过期。换句话说,与给定用户相关联的到期日期在此订阅的开始日期之前。这是完整的所需输出:


subscription_id    user_id       created_at       expiration_date   previous_expired

 12384               1           2018-08-10        2018-12-10          1

 83294               1           2018-06-03        2018-10-03          0

 98234               1           2018-04-08        2018-08-08          0

 24903               2           2018-05-08        2018-07-08          2

 32843               2           2018-03-25        2018-05-03          1

 09283               2           2018-01-25        2018-02-25          0

尝试:


编辑:使用 Python 尝试了各种滞后/领先/等,我现在认为这是一个 SQL 问题


df = df.withColumn('shiftlag', func.lag(df.expires_at).over(Window.partitionBy('user_id').orderBy('created_at')))

<--- 编辑,编辑:没关系,这行不通


我想我用尽了滞后/领先/转移方法,发现它不起作用。我现在认为最好使用 Spark SQL 来做到这一点,也许使用 acase when来生成新列,结合 a having count,按 ID 分组?


潇潇雨雨
浏览 230回答 1
1回答

神不在的星期二

使用 PySpark 解决了这个问题:我首先创建了另一个列,其中包含每个用户的所有到期日期数组:joined_array = df.groupBy('user_id').agg(collect_set('expiration_date'))然后将该数组加入到原始数据帧中:joined_array = joined_array.toDF('user_idDROP', 'expiration_date_array')df = df.join(joined_array, df.user_id == joined_array.user_idDROP, how = 'left').drop('user_idDROP')然后创建一个函数来遍历数组,如果创建日期大于到期日期,则将计数加 1:def check_expiration_count(created_at, expiration_array):&nbsp; if not expiration_array:&nbsp; &nbsp; return 0&nbsp; else:&nbsp; &nbsp;count = 0&nbsp; &nbsp; for i in expiration_array:&nbsp; if created_at > i:&nbsp; &nbsp; count += 1return countcheck_expiration_count = udf(check_expiration_count, IntegerType())然后应用该函数创建一个具有正确计数的新列:df = df.withColumn('count_of_subs_ending_before_creation', check_expiration_count(df.created_at, df.expiration_array))瓦拉。完毕。谢谢大家(没有人帮忙,但还是谢谢)。希望有人在 2022 年发现这很有用
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python