使用Spark将列转换为行

使用Spark将列转换为行

我正在尝试将我的表的某些列转换为行。我正在使用Python和Spark 1.5.0。这是我的初始表:


+-----+-----+-----+-------+

|  A  |col_1|col_2|col_...|

+-----+-------------------+

|  1  |  0.0|  0.6|  ...  |

|  2  |  0.6|  0.7|  ...  |

|  3  |  0.5|  0.9|  ...  |

|  ...|  ...|  ...|  ...  |

我想有这样的事情:


+-----+--------+-----------+

|  A  | col_id | col_value |

+-----+--------+-----------+

|  1  |   col_1|        0.0|

|  1  |   col_2|        0.6|   

|  ...|     ...|        ...|    

|  2  |   col_1|        0.6|

|  2  |   col_2|        0.7| 

|  ...|     ...|        ...|  

|  3  |   col_1|        0.5|

|  3  |   col_2|        0.9|

|  ...|     ...|        ...|

有人知道我能做到吗?谢谢您的帮助。


凤凰求蛊
浏览 642回答 3
3回答

呼唤远方

Spark本地线性代数库目前非常弱:它们不包括如上所述的基本操作。有一个JIRA用于解决Spark 2.1的问题 - 但今天对你没有帮助。要考虑的事情:执行转置可能需要完全改组数据。现在您需要直接编写RDD代码。我用transposescala 编写- 但不是用python 编写的。这是scala版本: def transpose(mat: DMatrix) = {     val nCols = mat(0).length     val matT = mat      .flatten      .zipWithIndex      .groupBy {       _._2 % nCols    }       .toSeq.sortBy {       _._1    }       .map(_._2)       .map(_.map(_._1))       .toArray     matT  }所以你可以将它转换为python供你使用。在这个特定的时刻,我没有带宽来编写/测试:如果你无法进行转换,请告诉我。至少 - 以下内容很容易转换为python。zipWithIndex- > enumerate()(等价的python - 归功于@ zero323)map - > [someOperation(x) for x in ..]groupBy - > itertools.groupBy()这是flatten没有python等价的实现:  def flatten(L):         for item in L:             try:                 for i in flatten(item):                     yield i            except TypeError:                 yield item所以你应该能够把它们放在一起寻找解决方案。

不负相思意

使用flatmap。像下面的东西应该工作from pyspark.sql import Rowdef rowExpander(row):     rowDict = row.asDict()     valA = rowDict.pop('A')     for k in rowDict:         yield Row(**{'A': valA , 'colID': k, 'colValue': row[k]})newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python