下面CompletableFuture例子中join的调用是否阻塞进程

我试图理解 CompletableFutures 和返回已完成期货的调用链,我创建了下面的示例,它模拟了对数据库的两次调用。

第一个方法应该是用 userIds 列表给出一个可完成的未来,然后我需要调用另一个方法提供 userId 来获取用户(在本例中是一个字符串)。

总结一下:
1. 获取 ID
2. 获取与这些 ID 对应的用户列表。

我创建了简单的方法来模拟休眠线程的响应。请检查下面的代码

public class PipelineOfTasksExample {


    private Map<Long, String> db = new HashMap<>();


    PipelineOfTasksExample() {

        db.put(1L, "user1");

        db.put(2L, "user2");

        db.put(3L, "user3");

        db.put(4L, "user4");

    }



    private CompletableFuture<List<Long>> returnUserIdsFromDb() {

        try {

            Thread.sleep(500);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());

        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));

    }


    private CompletableFuture<String> fetchById(Long id) {

        CompletableFuture<String> cfId = CompletableFuture.supplyAsync(() -> db.get(id));

        try {

            Thread.sleep(500);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        System.out.println("fetching id: " + id + " -> " + db.get(id) + " thread: " + Thread.currentThread().getName());

        return cfId;

    }


    public static void main(String[] args) {


        PipelineOfTasksExample example = new PipelineOfTasksExample();


        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()

                .thenCompose(listOfIds ->

                        CompletableFuture.supplyAsync(

                                () -> listOfIds.parallelStream()

                                        .map(id -> example.fetchById(id).join())

                                        .collect(Collectors.toList()

                                        )

                        )

                );


        System.out.println(result.join());

    }


}


我的问题是,join call ( example.fetchById(id).join()) 是否破坏了进程的非阻塞性质。如果答案是肯定的,我该如何解决这个问题?


BIG阳
浏览 264回答 1
1回答

桃花长相依

