我有一个 Spring Boot 应用程序,它有一个消费者从一个集群中的主题消费并产生到不同集群中的另一个主题。
现在我正在尝试使用 spring 嵌入式 Kafka 编写集成测试用例,但遇到了问题KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource
消费类
@Service
public class KafkaConsumerService {
@Autowired
private KafkaProducerService kafkaProducerService;
@KafkaListener(topics = "${kafka.producer.topic}")
public void professor(List<Professor> pro) {
pro.forEach(kafkaProducerService::produce);
}
}
生产者类
@Service
public class KafkaProducerService {
@Value("${kafka.producer.topic}")
private String topic;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void produce(Professor pro) {
kafkaTemplate.send(topic,"professor",pro);
}
}
在我的测试用例中,我想覆盖KafkaTemplate,这样当我调用其中的kafkaConsumerService.professor方法时,Test它应该将数据生成到嵌入式 Kafka 中,并且我应该对其进行验证。
测试配置
@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaProducerConfigTest {
@Autowired
KafkaEmbedded kafkaEmbeded;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Before
public void setUp() throws Exception {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
kafkaEmbeded.getPartitionsPerTopic());
}
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
return kafkaTemplate;
}
}
月关宝盒
相关分类