猿问

如何使用星火找到中位数和分位数

如何使用星火找到中位数和分位数

我怎样才能找到RDD使用分布式方法的整数,IPython和SPark?这个RDD大约有70万个元素,因此太大,无法收集和找到中位数。

这个问题和这个问题类似。但是,这个问题的答案是使用Scala,我不知道。

如何计算ApacheSPark的精确中值?

使用Scala答案的思路,我试图用Python编写类似的答案。

我知道我首先想把RDD..我不知道怎么做。我看到sortBy(将此rdd按给定的keyfunc)和sortByKey(整理这个RDD,它假定由(键,值)对组成。)方法。我认为两者都使用键值,而我的RDD只有整数元素。

  1. 首先,我在考虑

    myrdd.sortBy(lambda x: x)?

  2. 接下来,我将找到RDD的长度(

    rdd.count()).

  3. 最后,我希望找到RDD中心的元素或2个元素。这个方法我也需要帮助。

编辑:

我有个主意。也许我可以索引我的RDD然后Key=index和value=Element。然后我就可以试着按价值分类了?我不知道这是否可能,因为只有一个sortByKey方法。


慕桂英3389331
浏览 702回答 3
3回答

绝地无双

火花2.0+:你可以用approxQuantile实现Greenwald-Khanna算法:Python:df.approxQuantile("x",&nbsp;[0.5],&nbsp;0.25)斯卡拉:df.stat.approxQuantile("x",&nbsp;Array(0.5),&nbsp;0.25)其中最后一个参数是一个相对错误。次数越少,计算结果越准确,计算量越大。星火2.2(火花-14352)它支持对多列的估计:df.approxQuantile(["x",&nbsp;"y",&nbsp;"z"],&nbsp;[0.5],&nbsp;0.25)和df.approxQuantile(Array("x",&nbsp;"y",&nbsp;"z"),&nbsp;Array(0.5),&nbsp;0.25)火花<2.0Python正如我在评论中提到的,这很可能不值得大惊小怪。如果数据相对较小,比如在您的情况下,那么只需在本地收集和计算中值:import&nbsp;numpy&nbsp;as&nbsp;np np.random.seed(323)rdd&nbsp;=&nbsp;sc.parallelize(np.random.randint(1000000,&nbsp;size=700000))%time&nbsp;np.median(rdd.collect())np.array(rdd.collect()).nbytes在我几年前的电脑上,它需要大约0.01秒的时间和大约5.5MB的内存。如果数据要大得多,排序将是一个限制因素,因此,与其获得确切的值,不如在本地进行采样、收集和计算。但是,如果你真的想让一个人使用星火,这样的事情应该能起作用(如果我什么都没搞砸的话):from&nbsp;numpy&nbsp;import&nbsp;floorimport&nbsp;timedef&nbsp;quantile(rdd,&nbsp;p,&nbsp;sample=None,&nbsp;seed=None): &nbsp;&nbsp;&nbsp;&nbsp;"""Compute&nbsp;a&nbsp;quantile&nbsp;of&nbsp;order&nbsp;p&nbsp;∈&nbsp;[0,&nbsp;1] &nbsp;&nbsp;&nbsp;&nbsp;:rdd&nbsp;a&nbsp;numeric&nbsp;rdd &nbsp;&nbsp;&nbsp;&nbsp;:p&nbsp;quantile(between&nbsp;0&nbsp;and&nbsp;1) &nbsp;&nbsp;&nbsp;&nbsp;:sample&nbsp;fraction&nbsp;of&nbsp;and&nbsp;rdd&nbsp;to&nbsp;use.&nbsp;If&nbsp;not&nbsp;provided&nbsp;we&nbsp;use&nbsp;a&nbsp;whole&nbsp;dataset &nbsp;&nbsp;&nbsp;&nbsp;:seed&nbsp;random&nbsp;number&nbsp;generator&nbsp;seed&nbsp;to&nbsp;be&nbsp;used&nbsp;with&nbsp;sample &nbsp;&nbsp;&nbsp;&nbsp;""" &nbsp;&nbsp;&nbsp;&nbsp;assert&nbsp;0&nbsp;<=&nbsp;p&nbsp;<=&nbsp;1 &nbsp;&nbsp;&nbsp;&nbsp;assert&nbsp;sample&nbsp;is&nbsp;None&nbsp;or&nbsp;0&nbsp;<&nbsp;sample&nbsp;<=&nbsp;1 &nbsp;&nbsp;&nbsp;&nbsp;seed&nbsp;=&nbsp;seed&nbsp;if&nbsp;seed&nbsp;is&nbsp;not&nbsp;None&nbsp;else&nbsp;time.time() &nbsp;&nbsp;&nbsp;&nbsp;rdd&nbsp;=&nbsp;rdd&nbsp;if&nbsp;sample&nbsp;is&nbsp;None&nbsp;else&nbsp;rdd.sample(False,&nbsp;sample,&nbsp;seed) &nbsp;&nbsp;&nbsp;&nbsp;rddSortedWithIndex&nbsp;=&nbsp;(rdd. &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;sortBy(lambda&nbsp;x:&nbsp;x). &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;zipWithIndex(). &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;map(lambda&nbsp;(x,&nbsp;i):&nbsp;(i,&nbsp;x)). &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;cache()) &nbsp;&nbsp;&nbsp;&nbsp;n&nbsp;=&nbsp;rddSortedWithIndex.count() &nbsp;&nbsp;&nbsp;&nbsp;h&nbsp;=&nbsp;(n&nbsp;-&nbsp;1)&nbsp;*&nbsp;p &nbsp;&nbsp;&nbsp;&nbsp;rddX,&nbsp;rddXPlusOne&nbsp;=&nbsp;( &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;rddSortedWithIndex.lookup(x)[0] &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;x&nbsp;in&nbsp;int(floor(h))&nbsp;+&nbsp;np.array([0L,&nbsp;1L])) &nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;rddX&nbsp;+&nbsp;(h&nbsp;-&nbsp;floor(h))&nbsp;*&nbsp;(rddXPlusOne&nbsp;-&nbsp;rddX)还有一些测试:np.median(rdd.collect()),&nbsp;quantile(rdd,&nbsp;0.5)##&nbsp;(500184.5,&nbsp;500184.5)np.percentile(rdd.collect(),&nbsp;25),&nbsp;quantile(rdd,&nbsp;0.25)##&nbsp;(250506.75,&nbsp;250506.75)np.percentile(rdd.collect(),&nbsp;75),&nbsp;quantile(rdd,&nbsp;0.75)(750069.25,&nbsp;750069.25)最后,让我们定义中位数:from&nbsp;functools&nbsp;import&nbsp;partial median&nbsp;=&nbsp;partial(quantile,&nbsp;p=0.5)到目前为止还不错,但是它需要4.66秒的本地模式,没有任何网络通信。也许有办法改善这一点,但为什么还要费心呢?语言独立&nbsp;(蜂箱):如果你用HiveContext您也可以使用HiveUDAFs。具有积分值:rdd.map(lambda&nbsp;x:&nbsp;(float(x),&nbsp;)).toDF(["x"]).registerTempTable("df")sqlContext.sql("SELECT&nbsp;percentile_approx(x,&nbsp;0.5)&nbsp;FROM&nbsp;df")具有连续值:sqlContext.sql("SELECT&nbsp;percentile(x,&nbsp;0.5)&nbsp;FROM&nbsp;df")在……里面percentile_approx您可以传递另一个参数,该参数确定要使用的多个记录。

