湖上湖
可能有更聪明的方法来实现这一点,但您也可以使用 RDD :from operator import addfrom numpy import meanfrom datetime import datetimedata = [(1, "2018-12-25"), (2, "2019-01-15"), (1, "2019-01-20"), (3, "2018-01-01"), (2, "2019-01-01"), (4, "2018-04-09"), (3, "2018-11-08"), (1, "2018-03-20")]data = sc.parallelize(data).mapValues(lambda v: [datetime.strptime(v, "%Y-%m-%d")]).reduceByKey(add)def computeMvgAvg(values):sorted_date = sorted(values)diffs = []mvg_avg = []for i in range(1, len(sorted_date)): diffs.append(int((sorted_date[i] - sorted_date[i-1]).total_seconds()/86400)) mvg_avg.append(int(mean(diffs)))diffs = [None] + diffsmvg_avg = [None] + mvg_avgreturn zip(sorted_date, diffs, mvg_avg)sch = StructType([ StructField("A", StringType(), True), StructField("B", DateType(), True), StructField("C", IntegerType(), True), StructField("moving_avg", IntegerType(), True)])data.flatMapValues(myMapValues).map(lambda row: [row[0]] + list(row[1])).toDF(schema=sch).show()+---+----------+----+----------+| A| B| C|moving_avg|+---+----------+----+----------+| 1|2018-03-20|null| null|| 1|2018-12-25| 280| 280|| 1|2019-01-20| 26| 153|| 2|2019-01-01|null| null|| 2|2019-01-15| 14| 14|| 3|2018-01-01|null| null|| 3|2018-11-08| 311| 311|| 4|2018-04-09|null| null|+---+----------+----+----------+
慕姐8265434
文档: 窗口文档: 滞后# Creating a Dataframefrom pyspark.sql.window import Windowfrom pyspark.sql.functions import col, to_date, lag, datediff, when, udfdf = sqlContext.createDataFrame([(1,'2018-12-25'),(2,'2019-01-15'),(1,'2019-01-20'),(3,'2018-01-01'), (2,'2019-01-01'),(4,'2018-04-09'),(3,'2018-11-08'),(1,'2018-03-20')], ['A','B'])df = df.withColumn('B',to_date(col('B'), 'yyyy-MM-dd'))# Using window and lag functions to find the value from previous rowmy_window = Window.partitionBy('A').orderBy('A','B')# Creating a UDF to calculate average of window sized 2.def row_avg(c1,c2): count_non_null = 2 total = 0 if c1 == None: c1 = 0 count_non_null = count_non_null - 1 if c2 == None: c2 = 0 count_non_null = count_non_null - 1 if count_non_null == 0: return None else: return int((c1+c2)/count_non_null)row_avg = udf(row_avg)df = df.withColumn('B_Lag_1', lag(col('B'),1).over(my_window))\ .withColumn('C', datediff(col('B'),col('B_Lag_1'))).drop('B_Lag_1')\ .withColumn('C_Lag_1', lag(col('C'),1).over(my_window))\ .withColumn('moving_avg',row_avg(col('C'),col('C_Lag_1'))).drop('C','C_Lag_1')df.show()+---+----------+----------+| A| B|moving_avg|+---+----------+----------+| 1|2018-03-20| null|| 1|2018-12-25| 280|| 1|2019-01-20| 153|| 3|2018-01-01| null|| 3|2018-11-08| 311|| 2|2019-01-01| null|| 2|2019-01-15| 14|| 4|2018-04-09| null|+---+----------+----------+