如果在阶段期间发生任何故障,我正在尝试RestartFlow在 Akka Streams javadsl 中使用来重新启动我的流程阶段,但它似乎并没有重新启动流程,而只是丢弃了消息。
我已经看到了:Akka Streams 中的 RestartFlow not working as expected,但我使用的是 2.5.19 版本,所以应该修复它?
我都试过了RestartFlow.onFailuresWithBackoff,RestartFlow.withBackoff但都没有奏效。我也尝试过使用整个 Actor 系统主管策略,但这似乎只是拦截了异常,因此它不会从流程中抛出,而且似乎没有提供我想要的退避和最大重试策略。
流:
public Consumer.DrainingControl<Done> stream() {
return Consumer.committableSource(consumerSettings,
Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +
ConfigKeys.CONSUMER_TOPIC)))
.via(RestartFlow.onFailuresWithBackoff(
Duration.ofSeconds(1), // min backoff
Duration.ofSeconds(2), // max backoff,
0.2, // adds 20% "noise" to vary the intervals slightly
10, // limits the amount of restarts to 10
this::dispatchMessageFlow))
.via(Committer.flow(CommitterSettings.create(system)))
.toMat(Sink.ignore(), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(mat);
}
我看到过一次异常,akka 表示它将由于失败而重新启动图表,但在那之后就没有别的了。根据我的理解,我应该再看 10 次。消费者继续收听新消息,因此看起来消息刚刚被丢弃。
如果有人可以帮助我指出正确的方向,我将不胜感激。
萧十郎
相关分类