catspeake
UPD最后,我找到了对我来说最有效的实现。在我的纱线配置中,它使用集群的所有资源。from pyspark.sql.functions import explode
def melt(df):
sp = df.columns[1:]
return (df
.rdd
.map(lambda x: [str(x[0]), [(str(i[0]),
float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]],
preservesPartitioning = True)
.toDF()
.withColumn('_2', explode('_2'))
.rdd.map(lambda x: [str(x[0]),
str(x[1][0]),
float(x[1][1] if x[1][1] else 0)],
preservesPartitioning = True)
.toDF()
)对于非常广泛的dataframe,从user6910411应答到_vars_和_vals生成时,性能下降了。通过selectExpr实现熔融是非常有用的。columns=['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
+---+---+---+---+---+---+
| a| b| c| d| e| f|
+---+---+---+---+---+---+
| 1| 2| 3| 4| 5| 6|
| 4| 5| 6| 7| 9| 8|
| 7| 8| 9| 1| 2| 4|
| 8| 3| 9| 8| 7| 4|
+---+---+---+---+---+---+
cols = df.columns[1:]
df.selectExpr('a', "stack({}, {})".format(len(cols), ', '.join(("'{}', {}".format(i, i) for i in cols))))
+---+----+----+
| a|col0|col1|
+---+----+----+
| 1| b| 2|
| 1| c| 3|
| 1| d| 4|
| 1| e| 5|
| 1| f| 6|
| 4| b| 5|
| 4| c| 6|
| 4| d| 7|
| 4| e| 9|
| 4| f| 8|
| 7| b| 8|
| 7| c| 9|
...