猿问

pyspark计算当前时间和上次活动时间之间差异的移动平均值

我有一些这样的记录。


A    B

1    2018-12-25

2    2019-01-15

1    2019-01-20

3    2018-01-01

2    2019-01-01

4    2018-04-09

3    2018-11-08

1    2018-03-20

我想要得到的是这样的东西。第一步,在组内按升序排列。(不需要按A订购)


A    B

1    2018-03-20

1    2018-12-25

1    2019-01-20

3    2018-01-01

3    2018-11-08

2    2019-01-01

2    2019-01-15

4    2018-04-09

第二步,获取组内连续行之间的时间差。


A    B            C

1    2018-03-20   NaN

1    2018-12-25   280

1    2019-01-20   26

3    2018-01-01   NaN

3    2018-11-08   311

2    2019-01-01   NaN

2    2019-01-15   14

4    2018-04-09   NaN

第三步,得到窗口大小为2的C的移动平均。(因为我只提供了很少的行作为例子,为了方便就选择大小2)


A    B            C     moving_avg

1    2018-03-20   NaN   NaN

1    2018-12-25   280   280

1    2019-01-20   26    153

3    2018-01-01   NaN   NaN

3    2018-11-08   311   311

2    2019-01-01   NaN   NaN

2    2019-01-15   14    14

4    2018-04-09   NaN   NaN

如果 Windows 函数可以处理这种情况,该解决方案实际上不需要生成 C 列。我列出每个步骤只是为了确保您可以清楚地了解问题所在。


结果集将如下所示


A    B            moving_avg

1    2018-03-20   NaN

1    2018-12-25   280

1    2019-01-20   153

3    2018-01-01   NaN

3    2018-11-08   311

2    2019-01-01   NaN

2    2019-01-15   14

4    2018-04-09   NaN

注意:这是在 pyspark 上并使用数据框。不是在 Python 上使用 Pandas。


海绵宝宝撒
浏览 151回答 2
2回答

湖上湖

可能有更聪明的方法来实现这一点,但您也可以使用 RDD :from operator import addfrom numpy import meanfrom datetime import datetimedata = [(1, "2018-12-25"), (2, "2019-01-15"), (1, "2019-01-20"), (3, "2018-01-01"),        (2, "2019-01-01"), (4, "2018-04-09"), (3, "2018-11-08"), (1, "2018-03-20")]data = sc.parallelize(data).mapValues(lambda v: [datetime.strptime(v, "%Y-%m-%d")]).reduceByKey(add)def computeMvgAvg(values):sorted_date = sorted(values)diffs = []mvg_avg = []for i in range(1, len(sorted_date)):    diffs.append(int((sorted_date[i] - sorted_date[i-1]).total_seconds()/86400))    mvg_avg.append(int(mean(diffs)))diffs = [None] + diffsmvg_avg = [None] + mvg_avgreturn zip(sorted_date, diffs, mvg_avg)sch = StructType([   StructField("A", StringType(), True),   StructField("B", DateType(), True),   StructField("C", IntegerType(), True),   StructField("moving_avg", IntegerType(), True)])data.flatMapValues(myMapValues).map(lambda row: [row[0]] + list(row[1])).toDF(schema=sch).show()+---+----------+----+----------+|  A|         B|   C|moving_avg|+---+----------+----+----------+|  1|2018-03-20|null|      null||  1|2018-12-25| 280|       280||  1|2019-01-20|  26|       153||  2|2019-01-01|null|      null||  2|2019-01-15|  14|        14||  3|2018-01-01|null|      null||  3|2018-11-08| 311|       311||  4|2018-04-09|null|      null|+---+----------+----+----------+

慕姐8265434

文档: 窗口文档: 滞后# Creating a Dataframefrom pyspark.sql.window import Windowfrom pyspark.sql.functions import col, to_date, lag, datediff, when, udfdf = sqlContext.createDataFrame([(1,'2018-12-25'),(2,'2019-01-15'),(1,'2019-01-20'),(3,'2018-01-01'),                                 (2,'2019-01-01'),(4,'2018-04-09'),(3,'2018-11-08'),(1,'2018-03-20')],                                 ['A','B'])df = df.withColumn('B',to_date(col('B'), 'yyyy-MM-dd'))# Using window and lag functions to find the value from previous rowmy_window = Window.partitionBy('A').orderBy('A','B')# Creating a UDF to calculate average of window sized 2.def row_avg(c1,c2):    count_non_null = 2    total = 0    if c1 == None:        c1 = 0        count_non_null = count_non_null - 1    if c2 == None:        c2 = 0        count_non_null = count_non_null - 1    if count_non_null == 0:        return None    else:        return int((c1+c2)/count_non_null)row_avg = udf(row_avg)df = df.withColumn('B_Lag_1', lag(col('B'),1).over(my_window))\       .withColumn('C', datediff(col('B'),col('B_Lag_1'))).drop('B_Lag_1')\       .withColumn('C_Lag_1', lag(col('C'),1).over(my_window))\       .withColumn('moving_avg',row_avg(col('C'),col('C_Lag_1'))).drop('C','C_Lag_1')df.show()+---+----------+----------+|  A|         B|moving_avg|+---+----------+----------+|  1|2018-03-20|      null||  1|2018-12-25|       280||  1|2019-01-20|       153||  3|2018-01-01|      null||  3|2018-11-08|       311||  2|2019-01-01|      null||  2|2019-01-15|        14||  4|2018-04-09|      null|+---+----------+----------+
随时随地看视频慕课网APP

相关分类

Python
我要回答