使用pyspark并行化排序算法

早上好,我开发了一个简单的归并排序算法,我想比较它在并行化和非并行化时的性能。


首先,我生成一个数字列表来排序并检查合并排序对列表进行排序需要多长时间。


我要做的下一件事是将数字列表传递给sc.parallelize()并转换为list,RDD然后将合并排序函数传递给mapPartitions()然后collect()。


import random

import time

from pyspark import SparkContext


def execute_merge_sort(generated_list):

    start_time = time.time()

    sorted_list = merge_sort(generated_list)

    elapsed = time.time() - start_time

    print('Simple merge sort: %f sec' % elapsed)

    return sorted_list



def generate_list(length):

    N = length

    generated_list = [random.random() for num in range(N)]

    return generated_list


def merging(left_side, right_side):

    result = []

    i = j = 0

    while i < len(left_side) and j < len(right_side):

        if left_side[i] <= right_side[j]:

            result.append(left_side[i])

            i += 1

        else:

            result.append(right_side[j])

            j += 1

    if i == len(left_side):

        result.extend(right_side[j:])

    else:

        result.extend(left_side[i:])

    return result



def merge_sort(generated_list):

    if len(generated_list) <= 1:

        return generated_list

    middle_value = len(generated_list) // 2

    sorted_list = merging(merge_sort(generated_list[:middle_value]), merge_sort(generated_list[middle_value:]))

    return sorted_list



def is_sorted(num_array):

    for i in range(1, len(num_array)):

        if num_array[i] < num_array[i - 1]:

            return False

    return True


generate_list = generate_list(500000)


sorted_list = execute_merge_sort(generate_list)


sc = SparkContext()


rdd = sc.parallelize(generate_list).mapPartitions(execute_merge_sort).collect()

当我执行此操作时sc.parallelize(generate_list).mapPartitions(execute_merge_sort).collect(),出现以下错误:


File "<ipython-input-15-1b7974b4fa56>", line 7, in execute_merge_sort

  File "<ipython-input-15-1b7974b4fa56>", line 36, in merge_sort

TypeError: object of type 'itertools.chain' has no len()

任何帮助,将不胜感激。提前致谢。


德玛西亚99
浏览 271回答 1
1回答

当年话下

我想出了如何解决TypeError: 'float' object is not iterable.这可以通过使用flatMap(lambda x: x)和调用扁平化数据glom()以包装列表并使其可由函数执行来解决execute_merge_sort。通过执行以下行,返回的结果是一个包含排序列表的列表。sc.parallelize(random_list_of_lists).flatMap(lambda&nbsp;x:&nbsp;x).glom().mapPartitions(execute_merge_sort_rdd).collect()
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python