猿问

Spark 未使用二进制文件在并行 Pyspark 中运行 RDD

我是 Spark 的新手,开始用 Python 编写一些脚本。我的理解是Spark并行执行Transformation(map)。


def some_function(name, content):

    print(name, datetime.now())

    time.sleep(30)

    return content


config = SparkConf().setAppName("sample2").setMaster("local[*]")

filesRDD = SparkContext(conf=config).binaryFiles("F:\\usr\\temp\\*.zip")

inputfileRDD = filesRDD.map(lambda job_bundle: (job_bundle[0], some_function(job_bundle[0], job_bundle[1])))

print(inputfileRDD.collect())

上面的代码.zip从文件夹中收集文件列表并对其进行处理。当我执行它时,我看到这是按顺序发生的。


输出


file:/F:/usr/temp/sample.zip 2020-10-22 10:42:37.089085

file:/F:/usr/temp/sample1.zip 2020-10-22 10:43:07.103317

您可以看到它在 30 秒后开始处理第二个文件。意思是完成第一个文件后。我的代码出了什么问题?为什么它不并行执行RDD?你能帮我么 ?


汪汪一只猫
浏览 83回答 1
1回答

红糖糍粑

我不确切知道该方法如何binaryFiles跨 Spark 分区对文件进行分区。似乎与此相反textFiles,它倾向于只创建一个分区。让我们看看一个名为 的示例目录dir,其中包含 5 个文件。> ls dirtest1  test2  test3  test4  test5如果我使用textFile,事情就会并行运行。我不提供输出,因为它不是很漂亮,但你可以自己检查。我们可以验证事物是否与 并行运行getNumPartitions。>>> sc.textFile("dir").foreach(lambda x: some_function(x, None))# ugly output, but everything starts at the same time,# except maybe the last one since you have 4 cores.>>> sc.textFile("dir").getNumPartitions()5由于binaryFiles情况不同,并且由于某种原因,所有内容都进入同一个分区。>>> sc.binaryFiles("dir").getNumPartitions()1我什至尝试使用 10k 个文件,所有内容仍然位于同一分区。我相信这背后的原因是,在scala中,binaryFiles返回一个带有文件名的RDD和一个允许读取文件的对象(但不执行读取)。因此速度很快,并且生成的 RDD 很小。因此,将其放在一个分区上就可以了。在 scala 中,我们可以在使用后使用重新分区binaryFiles,一切都会很好。scala> sc.binaryFiles("dir").getNumPartitions1scala> sc.binaryFiles("dir").repartition(4).getNumPartitions4scala> sc.binaryFiles("dir").repartition(4)    .foreach{ case (name, ds) => {         println(System.currentTimeMillis+": "+name)        Thread.sleep(2000)        // do some reading on the DataStream ds    }}1603352918396: file:/home/oanicol/sandbox/dir/test11603352918396: file:/home/oanicol/sandbox/dir/test31603352918396: file:/home/oanicol/sandbox/dir/test41603352918396: file:/home/oanicol/sandbox/dir/test51603352920397: file:/home/oanicol/sandbox/dir/test2python 中的问题是binaryFiles实际上将文件读取到一个分区上。另外,这对我来说非常神秘,但是 pyspark 2.4 中的以下代码行会产生与您注意到的相同的行为,这是没有意义的。# this should work but does notsc.binaryFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))# this does not work either, which is strange but it would not be advised anyway# since all the data would be read on one partitionsc.binaryFiles("dir").repartition(4).foreach(lambda x: some_function(x, ''))然而,由于binaryFiles实际读取文件,您可以使用wholeTextFile它将文件作为文本文件读取并按预期运行:# this workssc.wholeTextFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))
随时随地看视频慕课网APP

相关分类

Python
我要回答