猿问

面向对象的图像多进程处理

我想使用 multiprocessing 配合我的类并行分析多张图像:


class SegmentationType(object):
    """Base class for a read -> process -> write image segmentation pipeline.

    Subclasses set ``DISPLAY_NAME`` (used as the output-file suffix) and
    override :meth:`process` to fill ``output_data`` from ``input_data``.
    """

    DISPLAY_NAME = "invalid"

    def __init__(self, filename, path):
        """Remember which image to process.

        Args:
            filename: Name of the image file inside ``path``.
            path: Directory that contains the image; the result is written
                to the same directory.
        """
        self.filename = filename
        self.path = path
        self.input_data = None
        self.output_data = None

    def _output_path(self):
        """Return the path of the result file: ``<stem>_<DISPLAY_NAME>.png``."""
        import os

        # splitext drops only the final extension, so "scan.v2.png" keeps its
        # "scan.v2" stem instead of being cut at the first dot.
        stem = os.path.splitext(self.filename)[0]
        return os.path.join(
            self.path, '{}_{}.png'.format(stem, self.DISPLAY_NAME))

    def read_image(self):
        """Load the image into ``input_data``.

        Raises:
            IOError: If ``cv2.imread`` could not read the file.
        """
        import os

        source_path = os.path.join(self.path, self.filename)
        # cv2.imread returns the image itself (not a (status, image) pair),
        # so the whole array is kept; indexing it would keep a single row.
        image = cv2.imread(source_path)
        if image is None:
            raise IOError("Could not read image '{}'".format(source_path))
        self.input_data = image

    def write_image(self):
        """Write ``output_data`` as a PNG next to the input image."""
        cv2.imwrite(self._output_path(), self.output_data)

    def process(self):
        """Segment ``input_data``; a no-op here, overridden in subclasses."""
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self):
        """Run the three pipeline stages in order: read, process, write."""
        self.read_image()
        self.process()
        self.write_image()


class HSV_Segmenter(SegmentationType):
    """Segmenter variant whose output files carry the ``HSV`` suffix."""

    DISPLAY_NAME = 'HSV'

    def process(self):
        """Fill ``output_data`` from ``input_data``.

        The input is passed through ``rgb_to_hsv`` and the result through
        ``treshold_otsu``.
        """
        # NOTE(review): the helper is spelled "treshold_otsu" here; confirm
        # this matches the name under which it is actually defined.
        self.output_data = treshold_otsu(rgb_to_hsv(self.input_data))



class LabSegmenter(SegmentationType):
    """Segmenter variant whose output files carry the ``LAB`` suffix."""

    DISPLAY_NAME = 'LAB'

    def process(self):
        """Fill ``output_data`` from ``input_data``.

        The input is passed through ``rgb_to_lab`` and the result through
        ``global_threshold``.
        """
        self.output_data = global_threshold(rgb_to_lab(self.input_data))



# Map the requested procedure name onto the class that implements it.
# The values must be the class names actually defined above
# (HSV_Segmenter / LabSegmenter).
segmenter_class = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}.get(procedure)

if not segmenter_class:
    # ValueError is the built-in exception for an unsupported argument value.
    raise ValueError("Invalid segmentation method '{}'".format(procedure))

# Sequential version: one segmenter instance per image.
# No os.chdir() here: the segmenter builds its paths from img_dir itself, and
# changing directory on every iteration breaks as soon as img_dir is relative.
for img in images:
    # The constructor takes (filename, path); the procedure is already
    # encoded in the chosen class.
    processor = segmenter_class(img, img_dir)
    processor.start_pipeline()

但是,我不确定该如何调用 map 函数:


def _run_pipeline(task):
    """Run the full pipeline for one image inside a worker process.

    Defined at module level so that ``multiprocessing`` can pickle it.

    Args:
        task: A ``(segmenter_cls, filename, path)`` tuple.
    """
    segmenter_cls, filename, path = task
    segmenter_cls(filename, path).start_pipeline()


image_lst = os.listdir(my_image_path)

# We split the list into sublists because of 512 GB RAM limitation: only the
# images of one sublist are processed at the same time.
# NOTE(review): the original comment promised sublists of 5 elements, but
# dividing by 2.5 yields sublists of 2-3 images; confirm the intended size.
if len(image_lst) > 4:
    nr_of_sublists = int(len(image_lst) / 2.5)
    image_sub_lst = np.array_split(image_lst, nr_of_sublists)
else:
    image_sub_lst = [image_lst]