你的例子有点奇怪,因为你returnUserIdsFromDb()在任何操作甚至开始之前就减慢了主线程的速度,同样,fetchById减慢了调用者而不是异步操作的速度,这违背了异步操作的全部目的。此外,.thenCompose(listOfIds -> CompletableFuture.supplyAsync(() -> …))您可以简单地使用.thenApplyAsync(listOfIds -> …).所以一个更好的例子可能是public class PipelineOfTasksExample {&nbsp; &nbsp; private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()&nbsp; &nbsp; &nbsp; &nbsp; .collect(Collectors.toMap(id -> id, id -> "user"+id));&nbsp; &nbsp; PipelineOfTasksExample() {}&nbsp; &nbsp; private static <T> T slowDown(String op, T result) {&nbsp; &nbsp; &nbsp; &nbsp; LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(op + " -> " + result + " thread: "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; + Thread.currentThread().getName()+ ", "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; + POOL.getPoolSize() + " threads");&nbsp; &nbsp; &nbsp; &nbsp; return result;&nbsp; &nbsp; }&nbsp; &nbsp; private CompletableFuture<List<Long>> returnUserIdsFromDb() {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println("trigger building the list of Ids - thread: "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; + Thread.currentThread().getName());&nbsp; &nbsp; &nbsp; &nbsp; return CompletableFuture.supplyAsync(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; POOL);&nbsp; &nbsp; }&nbsp; &nbsp; private CompletableFuture<String> fetchById(Long id) {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println("trigger fetching id: " + id + " thread: "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; + Thread.currentThread().getName());&nbsp; &nbsp; &nbsp; &nbsp; return CompletableFuture.supplyAsync(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; () -> slowDown("fetching id: " + id , db.get(id)), POOL);&nbsp; &nbsp; }&nbsp; &nbsp; static ForkJoinPool POOL = new ForkJoinPool(2);&nbsp; &nbsp; public static void main(String[] args) {&nbsp; &nbsp; &nbsp; &nbsp; PipelineOfTasksExample example = new PipelineOfTasksExample();&nbsp; &nbsp; &nbsp; &nbsp; CompletableFuture<List<String>> result = example.returnUserIdsFromDb()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .thenApplyAsync(listOfIds ->&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; listOfIds.parallelStream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(id -> example.fetchById(id).join())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .collect(Collectors.toList()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; POOL&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; );&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(result.join());&nbsp; &nbsp; }}打印出类似的东西trigger building the list of Ids - thread: mainbuilding the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threadstrigger fetching id: 2 thread: ForkJoinPool-1-worker-0trigger fetching id: 3 thread: ForkJoinPool-1-worker-1trigger fetching id: 4 thread: ForkJoinPool-1-worker-2fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-3, 4 threadsfetching id: 2 -> user2 thread: ForkJoinPool-1-worker-3, 4 threadsfetching id: 3 -> user3 thread: ForkJoinPool-1-worker-2, 4 threadstrigger fetching id: 1 thread: ForkJoinPool-1-worker-3fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-2, 4 threads[user1, user2, user3, user4]乍一看,这可能是一个惊人的线程数。答案是join()可能会阻塞线程,但是如果这种情况发生在Fork/Join池的工作线程内部,这种情况会被检测到并启动一个新的补偿线程,以确保配置的目标并行度。作为一种特殊情况,当使用默认的 Fork/Join 池时,实现可能会在方法内选择新的待处理任务join(),以确保同一线程内的进度。所以代码总是会取得进展,join()偶尔调用也没有错,如果替代方案要复杂得多,但如果过度使用,就会有资源消耗过多的危险。毕竟,之所以要使用线程池,就是为了限制线程的数量。另一种方法是尽可能使用链式依赖操作。public class PipelineOfTasksExample {&nbsp; &nbsp; private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()&nbsp; &nbsp; &nbsp; &nbsp; .collect(Collectors.toMap(id -> id, id -> "user"+id));&nbsp; &nbsp; PipelineOfTasksExample() {}&nbsp; &nbsp; private static <T> T slowDown(String op, T result) {&nbsp; &nbsp; &nbsp; &nbsp; LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(op + " -> " + result + " thread: "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; + Thread.currentThread().getName()+ ", "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; + POOL.getPoolSize() + " threads");&nbsp; &nbsp; &nbsp; &nbsp; return result;&nbsp; &nbsp; }&nbsp; &nbsp; private CompletableFuture<List<Long>> returnUserIdsFromDb() {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println("trigger building the list of Ids - thread: "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; + Thread.currentThread().getName());&nbsp; &nbsp; &nbsp; &nbsp; return CompletableFuture.supplyAsync(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; POOL);&nbsp; &nbsp; }&nbsp; &nbsp; private CompletableFuture<String> fetchById(Long id) {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println("trigger fetching id: " + id + " thread: "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; + Thread.currentThread().getName());&nbsp; &nbsp; &nbsp; &nbsp; return CompletableFuture.supplyAsync(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; () -> slowDown("fetching id: " + id , db.get(id)), POOL);&nbsp; &nbsp; }&nbsp; &nbsp; static ForkJoinPool POOL = new ForkJoinPool(2);&nbsp; &nbsp; public static void main(String[] args) {&nbsp; &nbsp; &nbsp; &nbsp; PipelineOfTasksExample example = new PipelineOfTasksExample();&nbsp; &nbsp; &nbsp; &nbsp; CompletableFuture<List<String>> result = example.returnUserIdsFromDb()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .thenComposeAsync(listOfIds -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; List<CompletableFuture<String>> jobs = listOfIds.parallelStream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(id -> example.fetchById(id))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .collect(Collectors.toList());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .thenApply(_void -> jobs.stream()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(CompletableFuture::join).collect(Collectors.toList()));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; POOL&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; );&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(result.join());&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(ForkJoinPool.commonPool().getPoolSize());&nbsp; &nbsp; }}不同之处在于,首先提交所有异步作业,然后join安排调用它们的依赖操作,仅在所有作业完成后执行,因此这些join调用永远不会阻塞。只有方法join末尾的最终调用才main可能阻塞主线程。所以这会打印出类似的东西trigger building the list of Ids - thread: mainbuilding the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threadstrigger fetching id: 3 thread: ForkJoinPool-1-worker-1trigger fetching id: 2 thread: ForkJoinPool-1-worker-0trigger fetching id: 4 thread: ForkJoinPool-1-worker-1trigger fetching id: 1 thread: ForkJoinPool-1-worker-0fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-1, 2 threadsfetching id: 3 -> user3 thread: ForkJoinPool-1-worker-0, 2 threadsfetching id: 2 -> user2 thread: ForkJoinPool-1-worker-1, 2 threadsfetching id: 1 -> user1 thread: ForkJoinPool-1-worker-0, 2 threads[user1, user2, user3, user4]显示无需创建补偿线程,因此线程数与配置的目标并行度相匹配。请注意,如果实际工作是在后台线程中而不是在fetchById方法本身中完成的,那么您现在不再需要并行流,因为没有阻塞join()调用。对于这种情况,仅使用stream()通常会带来更高的性能。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java