Reactive programming with Reactor and RabbitMQ

I recently wrote a demo to get started with reactive programming, combining Reactor and RabbitMQ. Here is my demo code:


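import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.ReactorRabbitMq;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

public class FluxWithRabbitMQDemo {

    private static final String QUEUE = "demo_thong";

    private final Sender sender;
    private final Receiver receiver;

    public FluxWithRabbitMQDemo() {
        this.sender = ReactorRabbitMq.createSender();
        this.receiver = ReactorRabbitMq.createReceiver();
    }

    public void run(int count) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.useNio();

        SenderOptions senderOptions = new SenderOptions()
                .connectionFactory(connectionFactory)
                .resourceCreationScheduler(Schedulers.elastic());

        // Note: this local variable shadows the sender field created in the constructor.
        Sender sender = ReactorRabbitMq.createSender(senderOptions);

        // Consumer side: declare the queue, then print every delivery.
        Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));
        Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);
        queueDeclaration.thenMany(messages)
                .subscribe(m -> System.out.println("Get message " + new String(m.getBody())));

        // Producer side: emit 1..count (skipping 10), build the messages on the
        // parallel scheduler, and publish them with publisher confirms.
        Flux<OutboundMessageResult> dataStream = sender.sendWithPublishConfirms(Flux.range(1, count)
                .filter(m -> !m.equals(10))
                .parallel()
                .runOn(Schedulers.parallel())
                .doOnNext(i -> System.out.println("Message  " + i + " run on thread " + Thread.currentThread().getId()))
                .map(i -> new OutboundMessage("", QUEUE, ("Message " + i).getBytes())));

        sender.declareQueue(QueueSpecification.queue(QUEUE))
                .thenMany(dataStream)
                .doOnError(e -> System.out.println("Send failed" + e))
                .subscribe(m -> {
                    if (m != null) {
                        System.out.println("Sent successfully message " + new String(m.getOutboundMessage().getBody()));
                    }
                });

        // Keep the calling thread alive long enough for the asynchronous pipelines to finish.
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}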


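I expected that as soon as the Flux emits an item, the Sender would publish it to RabbitMQ, and that as soon as RabbitMQ has received it, the Receiver would consume it. Instead, everything happens sequentially, and this is the result I get.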


摇曳的蔷薇
1 answer

慕盖茨4494581

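The messages are produced too quickly. To see the interleaving, add .doOnNext(i -> Thread.sleep(10)) to dataStream. Note that Thread.sleep throws the checked InterruptedException, so in Java the lambda needs a try/catch around the call.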
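
To illustrate why slowing the producer changes the output, here is a minimal plain-JDK sketch. It does not use Reactor or RabbitMQ: the class InterleavingDemo, the method run, and the constant BROKER_LATENCY_MILLIS are all hypothetical names, and the 50 ms "broker" delay is an assumed stand-in for the round trip to a real broker. The producer logs "sent", and a scheduled task logs "received" after the simulated latency.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class InterleavingDemo {

    // Assumed round-trip time to the broker; a made-up value for illustration.
    static final long BROKER_LATENCY_MILLIS = 50;

    // Sends `count` messages, pausing `producerDelayMillis` after each one,
    // and returns the log of "sent N" / "received N" events in the order they occurred.
    static List<String> run(int count, long producerDelayMillis) throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        ScheduledExecutorService broker = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch allReceived = new CountDownLatch(count);
        try {
            for (int i = 1; i <= count; i++) {
                final int id = i;
                log.add("sent " + id);
                // The simulated broker delivers the message after the latency has elapsed.
                broker.schedule(() -> {
                    log.add("received " + id);
                    allReceived.countDown();
                }, BROKER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
                if (producerDelayMillis > 0) {
                    Thread.sleep(producerDelayMillis);
                }
            }
            // Wait until every message has been delivered before returning the log.
            allReceived.await();
        } finally {
            broker.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("no delay:   " + run(3, 0));
        System.out.println("with delay: " + run(3, 100));
    }
}
```

With no delay, the producer finishes long before the first simulated delivery, so the log should look sequential (all "sent" entries, then all "received" entries). With a delay longer than the latency, the entries should alternate. In the Reactor pipeline itself, a non-blocking way to get a similar effect would be to call delayElements(Duration.ofMillis(10)) on the source Flux before .parallel(), rather than sleeping inside doOnNext.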