猿问

当其中之一抛出异常时如何阻止可运行对象的执行

我有一组元素,对于每个元素,我都执行方法,将其作为 Runnable 传递给 CompletableFuture.runAsync() 。在执行过程中,可能需要停止整个计算,因此我在执行方法之前检查一些条件。如果计算应该停止,那么我会抛出一个异常,该异常在 CompletableFuture 之外处理。我想阻止所有 Runnables 的执行,这些 Runnables 在抛出异常后执行。因此,换句话说,当其中任何一个 CompletableFuture 抛出异常时,我不想等待所有 CompletableFuture 完成。


Set elements = ...

Executor executor = Executors.newFixedThreadPool(N);

try {

    CompletableFuture.allOf(elements.stream().map(e - > CompletableFuture.runAsync(() - > {

        if (shouldStop()) {

            throw new MyException();

        }

        myMethod(e);

    }, executor)).toArray(CompletableFuture[]::new)).join()

} catch (CompletionException e) {

    ...

}


千万里不及你
浏览 130回答 3
3回答

慕田峪9158850

发生异常时全部取消即可。障碍在于您在创建它们时并不了解所有这些,并且您不想多次执行此工作。这可以通过创建一个新的、空的CompletableFuture第一个(我们称之为f1)来解决。然后,像以前一样创建 future,但f1.cancel在if(shouldStop()) { … }语句中插入对 的调用。然后,在创建所有 future 后,将一个操作链接起来,将所有 future 取消f1。取消将达到两个目的,它将阻止尚未开始的可运行对象的执行,并且将使未来通过不allOf等待仍在进行的评估完成来返回。由于取消 aCompletableFuture与使用 a 异常完成它没有什么不同CancellationException,并且在出现多个异常的情况下,由 返回的 futureallOf将报告任意一个,我们可以使用自completeExceptionally定义来MyException代替,以确保报告的异常不会是次要的CancellationException。一个独立的例子是:static final AtomicInteger STOP = new AtomicInteger(2);static boolean shouldStop() {&nbsp; &nbsp; return STOP.getAndDecrement() <= 0;}static final int N = 10;public static void main(String[] args) {&nbsp; &nbsp; Set<Integer> elements = IntStream.range(0, 100).boxed().collect(Collectors.toSet());&nbsp; &nbsp; ExecutorService executor = Executors.newFixedThreadPool(N);&nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; CompletableFuture<?> cancelAll = new CompletableFuture<>();&nbsp; &nbsp; &nbsp; &nbsp; CompletableFuture<?>[] all = elements.stream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(e ->&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; CompletableFuture.runAsync(() -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("entered "+e);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(shouldStop()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; RuntimeException myException = new RuntimeException("stopped");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// alternatively cancelAll.cancel(false);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cancelAll.completeExceptionally(myException);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw myException;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("processing "+e);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }, executor))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .toArray(CompletableFuture<?>[]::new);&nbsp; &nbsp; &nbsp; &nbsp; cancelAll.whenComplete((value,throwable) -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(throwable != null) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; CompletableFuture.allOf(all).join();&nbsp; &nbsp; } catch (CompletionException e) {&nbsp; &nbsp; &nbsp; &nbsp; e.printStackTrace();&nbsp; &nbsp; }&nbsp; &nbsp; executor.shutdown();}这会打印类似的东西entered 3entered 8entered 4entered 6entered 1entered 9entered 0entered 7entered 5entered 2entered 10processing 8processing 3java.util.concurrent.CompletionException: java.lang.RuntimeException: stopped&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1423)&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1144)&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)&nbsp; &nbsp; at CompletableFutureTest.lambda$main$3(CompletableFutureTest.java:34)&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)&nbsp; &nbsp; at CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:26)&nbsp; &nbsp; at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)&nbsp; &nbsp; at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)&nbsp; &nbsp; at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)&nbsp; &nbsp; at java.base/java.lang.Thread.run(Thread.java:834)Caused by: java.lang.RuntimeException: stopped&nbsp; &nbsp; at CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:25)&nbsp; &nbsp; ... 4 more显示由于并发性,一些可运行对象已经在运行,但一旦传播取消,就不会启动后续执行。请注意,由于cancelAll只会在异常情况下完成或根本不会完成,cancelAll.whenComplete((value,throwable) -> { for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable); });因此您可以将链接操作简化为,但这只是编码风格是否保留冗余检查的问题。您还可以向处理步骤添加延迟,以确保allOf(all).join()在满足停止条件时不会等待完成。还可以将一个操作链接到返回的 future,runAsync该操作将在任何异常完成时取消所有操作,而不仅仅是显式停止。但是,必须注意返回表示通过 安排的操作的原始未来,runAsync而不是返回的未来whenComplete。CompletableFuture<?> cancelAll = new CompletableFuture<>();CompletableFuture<?>[] all = elements.stream()&nbsp; &nbsp; .map(e -> {&nbsp; &nbsp; &nbsp; &nbsp; CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("entered "+e);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(shouldStop()) throw new RuntimeException("stopped");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("processing "+e);&nbsp; &nbsp; &nbsp; &nbsp; }, executor);&nbsp; &nbsp; &nbsp; &nbsp; cf.whenComplete((value,throwable) -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(throwable != null) cancelAll.completeExceptionally(throwable);&nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; return cf;&nbsp; &nbsp; })&nbsp; &nbsp; .toArray(CompletableFuture<?>[]::new);cancelAll.whenComplete((value,throwable) -> {&nbsp; &nbsp; for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);});CompletableFuture.allOf(all).join();

青春有我

我对 s 没有太多(当然没有!)经验CompletableFuture,但我确实有一个建议(可能有帮助?)你可以在CompletableFuture.allOf(elements.stream().maptry 块外部声明 lambda 吗?这样,在尝试内部之前,所有期货都不会运行。但它们仍然可以被 catch 块访问。在其中您可以完成cancel所有这些。

至尊宝的传说

您应该做的主要事情是interrupt希望更快地终止所有正在运行的任务,这意味着这些任务可能需要检查中断,以便它们知道停止正在做的事情并更快地终止。此外,您可以在主线程中继续并让它们在后台终止,而不是等待被中断的任务实际终止。public static void main(String[] args) {&nbsp; &nbsp; List<Integer> elements = Arrays.asList(5, null, 6, 3, 4); // these elements will fail fast&nbsp; &nbsp; // List<Integer> elements = Arrays.asList(5, 2, 6, 3, 4); // these elements will succeed&nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; CountDownLatch latch = new CountDownLatch(elements.size());&nbsp; &nbsp; &nbsp; &nbsp; ExecutorService executor = Executors.newFixedThreadPool(elements.size());&nbsp; &nbsp; &nbsp; &nbsp; elements.stream().forEach(e -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; executor.execute(() -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; doSomething(e);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; latch.countDown();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception ex) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // shutdown executor ASAP on exception, read the docs for `shutdownNow()`&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // it will interrupt all tasks in the executor&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (!executor.isShutdown()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; executor.shutdownNow();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for (int i = (int) latch.getCount(); i >= 0; i--) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; latch.countDown();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // log the exception&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ex.printStackTrace(System.out);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; latch.await();&nbsp; &nbsp; &nbsp; &nbsp; if (executor.isShutdown()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("Tasks failed! Terminating remaining tasks in the background.");&nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; executor.shutdown();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("Tasks succeeded!");&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; } catch (InterruptedException e) {&nbsp; &nbsp; &nbsp; &nbsp; e.printStackTrace();&nbsp; &nbsp; }}public static void doSomething(Integer sleepSecs) {&nbsp; &nbsp; // You will want to check for `interrupted()` throughout the method you want to be able to cancel&nbsp; &nbsp; if (Thread.interrupted()) {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(Thread.currentThread().getName() + " interrupted early");&nbsp; &nbsp; &nbsp; &nbsp; return;&nbsp; &nbsp; }&nbsp; &nbsp; if (sleepSecs == null) {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(Thread.currentThread().getName() + " throwing exception ");&nbsp; &nbsp; &nbsp; &nbsp; throw new RuntimeException();&nbsp; &nbsp; }&nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(Thread.currentThread().getName() + " started interruptable sleep for " + sleepSecs + "s");&nbsp; &nbsp; &nbsp; &nbsp; Thread.sleep(sleepSecs * 1000);&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(Thread.currentThread().getName() + " finished interruptable sleep" + sleepSecs + "s");&nbsp; &nbsp; } catch (InterruptedException e) {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(Thread.currentThread().getName() + " interrupted sleep!");&nbsp; &nbsp; }&nbsp; &nbsp; // ...possibly some part of the task that can't be skipped, such as cleanup&nbsp; &nbsp; System.out.println(Thread.currentThread().getName() + " complete!");}
随时随地看视频慕课网APP

相关分类

Java
我要回答