从 CompletableFuture 调用 ExecutorService.shutdownNow

当已经运行的任务之一引发异常时,我需要取消所有计划但尚未运行的 CompletableFuture 任务。


尝试了以下示例,但大多数情况下 main 方法不会退出(可能是由于某种类型的死锁)。


public static void main(String[] args) {

    ExecutorService executionService = Executors.newFixedThreadPool(5);


    Set< CompletableFuture<?> > tasks = new HashSet<>();


    for (int i = 0; i < 1000; i++) {

        final int id = i;

        CompletableFuture<?> c = CompletableFuture


        .runAsync( () -> {

            System.out.println("Running: " + id); 

            if ( id == 400 ) throw new RuntimeException("Exception from: " + id);

        }, executionService )


        .whenComplete( (v, ex) -> { 

            if ( ex != null ) {

                System.out.println("Shutting down.");

                executionService.shutdownNow();

                System.out.println("shutdown.");

            }

        } );


        tasks.add(c);

    }


    try{ 

        CompletableFuture.allOf( tasks.stream().toArray(CompletableFuture[]::new) ).join(); 

    }catch(Exception e) { 

        System.out.println("Got async exception: " + e); 

    }finally { 

        System.out.println("DONE"); 

    }        

}

最后的打印输出是这样的:


Running: 402

Running: 400

Running: 408

Running: 407

Running: 406

Running: 405

Running: 411

Shutting down.

Running: 410

Running: 409

Running: 413

Running: 412

shutdown.

尝试shutdownNow在单独的线程上运行方法,但在大多数情况下,它仍然会出现相同的死锁。


知道什么可能导致这种僵局吗?


您认为在CompletableFuture抛出异常时取消所有已安排但尚未运行的s的最佳方法是什么?


正在考虑迭代tasks并调用cancel每个CompletableFuture. 但我不喜欢这个的是CancellationException从join.


狐的传说
浏览 195回答 2
2回答

蓝山帝景

另一个仅依赖的解决方案CompletableFuture是使用“取消者”未来,这将导致所有未完成的任务在完成时被取消:Set<CompletableFuture<?>> tasks = ConcurrentHashMap.newKeySet();CompletableFuture<Void> canceller = new CompletableFuture<>();for(int i = 0; i < 1000; i++) {&nbsp; &nbsp; if (canceller.isDone()) {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println("Canceller invoked, not creating other futures.");&nbsp; &nbsp; &nbsp; &nbsp; break;&nbsp; &nbsp; }&nbsp; &nbsp; //LockSupport.parkNanos(10);&nbsp; &nbsp; final int id = i;&nbsp; &nbsp; CompletableFuture<?> c = CompletableFuture&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .runAsync(() -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //LockSupport.parkNanos(1000);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("Running: " + id);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(id == 400) throw new RuntimeException("Exception from: " + id);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }, executionService);&nbsp; &nbsp; c.whenComplete((v, ex) -> {&nbsp; &nbsp; &nbsp; &nbsp; if(ex != null) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; canceller.complete(null);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; });&nbsp; &nbsp; tasks.add(c);}canceller.thenRun(() -> {&nbsp; &nbsp; System.out.println("Cancelling all tasks.");&nbsp; &nbsp; tasks.forEach(t -> t.cancel(false));&nbsp; &nbsp; System.out.println("Finished cancelling tasks.");});
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java