我正在启动一个使用 kafka 消息的应用程序。
为了捕捉反序列化异常,我遵循了关于反序列化错误处理的Spring 文档。我试过 failedDeserializationFunction 方法。
这是我的消费者配置类
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
/* Error Handling */
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);
return consumerProps;
}
@Bean
public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(NTCMessageBody.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
这是 BiFunction 提供者
public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {
@Override
public NTCMessageBody apply(byte[] t, Headers u) {
return new NTCBadMessageBody(t);
}
}
当我仅发送一条有关该主题的损坏消息时,我收到了此错误(循环中):
org.apache.kafka.common.errors.SerializationException:反序列化键/值时出错
我知道 ErrorHandlingDeserializer2 应该委托 NTCBadMessageBody 类型并继续消费。我还看到(在调试模式下)它从未进入 NTCBadMessageBody 类的构造函数中。
qq_遁去的一_1
青春有我
慕桂英546537
Smart猫小萌
相关分类