
Serialization exception when consuming from a Kafka queue with Spring Boot

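I am using Spring Boot with Kafka to consume messages from a Kafka queue.

However, if Jackson fails to parse a payload, the message stays stuck: the consumer keeps retrying it and throws the same parsing exception every time.

I tried using ErrorHandlingDeserializer2 in the Kafka configuration and registering an error handler, but the problem persists.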


@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "${connecto.group-id}");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

        return props;
    }

    @Bean
    public ConsumerFactory<String, UserDto> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(UserDto.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setErrorHandler(new MosaicKafkaErrorHandler());

        return factory;
    }

    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}



米琪卡哇伊
1 answer

开满天机

To solve this problem, version 2.2 introduced ErrorHandlingDeserializer2. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record contents, ErrorHandlingDeserializer2 returns a null value and adds a header holding a DeserializationException, which contains the cause and the raw bytes. When you use a record-level MessageListener and the ConsumerRecord carries a DeserializationException header for either the key or the value, the container's ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

However, since you are using an earlier version, you can use the following hack. It parses the topic, partition, and offset out of the exception message and seeks past the bad record, so it depends on the exact wording of that message.

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) {
        thrownException.printStackTrace();
        if (thrownException instanceof SerializationException) {
            // Expected text: "... for partition <topic>-<partition> at offset <offset>. If needed, ..."
            String s = thrownException.getMessage()
                    .split("Error deserializing key/value for partition ")[1]
                    .split("\\. If needed, please seek past the record to continue consumption\\.")[0];
            String topicAndPartition = s.split(" at offset ")[0];
            // Split on the last hyphen, because topic names may contain hyphens themselves.
            int dash = topicAndPartition.lastIndexOf('-');
            String topic = topicAndPartition.substring(0, dash);
            int partition = Integer.parseInt(topicAndPartition.substring(dash + 1));
            long offset = Long.parseLong(s.split(" at offset ")[1]);
            // Skip the record that cannot be deserialized.
            consumer.seek(new TopicPartition(topic, partition), offset + 1);
        }
    }
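The message parsing in the hack above can be pulled out into a small helper and checked without a broker. The following is only a sketch: the class name FailedRecordLocator and its Location type are made up for illustration, and the pattern assumes the message wording quoted in the answer ("Error deserializing key/value for partition <topic>-<partition> at offset <offset>."), which may differ between Kafka client versions.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Kafka or Spring): extracts topic, partition and
// offset from the SerializationException message so the consumer can seek past it.
final class FailedRecordLocator {

    // Assumed message format; the greedy (.+) keeps hyphens that belong to the topic name.
    private static final Pattern LOCATION = Pattern.compile(
            "Error deserializing key/value for partition (.+)-(\\d+) at offset (\\d+)\\.");

    static final class Location {
        final String topic;
        final int partition;
        final long offset;

        Location(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Returns the parsed location, or null if the message does not match the assumed format.
    static Location parse(String message) {
        if (message == null) {
            return null;
        }
        Matcher m = LOCATION.matcher(message);
        if (!m.find()) {
            return null;
        }
        return new Location(m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3)));
    }

    private FailedRecordLocator() {
    }
}
```

Inside the error handler, a non-null result would be used as consumer.seek(new TopicPartition(loc.topic, loc.partition), loc.offset + 1). Returning null for an unrecognized message lets the handler fall back to rethrowing or logging instead of failing with an ArrayIndexOutOfBoundsException.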