猿问

合并热通量源

在Spring Boot 2 with Reactor中,我试图合并两个热源。但是,似乎唯一一个报告了 中的两个参数中的第一个。我如何识别第二个.FluxmergeFluxmergemergeFlux


在下面的示例中,in 甚至不会打印 when 是第一个参数。如果我做第一个,那么不打印。System.errB-2outgoing1aoutgoing2A-2


以下是完整的示例;


package com.example.demo;


import java.time.Duration;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

import reactor.core.publisher.Flux;

import reactor.core.scheduler.Schedulers;


public class Weather {

String city;

Integer temperature;


public Weather(String city, Integer temperature) {

    this.city = city;

    this.temperature = temperature;

}


@Override

public String toString() {

    return "Weather [city=" + city + ", temperature=" + temperature + "]";

}


public static void main(String[] args) {


    BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();

    BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>();


    // Assume Spring @Repository "A-1"

    new Thread(() -> {

        for (int d = 1; d < 1000; d += 1) {

            for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {

                queue.add(new Weather(s, d));

                try { Thread.sleep(250); } catch (InterruptedException e) {}

            }

        }

    }).start(); 


    // Assume Spring @Repository "B-1"

    new Thread(() -> {

        for (int d = 1; d < 1000; d += 1) {

            for (String s: new String[] {"MOS", "TLV"}) {

                queue2.add(new Weather(s, d));

                try { Thread.sleep(1000); } catch (InterruptedException e) {}

            }

        }

    }).start();


    // Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR

    Flux<Weather> outgoing1 = Flux.<Weather>create(

        sink -> {

            for (int i = 0; i < 1000; i++) {

                try {

                    sink.next(queue.take());

                    System.err.println("1 " + queue.size());

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

            sink.complete();

        }

    )

湖上湖
浏览 98回答 1
1回答

阿晨1998

这里有一些事情在起作用。请注意&nbsp;.merge&nbsp;运算符的以下建议...请注意,合并是为使用异步源或有限源而定制的。当处理尚未在专用计划程序上发布的无限源时,您必须将该源隔离在其自己的计划程序中,否则合并会尝试在订阅另一个源之前将其排出。您的出站助焊剂使用 ,但这只影响在运算符之后链接的运算符。即,它不会影响之前的任何内容。具体来说,它不会影响 lambda 中的代码传递到执行的线程。如果您在每个出站通量之前添加,您可以看到这一点。.publishOn.publishOn.publishOnFlux.create.log().publishOn您的 lambda 已传递给调用阻塞方法 ()。Flux.createqueue.take由于您在线程中调用合并的 Flux,因此您的 lambda 将传递给线程中的执行,并阻止它。subscribe(...)mainFlux.createmain最简单的解决方法是使用而不是使 lambda 中的代码传递到不同的线程(不是 )上运行。这将防止线程阻塞,并允许来自两个出站流的合并输出交错。.subscribeOn.publishOnFlux.createmainmain
随时随地看视频慕课网APP

相关分类

Java
我要回答