Can I use the same executor for the subscribeOn method and for async tasks?

I have a simple question. Suppose I have a class like the following:


import lombok.Value;

import java.nio.file.Path;

@Value
class ImageResizeRequest {

    private DownloadedImage downloadedImage;
    private ImageSize imageSize;
    private Path destinationLocation;
}

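The class above represents a single task that resizes an image to a given size. I have many requests to resize this image to many different sizes.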


import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

@RequiredArgsConstructor
class ImageResizeService {

    private final Executor executor;

    Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
        return Flux.fromIterable(requests)
                .flatMap(this::resize)
                .collectList()
                .subscribeOn(Schedulers.fromExecutor(executor));
    }

    private Mono<ImageResizeResult> resize(ImageResizeRequest request) {
        return Mono.fromFuture(CompletableFuture.supplyAsync(resizeTask(request), executor));
    }

    private Supplier<ImageResizeResult> resizeTask(ImageResizeRequest request) {
        return () -> {
            //TODO add image resize logic for example ImageMagick by Im4Java...
            /* code below call ImageMagick library
             ConvertCmd cmd = new ConvertCmd();
             IMOperation op = new IMOperation();
             op.quality(100d);
             op.addImage(request.getDestinationLocation().toString());
             cmd.run(op);
             */
            //TODO add logic!!!
            return new ImageResizeResult(null, null, null, null);
        };
    }
}

My question is: how do I implement parallel, independent image-resize tasks in Project Reactor? Without Project Reactor, I would use a list of CompletableFutures:


private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v ->
            futures.stream()
                    .map(future -> future.join())
                    .collect(Collectors.<T>toList())
    );
}

with a specified executor service. Also, in my example I use the same executor in both subscribeOn and supplyAsync. Is that a good idea?
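For reference, the plain CompletableFuture approach described above, wired to an explicit executor, could look roughly like this. It is only a minimal sketch: `ResizeDemo`, `fakeResize`, `resizeAll` and the string results are hypothetical stand-ins for the real resize task, and the pool size of 4 is arbitrary.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ResizeDemo {

    // Hypothetical stand-in for the real resize work.
    static String fakeResize(String request) {
        return request + "-resized";
    }

    // Combines many futures into one future holding all results, in input order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Submits one task per request to the given executor and waits for all of them.
    static List<String> resizeAll(List<String> requests, ExecutorService executor) {
        List<CompletableFuture<String>> futures = requests.stream()
                .map(r -> CompletableFuture.supplyAsync(() -> fakeResize(r), executor))
                .collect(Collectors.toList());
        return sequence(futures).join();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4); // arbitrary size
        try {
            System.out.println(resizeAll(List.of("small", "medium", "large"), executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

Each request becomes its own task on the pool, and `sequence` only completes once every task has finished.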


慕斯709654

FFIVE

Don't keep re-creating the Scheduler from the ExecutorService; instead, wrap it once in the constructor. You don't need CompletableFuture at all, and subscribeOn should be applied inside the flatMap so that each resize task can potentially get its own thread (it picks one thread from the pool for each Flux it is applied to):

class ImageResizeService {

  private final Executor executor; //TODO prefer an ExecutorService if possible
  private final Scheduler scheduler;

  ImageResizeService(Executor executor) {
    this.executor = executor;
    //wrap the executor once instead of on every call
    this.scheduler = Schedulers.fromExecutor(executor);
  }

  Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
    //we get the requests on IO thread
    return Flux.fromIterable(requests)
            //for each request, perform asynchronous resize...
            .flatMap(r -> Mono
                //... by converting the resizeTask Callable to a Mono
                .fromCallable(() -> resizeTask(r).get())
                //... and making sure it executes on the executor
                .subscribeOn(scheduler)
            )
            .collectList();
  }
}

To achieve true parallelization, you have another option, parallel().runOn():

Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
    //we get the requests on IO thread
    return Flux.fromIterable(requests)
            //divide into N workloads
            //the executor _should_ be capable of this degree of parallelisation:
            .parallel(NUMBER_OF_DESIRED_THREADS)
            //actually tell to run each workload on a thread picked from executor
            .runOn(scheduler)
            //here the workload are already running on their dedicated thread,
            //we can afford to block it and thus apply resize in a simpler `map`
            .map(r -> resizeTask(r).get()) //NB: the Supplier aspect can probably be removed
            //go back to a `Flux` sequence for collection into list
            .sequential()
            .collectList();
}
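To illustrate what "divide into N workloads" means, here is a plain java.util.concurrent analogy. It is not Reactor code and not how Reactor implements it; `RailsSketch` and `processOnRails` are hypothetical names. Items are dealt round-robin into N groups, each group is processed sequentially as one task on the executor, and the groups are merged at the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class RailsSketch {

    // Splits items round-robin into `rails` groups, processes each group
    // sequentially as one task on the executor, then merges all results.
    static <T, R> List<R> processOnRails(List<T> items, int rails,
                                         ExecutorService executor,
                                         Function<T, R> work) {
        List<List<T>> groups = new ArrayList<>();
        for (int i = 0; i < rails; i++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            groups.get(i % rails).add(items.get(i));
        }

        List<CompletableFuture<List<R>>> pending = new ArrayList<>();
        for (List<T> group : groups) {
            pending.add(CompletableFuture.supplyAsync(() -> {
                List<R> out = new ArrayList<>();
                for (T item : group) {
                    out.add(work.apply(item)); // blocking work is acceptable on this task
                }
                return out;
            }, executor));
        }

        // Merge group by group; results are grouped by rail, not in input order.
        List<R> merged = new ArrayList<>();
        for (CompletableFuture<List<R>> f : pending) {
            merged.addAll(f.join());
        }
        return merged;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<String> result = processOnRails(
                    List.of("a", "b", "c", "d", "e"), 2, executor,
                    s -> s + "@" + Thread.currentThread().getName());
            System.out.println(result);
        } finally {
            executor.shutdown();
        }
    }
}
```

As in the parallel().runOn() version, the merged list is not guaranteed to follow the order of the input, so callers that need the original order have to restore it themselves.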