# The guard keeps worker processes from re-running this section when the
# module is imported by them (required with the "spawn" start method).
if __name__ == '__main__':
    # os.path.join(..., '') guarantees a trailing separator, so the directory
    # can be safely combined with a file name by the segmenter.
    image_dir = os.path.join(my_image_path, '')

    # One pool is reused for every sublist instead of creating (and killing)
    # a new pool per sublist.
    pool = multiprocessing.Pool(8)
    try:
        # We do the analysis for each sublist
        for sub_lst in image_sub_lst:
            print(sub_lst)
            tasks = [(segmenter_class, str(img), image_dir) for img in sub_lst]
            # map() blocks until the whole sublist is done, which is what
            # limits the number of images in memory at once.
            pool.map(_run_pipeline, tasks)
    finally:
        pool.close()
        pool.join()

    


动漫人物
浏览 49回答 1
1回答

慕婉清6462132

你必须创建一个由 (filename, path) 对组成的列表:

data = [(img, img_dir) for img in images]

然后 map 会在单独的进程中处理每一对。

但你必须让 start_pipeline 接收 args 参数:

    def start_pipeline(self, args):
        print('ok starting')

        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))

        return self.process()

你必须使用 () 创建 segmenter_class 类的实例,才能使用 start_pipeline:

pool.map(segmenter_class().start_pipeline, data)

顺便说一句:在示例代码中,我还返回了 process 的结果。

import os
import multiprocessing

class SegmentationType(object):

    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')

        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))

        return self.process()

class HSV_Segmenter(SegmentationType):

    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):

    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

if __name__ == '__main__':

    procedure = 'hsv'

    segmenter_class = {
        'hsv': HSV_Segmenter,
        'lab': LabSegmenter,
    }.get(procedure)

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data = [(img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)

    # example 1

    results = pool.map(segmenter_class().start_pipeline, data)
    print('Results:', results)

    # example 2

    for result in pool.map(segmenter_class().start_pipeline, data):
        print('result:', result)

    pool.terminate()

编辑:你也可以创建一个接收 procedure 和 data 的函数,然后在 map 中使用它。这样每个进程都会创建自己的 procedure 实例,你也可以把不同的 procedure 发送给不同的进程。

import os
import multiprocessing

class SegmentationType(object):

    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')

        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))

        return self.process()

class HSV_Segmenter(SegmentationType):

    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):

    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}

def start_process(args):

    procedure = args[0]
    data = args[1:]

    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)

    return result

if __name__ == '__main__':

    procedure = 'hsv'

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data = [(procedure, img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)

    # example 1

    results = pool.map(start_process, data)
    print('Results:', results)

    # example 2

    for result in pool.map(start_process, data):
        print('result:', result)

    pool.terminate()

使用不同 procedure 的示例:

if __name__ == '__main__':

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    pool = multiprocessing.Pool(3)

    data = [('hsv', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results HSV:', results)

    data = [('lab', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results LAB:', results)

    pool.terminate()

只用一次 map() 也可以做到同样的事。共有 6 个任务要启动,而 Pool(3) 只会同时运行 3 个进程;一旦有进程空闲,map 就会从列表中取出下一个值并运行。

if __name__ == '__main__':

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]

    data = data_hsv + data_lab

    pool = multiprocessing.Pool(3)

    # example 1

    results = pool.map(start_process, data)
    print('Results:', results)

    # example 2

    for result in pool.map(start_process, data):
        print('results:', result)

    pool.terminate()

编辑:它也适用于 Ray,只需要用 from ray.util import multiprocessing 代替 import multiprocessing。

我没有用 Dask、PySpark 或 Joblib 测试过。

编辑:Joblib 示例

from joblib import Parallel, delayed

class SegmentationType(object):

    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')

        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))

        return self.process()

class HSV_Segmenter(SegmentationType):

    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):

    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}

def start_process(args):

    procedure = args[0]
    data = args[1:]

    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)

    return result

if __name__ == '__main__':

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]

    data = data_hsv + data_lab

    # --- version 1 ---

    #pool = Parallel(n_jobs=3, backend='threading')
    #pool = Parallel(n_jobs=3, backend='multiprocessing')
    pool = Parallel(n_jobs=3)

    # example 1

    results = pool( delayed(start_process)(args) for args in data )
    print('Results:', results)

    # example 2

    for result in pool( delayed(start_process)(args) for args in data ):
        print('result:', result)

    # --- version 2 ---

    #with Parallel(n_jobs=3, backend='threading') as pool:
    #with Parallel(n_jobs=3, backend='multiprocessing') as pool:
    with Parallel(n_jobs=3) as pool:

        # example 1

        results = pool( delayed(start_process)(args) for args in data )
        print('Results:', results)

        # example 2

        for result in pool( delayed(start_process)(args) for args in data ):
            print('result:', result)
随时随地看视频慕课网APP

相关分类

Python
我要回答