添加一些代码来解决问题
//generates a sequence in the range from input value (+1) to input value (+9)
Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color) {
return Observable.range(value+1,9)
.map(i -> {
Log.d(TAG, "Value " + i
+ " evaluating on " + Thread.currentThread().getName()
+ " emitting item at " + System.currentTimeMillis());
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
}
return new ColoredIntegerModel(i, color);
});
}
//creates a stream if say input =2 of numbers from 1-20 (input*2) such that the output is 1 (Red color) 2-10(green color) 11 (Red color) 11-20 (Green Color)
Observable<ColoredIntegerModel> getEventStream(int value) {
return Observable.create(new ObservableOnSubscribe<ColoredIntegerModel>() {
@Override
public void subscribe(ObservableEmitter<ColoredIntegerModel> emitter) throws Exception {
for (int i = 0; i < value; ++i) {
ColoredIntegerModel model = new ColoredIntegerModel(i*10, Color.RED);
emitter.onNext(model);
Observable<ColoredIntegerModel> more = getSequenceObservable(i*10, 100, Color.GREEN);
more.subscribe(new Consumer<ColoredIntegerModel>() {
@Override
public void accept(ColoredIntegerModel coloredIntegerModel) throws Exception {
emitter.onNext(coloredIntegerModel);
}
});
}
}
});
}
上面的代码有效。它打印 1(Red) 2-10(Green) 11(Red), 12-20,但我想要一个更干净的解决方案。我也不确定何时可以处理 getEventStream() 中的内部订阅。
问题基本上是 getEventStream 为每个发射调用一个函数,该函数也返回一个 Observable 。这类似于一个 Promise 链,其中每个单独的 Promise 可以返回一系列其他 Promise。希望这可以澄清对原始问题的任何混淆。
相关分类