Akka Streams onFailuresWithBackoff 未重新启动流程

如果在阶段期间发生任何故障,我正在尝试RestartFlow在 Akka Streams javadsl 中使用来重新启动我的流程阶段,但它似乎并没有重新启动流程,而只是丢弃了消息。


我已经看到了:Akka Streams 中的 RestartFlow not working as expected,但我使用的是 2.5.19 版本,所以应该修复它?


我都试过了RestartFlow.onFailuresWithBackoff,RestartFlow.withBackoff但都没有奏效。我也尝试过使用整个 Actor 系统主管策略,但这似乎只是拦截了异常,因此它不会从流程中抛出,而且似乎没有提供我想要的退避和最大重试策略。


流:


public Consumer.DrainingControl<Done> stream() {

    return Consumer.committableSource(consumerSettings,

        Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +

            ConfigKeys.CONSUMER_TOPIC)))

        .via(RestartFlow.onFailuresWithBackoff(

                Duration.ofSeconds(1), // min backoff

                Duration.ofSeconds(2), // max backoff,

                0.2, // adds 20% "noise" to vary the intervals slightly

                10, // limits the amount of restarts to 10

                this::dispatchMessageFlow))

        .via(Committer.flow(CommitterSettings.create(system)))

        .toMat(Sink.ignore(), Keep.both())

        .mapMaterializedValue(Consumer::createDrainingControl)

        .run(mat);

}


我看到过一次异常,akka 表示它将由于失败而重新启动图表,但在那之后就没有别的了。根据我的理解,我应该再看 10 次。消费者继续收听新消息,因此看起来消息刚刚被丢弃。

如果有人可以帮助我指出正确的方向,我将不胜感激。


眼眸繁星
浏览 145回答 1
1回答

萧十郎

它的工作方式有点不同。长话短说 - 如果发生错误,消息将被丢弃,但源/流将重新启动,而不会杀死整个流。它在RestartFlow.onFailuresWithBackoff 文档中有所描述:重新启动过程本质上是有损的,因为取消和发送消息之间没有协调。来自包装流任一端的终止信号将导致另一端终止,并且任何传输中的消息都将丢失。在退避期间,此 Flow 将背压。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java