猿问

如何从 aysnc 回调创建多个 Flux

从 Reactor 的参考指南中,我了解到Flux.create()可用于将 aysnc 回调转换为Flux.


但是,有时回调有多种方法来接收多种类型的数据,假设我有如下代码:


asrService.recognize(new Callback() {

    @Override

    public void stateChange(State state) {

        // consume state

    }


    @Override

    public void onResultData(Result result) {

        // consume result

    }

});

如何将其转换为两个反应流:Flux<State>和Flux<Result>?


一只名叫tom的猫
浏览 89回答 2
2回答

暮色呼如

一种方法是使用某些处理器,如 DirectProcessor,您可以创建 2 个不同的处理器,并在事件向处理器发出项目并订阅处理器时,但如果您仍想使用 Flux.create,您可以这样做&nbsp; &nbsp; Flux<Object> objectFlux;@Overridepublic void run(String... args) throws Exception {&nbsp; &nbsp; objectFlux = Flux.create(objectFluxSink ->&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; asrService.recognize(new Callback() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void stateChange(State state) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; objectFluxSink.next(state);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void onResultData(Result result) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; objectFluxSink.next(state);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }));}public Flux<Result> getResult(){&nbsp;return&nbsp; &nbsp; objectFlux.filter(o -> o instanceof Result)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(o -> ((Result)o));}public Flux<State> geState(){&nbsp; &nbsp; return&nbsp; &nbsp; objectFlux.filter(o -> o instanceof State)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(o -> ((State)o));}我仍然认为使用处理器应该更清洁,你不需要做那个过滤和铸造,但你需要有 2 个这样的处理器:&nbsp; &nbsp; &nbsp; &nbsp; DirectProcessor <Result> resultDirectProcessor = DirectProcessor.create();&nbsp; &nbsp; DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();&nbsp; &nbsp; asrService.recognize(new Callback() {&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void stateChange(State state) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; stateDirectProcessor.onNext(state);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void onResultData(Result result) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; resultDirectProcessor.onNext(result);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; });

ABOUTYOU

只是可用于给定任务的小片段。Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();Sinks.EmitResult result = sink.tryEmitNext("some string");
随时随地看视频慕课网APP

相关分类

Java
我要回答