守候你守候我

如果您只想要一个RDD方法,并且不想移动到DF,那么添加一个解决方案。这个片段可以为双的RDD获得一个百分位数。如果输入百分位数为50,则应获得所需的中位数。如果有不明案件,请告诉我。/** &nbsp;&nbsp;*&nbsp;Gets&nbsp;the&nbsp;nth&nbsp;percentile&nbsp;entry&nbsp;for&nbsp;an&nbsp;RDD&nbsp;of&nbsp;doubles&nbsp;&nbsp;* &nbsp;&nbsp;*&nbsp;@param&nbsp;inputScore&nbsp;:&nbsp;Input&nbsp;scores&nbsp;consisting&nbsp;of&nbsp;a&nbsp;RDD&nbsp;of&nbsp;doubles&nbsp;&nbsp;*&nbsp;@param&nbsp;percentile&nbsp;:&nbsp;The&nbsp;percentile&nbsp;cutoff&nbsp;required&nbsp;(between&nbsp;0&nbsp;to&nbsp;100),&nbsp;e.g&nbsp;90%ile&nbsp;of&nbsp;[1,4,5,9,19,23,44]&nbsp;=&nbsp;~23. &nbsp;&nbsp;*&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;It&nbsp;prefers&nbsp;the&nbsp;higher&nbsp;value&nbsp;when&nbsp;the&nbsp;desired&nbsp;quantile&nbsp;lies&nbsp;between&nbsp;two&nbsp;data&nbsp;points&nbsp;&nbsp;*&nbsp;@return&nbsp;:&nbsp;The&nbsp;number&nbsp;best&nbsp;representing&nbsp;the&nbsp;percentile&nbsp;in&nbsp;the&nbsp;Rdd&nbsp;of&nbsp;double&nbsp;&nbsp;*/&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;def&nbsp;getRddPercentile(inputScore:&nbsp;RDD[Double],&nbsp;percentile:&nbsp;Double):&nbsp;Double&nbsp;=&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;val&nbsp;numEntries&nbsp;=&nbsp;inputScore.count().toDouble &nbsp;&nbsp;&nbsp;&nbsp;val&nbsp;retrievedEntry&nbsp;=&nbsp;(percentile&nbsp;*&nbsp;numEntries&nbsp;/&nbsp;100.0&nbsp;).min(numEntries).max(0).toInt &nbsp;&nbsp;&nbsp;&nbsp;inputScore&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.sortBy&nbsp;{&nbsp;case&nbsp;(score)&nbsp;=>&nbsp;score&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.zipWithIndex() &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.filter&nbsp;{&nbsp;case&nbsp;(score,&nbsp;index)&nbsp;=>&nbsp;index&nbsp;==&nbsp;retrievedEntry&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.map&nbsp;{&nbsp;case&nbsp;(score,&nbsp;index)&nbsp;=>&nbsp;score&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.collect()(0) &nbsp;&nbsp;}
随时随地看视频慕课网APP

相关分类

Python
我要回答