我对 RxJava2 有愚蠢的问题。
我需要同时运行两个长时间的操作。我知道我应该使用 Observable.zip() 并且我使用了它。
问题是我的长期操作一个接一个地运行,另一个问题是我的长期操作在我订阅它们之前就开始了。
假设这是我应该异步运行的长期操作。
private String doSomethingLong() {
Random rand = new Random();
int value = rand.nextInt(5);
Timber.i("Do something for [%d] sec [%s]", value, Thread.currentThread().getName());
try {
Thread.sleep(value * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
return String.format(Locale.getDefault(), "Exception [%s]", e.getMessage());
}
return String.format(Locale.getDefault(),"Job for [%d] seconds", value);
}
并让有一个像 test() 这样的方法来尝试使其并行:
public void test() {
final long started = System.currentTimeMillis();
Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable.zip(just1, just2, new Func2<String, String, Combined>() {
@Override
public Combined call(String s, String s2) {
return new Combined(s, s2);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Combined combined) {
long total = System.currentTimeMillis() - started;
Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
}
});
}
当我尝试运行它时,我观察到两个可观察对象 just1 和 just2 一个接一个地运行……这让我很困惑……
但是还有另一个工作人员让我更加困惑......我评论了 Observable.zip 并注意到 just1 和 just2 在我订阅它们之前启动了方法 doSomethingLong() ......
汪汪一只猫
PIPIONE
相关分类