我无法升级到 Spring 5,所以我坚持使用 spring-kafka 1.3 及其有限的错误处理。所以我无法访问 spring-kafka 2 中的 ConsumerAwareErrorHandler 或 SeekToCurrent 错误处理程序。
我正在使用@KafkaListener
-annotated 方法来收听一个主题,我已将其配置io.confluent.kafka.serializers.KafkaAvroDeserializer
为我的值反序列化器。
问题是,如果我在主题中得到一条不是 Avro 格式的消息,那么 KafkaMessageListenerContainer 轮询循环就会卡住。反序列化程序会在消息上引发异常,并且轮询循环永远不会越过它,因此下次通过循环时,它会尝试反序列化相同的消息并继续循环,每秒将相同的错误转储到我的日志中数千次。
似乎没有办法获得 NeverRetryPolicy 或边缘方面的任何东西,但我可以factory.getContainerProperties().setErrorHandler()
。不幸的是,我不确定我能从那里做什么。
有什么东西可以自动连接到我的错误处理程序中,我可以用它来寻找错误时的前向 1 个偏移量吗?不确定那是什么,文档并没有过多地谈论您可以使用 ErrorHandler 实际做什么,我能找到的大多数示例都是针对 spring-kafka 2.X 的。就像,它不会反序列化,我对消息无能为力,它永远不会起作用,我想避免再次重试它,而且似乎大多数 Stackoverflow 问题都是关于做相反的事情。
我还看到有些人只是将 Avro 的 Deserializer 包装在他们自己的类中,该类会吃掉异常并返回 null。这是一个更好的计划吗?
幕布斯7119047
相关分类