Python Spark - Drop data after a threshold - PySpark

How do I drop the data that comes after the last TP == 1, with a 48-hour buffer?


For example, for ID = A9 the last TP == 1 is on 2010-05-06 13:00. I want to keep all data for that ID up to the last TP == 1 at 2010-05-06 13:00 plus the next 2 days.


+---+---------+----------------+
| id|       TP|            Date|
+---+---------+----------------+
| A1|     Null|2010-01-01 12:00|
| A1|     Null|2010-01-01 13:00|
| A1|        1|2010-01-02 01:00|
| A1|     Null|2010-01-02 02:00|
| A9|     Null|2010-05-05 12:00|
| A9|        1|2010-05-05 13:00|
| A9|        1|2010-05-06 13:00|
| A9|     Null|2010-05-09 13:00|
+---+---------+----------------+

Desired dataframe:


+---+---------+----------------+
| id|       TP|            Date|
+---+---------+----------------+
| A1|     Null|2010-01-01 12:00|
| A1|     Null|2010-01-01 13:00|
| A1|        1|2010-01-02 01:00|
| A1|     Null|2010-01-02 02:00|
| A9|     Null|2010-05-05 12:00|
| A9|        1|2010-05-05 13:00|
| A9|        1|2010-05-06 13:00|
+---+---------+----------------+

This is what I do in Pandas, but it is not efficient for 15M+ observations:


import pandas as pd

main_pd = main.toPandas()

bigdf = pd.DataFrame()

# For each ID, keep rows up to the last TP == 1 date plus a buffer
for i in main_pd.ID.unique():
  df = main_pd[main_pd.ID == i]
  TPdate = df[df.TP == 1]['Date'].max() + pd.Timedelta('3 days 0 hours')
  df = df[(df.Date <= TPdate)]
  bigdf = bigdf.append(df)
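
For reference, a minimal sketch that builds the sample dataframe above so the answers below can be tried directly. It assumes an existing SparkSession named spark and keeps Date as a string, as in the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the question; Date is left as a string for now
data = [
    ("A1", None, "2010-01-01 12:00"),
    ("A1", None, "2010-01-01 13:00"),
    ("A1", 1,    "2010-01-02 01:00"),
    ("A1", None, "2010-01-02 02:00"),
    ("A9", None, "2010-05-05 12:00"),
    ("A9", 1,    "2010-05-05 13:00"),
    ("A9", 1,    "2010-05-06 13:00"),
    ("A9", None, "2010-05-09 13:00"),
]
df = spark.createDataFrame(data, "id string, TP int, Date string")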


狐的传说
2 Answers

月关宝盒

IIUC, you can use a window function to find max(IF(TP=1, Date, NULL)) for each id, then filter by this threshold:

from pyspark.sql import Window, functions as F

w1 = Window.partitionBy('id')

df_new = df.withColumn('Date', F.to_timestamp('Date', 'yyyy-MM-dd HH:mm')) \
    .withColumn('threshhold_date', F.expr("max(IF(TP=1, Date, NULL))").over(w1)) \
    .filter('Date <= threshhold_date + interval 2 days')

df_new.show()
+---+----+-------------------+-------------------+
| id|  TP|               Date|    threshhold_date|
+---+----+-------------------+-------------------+
| A9|Null|2010-05-05 12:00:00|2010-05-06 13:00:00|
| A9|   1|2010-05-05 13:00:00|2010-05-06 13:00:00|
| A9|   1|2010-05-06 13:00:00|2010-05-06 13:00:00|
| A1|Null|2010-01-01 12:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-01 13:00:00|2010-01-02 01:00:00|
| A1|   1|2010-01-02 01:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-02 02:00:00|2010-01-02 01:00:00|
+---+----+-------------------+-------------------+
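
The output above still carries the helper threshhold_date column; to reproduce the desired dataframe exactly, it can be dropped afterwards. A small sketch, assuming the df_new from the answer above:

# Remove the helper column used for filtering
df_result = df_new.drop('threshhold_date')
df_result.show()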

阿晨1998

You can simply filter the dataframe for TP = 1 and use collect()[0] to get the maximum Date value as a variable. Add 48 hours to that variable with a timedelta and filter df:

from pyspark.sql.functions import *
from datetime import timedelta

date_var = df.filter(col("TP")==1).orderBy("date", ascending=False)\
             .collect()[0]["date"] + timedelta(hours=48)

df.filter(col("Date")<=date_var).show()
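
Note that this computes a single, global threshold across all ids, while the question asks for a cutoff per id. A minimal per-id sketch using groupBy/agg and a join instead; this is an alternative not taken from the answer above, and it assumes Date has already been cast to a timestamp:

from pyspark.sql import functions as F

# Per-id cutoff: last Date where TP == 1, plus a 48-hour buffer
thresholds = (
    df.filter(F.col("TP") == 1)
      .groupBy("id")
      .agg(F.expr("max(Date) + interval 48 hours").alias("cutoff"))
)

# Keep only rows at or before each id's cutoff
result = (
    df.join(thresholds, on="id", how="inner")
      .filter(F.col("Date") <= F.col("cutoff"))
      .drop("cutoff")
)
result.show()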