如何等待 JavaRx2 Flowable 完成所有任务?

我正在尝试学习 RxJava2 库的基础知识,现在我陷入了以下时刻:

我已经生成了myFlowablevia Flowable.generate(...),现在我需要等待所有任务完成执行,然后才能继续下一步。

这是展示问题的代码:


myFlowable.parallel()

            .runOn(Schedulers.computation())

            .map(val -> myCollection.add(val))

            .sequential()

            .subscribe(val -> {

                System.out.println("Thread from subscribe: " + Thread.currentThread().getName());

                System.out.println("Value from subscribe: " + val.toString());

            });

    System.out.println("Before sleep - Number of objects: " + myCollection.size());

    try {

        Thread.sleep(1000);

        System.out.println("After sleep - Number of objects: " + myCollection.size());

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

我运行所有任务并将结果添加到集合中。如果我在 myFlowable 块之后立即检查集合大小,那么如果我在small之后检查它,情况将会有所不同Thread.sleep()。有什么方法可以检查所有任务是否已完成执行并且我们可以进一步进行?任何帮助或指导将不胜感激。


catspeake
浏览 159回答 3
3回答

拉风的咖菲猫

使用Flowable::blockingSubscribe()- 将当前 Flowable 运行到终端事件,忽略任何值并重新抛出任何异常。

守着星空守着你

由于 RxJava 是异步的,observable 下面的 java 代码将运行,而 observable 将在不同的线程中运行,这就是为什么如果您想在 Flowable 已完成发送数据时收到通知,您应该在 RxJava 流中执行此操作。为此,您有一个运算符 .doOnComplete 这里有一个示例如何检测流何时完成        Flowable.range(0, 100).parallel()            .runOn(Schedulers.computation())            .map(integer -> {                return integer;            })            .sequential()            .doOnComplete(() -> {                System.out.println("finished");            })            .subscribe(integer -> System.out.println(integer));

小唯快跑啊

您可以使用 AtomicBoolean,将其初始化为 false 并使用 将其设置为 true doFinally()。doFinally()在 Observable 发出 onError 或 onCompleted 信号后调用,或者被下游处理。然后让主线程休眠,直到completedvalue 为 true。使用你的例子:AtomicBoolean completed = new AtomicBoolean(false);myFlowable.parallel()            .runOn(Schedulers.computation())            .map(val -> myCollection.add(val))            .sequential()            .doFinally(() -> completed.set(true))            .subscribe(val -> {                ...            });    ...try {   while(!completed.get()){       Thread.sleep(1000);       ...   }  ...} catch (InterruptedException e) {  e.printStackTrace();}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java