猿问

在一组文档上使用 ForkJoinPool

我从未使用过 ForkJoinPool,我遇到了这个代码片段。


我有一个Set<Document> docs. 文档有写方法。如果我执行以下操作,是否需要使用 get 或 join 来确保集合中的所有文档都正确完成了它们的写入方法?


ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);

pool.submit(() -> docs.parallelStream().forEach(

    doc -> {

        doc.write();

    })

);

如果其中一个文档无法完成它的写入会发生什么?说它抛出一个异常。给出的代码是否等待所有文档完成其写入操作?


精慕HU
浏览 180回答 1
1回答

呼啦一阵风

ForkJoinPool.submit(Runnable)返回一个ForkJoinTask代表待完成的任务。如果您想等待所有文档被处理,您需要与该任务进行某种形式的同步,例如调用其&nbsp;get()方法(从Future接口)。关于异常处理,像往常一样,流处理期间的任何异常都会停止它。但是,您必须参考以下文档Stream.forEach(Consumer):此操作的行为明显是不确定的。对于并行流管道,此操作不保证遵守流的遇到顺序,因为这样做会牺牲并行性的好处。对于任何给定的元素,可以在库选择的任何时间和线程中执行该操作。[…]这意味着如果发生异常,您无法保证将写入哪个文档。处理将停止,但您无法控制仍将处理哪个文档。如果您想确保处理剩余的文件,我会建议两种解决方案:document.write()用try/包围catch以确保没有异常传播,但这使得很难检查哪个文档成功或根本没有失败;要么使用另一种解决方案来管理您的并行处理,例如CompletableFutureAPI。正如评论中所指出的,您当前的解决方案是一种由于实现细节而起作用的黑客,因此最好做一些更清洁的事情。使用CompletableFuture,你可以这样做:List<CompletableFuture<Void>>&nbsp;futures&nbsp;=&nbsp;docs.stream() &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.map(doc&nbsp;->&nbsp;CompletableFuture.runAsync(doc::write,&nbsp;pool)) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.collect(Collectors.toList());这将确保处理所有文档,并检查返回列表中的每个未来是否成功。
随时随地看视频慕课网APP

相关分类

Java
我要回答