我正在构建一个经典的制作人 -> rabbitmq -> 消费者流程。所有 3 个节点都在单独的 jvm 甚至单独的主机上运行
Producer 是一个 spring boot 命令行运行器应用程序,预计在完成生产后停止。
消费者应用程序是一个 spring boot web 应用程序,它监听 3 个 rabbitmq 队列(2 个持久队列绑定到直接交换,1 个非持久队列绑定到扇出交换)
我的启动顺序如下: - 启动 rabbitmq - 启动消费者 - 启动生产者
生产者和消费者 amqp 依赖mvn dependency:tree
[INFO] | +- org.springframework.boot:spring-boot-starter-amqp:jar:2.1.6.RELEASE:compile
[INFO] | | +- org.springframework:spring-messaging:jar:5.1.8.RELEASE:compile
[INFO] | | \- org.springframework.amqp:spring-rabbit:jar:2.1.7.RELEASE:compile
[INFO] | | +- org.springframework.amqp:spring-amqp:jar:2.1.7.RELEASE:compile
[INFO] | | | \- org.springframework.retry:spring-retry:jar:1.2.4.RELEASE:compile
[INFO] | | +- com.rabbitmq:amqp-client:jar:5.4.3:compile
[INFO] | | \- org.springframework:spring-tx:jar:5.1.8.RELEASE:compile
生产者代码
/**
* @author louis.gueye@gmail.com
*/
@RequiredArgsConstructor
@Slf4j
public class PlatformBrokerExampleProducerJob implements CommandLineRunner {
private final AmqpTemplate template;
@Override
public void run(String... args) {
final Instant now = Instant.now();
final Instant anHourAgo = now.minus(Duration.ofHours(1));
final String directExchangeName = "careassist_queues";
final String fanoutExchangeName = "careassist_schedules_topics";
IntStream.range(0, 60).boxed().forEach(i -> {
final SensorEventDto event = SensorEventDto.builder() //
.id(UUID.randomUUID().toString()) //
.businessId("sens-q7ikjxk1ftik") //
.timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
.state(SensorState.on) //
.build();
final String routingKey = "care.events";
template.convertAndSend(directExchangeName, routingKey, event);
log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
});
神不在的星期二
大话西游666
相关分类