我正在编写一个消费者,它会收听 Kafka 主题并在消息可用时使用消息。我通过在本地运行 Kafka 测试了逻辑/代码,它运行良好。
在编写单元/组件测试用例时,它因 avro 架构注册表 url 错误而失败。我尝试了互联网上可用的不同选项,但找不到任何有效的方法。我不确定我的方法是否正确。请帮忙。
监听类
@KafkaListener(topics = "positionmgmt.v1", containerFactory = "genericKafkaListenerFactory")
public void receive(ConsumerRecord<String, GenericRecord> consumerRecord) {
try {
GenericRecord generic = consumerRecord.value();
Object obj = generic.get("metadata");
ObjectMapper mapper = new ObjectMapper();
Header headerMetaData = mapper.readValue(obj.toString(), Header.class);
System.out.println("Received payload : " + consumerRecord.value());
//Call backend with details in GenericRecord
}catch (Exception e){
System.out.println("Exception while reading message from Kafka " + e );
}
卡夫卡配置
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> genericKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(genericConsumerFactory());
return factory;
}
public ConsumerFactory<String, GenericRecord> genericConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
return new DefaultKafkaConsumerFactory<>(config);
}
泛舟湖上清波郎朗
慕神8447489
浮云间
慕田峪4524236
相关分类