在 Kafka Stream 中处理消息时发生错误时重新处理消息

我有一个基于 Kafka Streams 的简单 Spring 应用程序,它使用来自传入主题的消息,进行map转换并打印此消息。KStream像这样配置


@Bean

public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,

         PrintAction printAction, String topicName) {

    KStream<String, JsonNode> source = builder.stream(topicName,

                Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));

    // @formatter:off

    source

        .map(myTransformer)

        .foreach(printAction);

    // @formatter:on

    return source;

}

在内部MyTransformer,我正在调用此时可能会关闭的外部微服务。如果调用失败(通常是抛出错误RuntimeException),我就无法进行转换。


这里的问题是,如果在之前的处理过程中发生任何错误,有什么方法可以再次重新处理 Streams 应用程序中的消息?


根据我目前的研究,这里没有办法这样做,我唯一的可能是将消息推送到死信主题中,并在将来再次失败时尝试处理它我再次将其推送到 DLT 中并以这种方式重试.


holdtom
浏览 162回答 1
1回答

米琪卡哇伊

如果在 Kafka Streams 处理期间发生任何未捕获的异常,您的流将状态更改为 ERROR 并停止使用发生错误的分区的传入消息。您需要自己捕获异常。重试可以通过以下两种方式实现:1)使用 SpringRetryTemplate调用外部微服务(但请记住,您将延迟使用来自特定分区的消息),或 2)将失败的消息推送到另一个主题以供以后重新处理(如您所建议的)更新时间kafka-streams 2.8.0因为,您可以使用方法自动替换kafka-streams 2.8.0失败的流线程(由未捕获的异常引起)。有关详细信息,请查看Kafka Streams 特定的未捕获异常处理程序KafkaStreamsvoid setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREADkafkaStreams.setUncaughtExceptionHandler(ex -> {&nbsp; &nbsp; log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);&nbsp; &nbsp; return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;});
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java