我有一个基于 Kafka Streams 的简单 Spring 应用程序,它使用来自传入主题的消息,进行map转换并打印此消息。KStream像这样配置
@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
PrintAction printAction, String topicName) {
KStream<String, JsonNode> source = builder.stream(topicName,
Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
// @formatter:off
source
.map(myTransformer)
.foreach(printAction);
// @formatter:on
return source;
}
在内部MyTransformer,我正在调用此时可能会关闭的外部微服务。如果调用失败(通常是抛出错误RuntimeException),我就无法进行转换。
这里的问题是,如果在之前的处理过程中发生任何错误,有什么方法可以再次重新处理 Streams 应用程序中的消息?
根据我目前的研究,这里没有办法这样做,我唯一的可能是将消息推送到死信主题中,并在将来再次失败时尝试处理它我再次将其推送到 DLT 中并以这种方式重试.
holdtom
米琪卡哇伊
随时随地看视频慕课网APP
相关分类