Observable.just(doSomeLongStuff()) 在我订阅 observable

我对 RxJava2 有愚蠢的问题。


我需要同时运行两个长时间的操作。我知道我应该使用 Observable.zip() 并且我使用了它。


问题是我的长期操作一个接一个地运行,另一个问题是我的长期操作在我订阅它们之前就开始了。


假设这是我应该异步运行的长期操作。


private String doSomethingLong() {

        Random rand = new Random();

        int value = rand.nextInt(5);

        Timber.i("Do something for [%d] sec [%s]", value, Thread.currentThread().getName());

        try {

            Thread.sleep(value * 1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

            return String.format(Locale.getDefault(), "Exception [%s]", e.getMessage());

        }

        return String.format(Locale.getDefault(),"Job for [%d] seconds", value);

    }

并让有一个像 test() 这样的方法来尝试使其并行:


public void test() {


        final long started = System.currentTimeMillis();

        Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());

        Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());



        Observable.zip(just1, just2, new Func2<String, String, Combined>() {

            @Override

            public Combined call(String s, String s2) {

                return new Combined(s, s2);

            }

        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {

            @Override

            public void onCompleted() {


            }


            @Override

            public void onError(Throwable e) {


            }


            @Override

            public void onNext(Combined combined) {

                long total = System.currentTimeMillis() - started;

                Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());

            }

        });


    }

当我尝试运行它时,我观察到两个可观察对象 just1 和 just2 一个接一个地运行……这让我很困惑……


但是还有另一个工作人员让我更加困惑......我评论了 Observable.zip 并注意到 just1 和 just2 在我订阅它们之前启动了方法 doSomethingLong() ......


尚方宝剑之说
浏览 109回答 2
2回答

汪汪一只猫

Observable.just订阅时不运行任何内容。它会在您订阅时发出元素,但您doSomethingLong将在将其作为参数传递后立即运行。这很正常,这就是语言的工作方式。你正在寻找的是一种在我们订阅时返回它的方法,但也只在那个时候运行它,并希望在后台线程上运行。有几个答案,这里有一些:使用延迟有一个名为的运算符defer,它接受一个 lambda,它会在您订阅后执行:Observable.defer(() ->&nbsp; doSomethingLong())这只会doSomethingLong在您订阅时执行使用 fromCallable您可以从 lambda 创建一个可观察对象。这被称为fromCallable:Observable.fromCallable(() -> doSomethingLong())同样,这只会doSomethingLong在您订阅时运行使用创建我认为这可能是最令人气馁的做法,因为有几件事你必须处理,但我认为对于完整性来说,可以提一下:Observable.create( emitter -> {&nbsp; &nbsp; if(emitter.isDisposed()) return;&nbsp; &nbsp; emitter.onNext(doSomethingLong());&nbsp; &nbsp; emitter.onComplete();});同样,我确信有更多方法可以做到这一点。我只是想解释这个问题并提供一些选择。

PIPIONE

将您的 Observables 创建为 Observable.fromCallable{}。而不是 zip 使用 combineLatest()文档:&nbsp;http ://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#fromCallable-java.util.concurrent.Callable-&nbsp;http://reactivex.io/documentation/operators/combinelatest.html
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java