最近我写了一个演示程序来启动反应式编程,结合 Reactor 和 RabbitMQ。这是我的演示代码:
public class FluxWithRabbitMQDemo {
private static final String QUEUE = "demo_thong";
private final reactor.rabbitmq.Sender sender;
private final Receiver receiver;
public FluxWithRabbitMQDemo() {
this.sender = ReactorRabbitMq.createSender();
this.receiver = ReactorRabbitMq.createReceiver();
}
public void run(int count) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
SenderOptions senderOptions = new SenderOptions()
.connectionFactory(connectionFactory)
.resourceCreationScheduler(Schedulers.elastic());
reactor.rabbitmq.Sender sender = ReactorRabbitMq.createSender(senderOptions);
Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));
Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);
queueDeclaration.thenMany(messages).subscribe(m->System.out.println("Get message "+ new String(m.getBody())));
Flux<OutboundMessageResult> dataStream = sender.sendWithPublishConfirms(Flux.range(1, count)
.filter(m -> !m.equals(10))
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(i->System.out.println("Message " + i + " run on thread "+Thread.currentThread().getId()))
.map(i -> new OutboundMessage("", QUEUE, ("Message " + i).getBytes())));
sender.declareQueue(QueueSpecification.queue(QUEUE))
.thenMany(dataStream)
.doOnError(e -> System.out.println("Send failed"+ e))
.subscribe(m->{
if (m!= null){
System.out.println("Sent successfully message "+new String(m.getOutboundMessage().getBody()));
}
});
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
我希望在 Flux 发出一个项目后,Sender 必须将它发送到 RabbitMQ,并且在接收到 RabbitMQ 之后,Receiver 必须接收它。但一切都是按顺序发生的,这就是我得到的结果
慕盖茨4494581
相关分类