我正在使用带有 Kafka 的 Spring Boot 来使用来自 Kafka 队列的消息。
但是,如果 Jackson 解析 payload 有任何错误。
该消息仍然卡住,并不断重新尝试消费,始终给出解析异常。
我尝试在 Kafka 配置中使用 ErrorHandlingDeserializer2 并映射错误处理程序,但问题仍然存在。
@Configuration
@EnableKafka
public class KafkaConfig
{
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "${connecto.group-id}");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
@Bean
public ConsumerFactory<String, UserDto> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(UserDto.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setErrorHandler(new MosaicKafkaErrorHandler());
return factory;
}
@Bean
public KafkaTemplate kafkaTemplate()
{
return new KafkaTemplate<>(producerFactory());
}
}
开满天机
相关分类