猿问

RxJava 基于回调向流添加事件

添加一些代码来解决问题


//generates a sequence in the range from input value (+1) to input value (+9)

Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color) {

        return Observable.range(value+1,9)

                .map(i -> {

                    Log.d(TAG, "Value " + i

                            + " evaluating on " + Thread.currentThread().getName()

                            + " emitting item at " + System.currentTimeMillis());

                    try {

                        Thread.sleep(delay);

                    } catch (InterruptedException e) {


                    }

                    return new ColoredIntegerModel(i, color);

                });

    }


//creates a stream if say input =2 of numbers from 1-20 (input*2) such that the output is 1 (Red color) 2-10(green color) 11 (Red color) 11-20 (Green Color) 

    Observable<ColoredIntegerModel> getEventStream(int value) {

        return Observable.create(new ObservableOnSubscribe<ColoredIntegerModel>() {

            @Override

            public void subscribe(ObservableEmitter<ColoredIntegerModel> emitter) throws Exception {

                for (int i = 0; i < value; ++i) {

                    ColoredIntegerModel model = new ColoredIntegerModel(i*10, Color.RED);

                    emitter.onNext(model);

                    Observable<ColoredIntegerModel> more = getSequenceObservable(i*10, 100, Color.GREEN);

                    more.subscribe(new Consumer<ColoredIntegerModel>() {

                        @Override

                        public void accept(ColoredIntegerModel coloredIntegerModel) throws Exception {

                            emitter.onNext(coloredIntegerModel);

                        }

                    });

                }

            }

        });

    }

上面的代码有效。它打印 1(Red) 2-10(Green) 11(Red), 12-20,但我想要一个更干净的解决方案。我也不确定何时可以处理 getEventStream() 中的内部订阅。


问题基本上是 getEventStream 为每个发射调用一个函数,该函数也返回一个 Observable 。这类似于一个 Promise 链,其中每个单独的 Promise 可以返回一系列其他 Promise。希望这可以澄清对原始问题的任何混淆。


互换的青春
浏览 145回答 2
2回答
随时随地看视频慕课网APP

相关分类

Java
我要回答