我的任务是拆除一个 Dev 环境并从废料中重新设置它以验证我们的 CI-CD 流程;唯一的问题是我搞砸了创建一个主题,因此 Kafka Streams 应用程序退出并出现错误。
我深入研究并发现了问题并进行了纠正,但在深入研究时,我遇到了另一个奇怪的小问题。
我实现了一个意外异常处理程序,如下所示:
streams.setUncaughtExceptionHandler((t, e) -> {
logger.fatal("Caught unhandled Kafka Streams Exception:", e);
// Do some exception handling.
streams.close();
// Maybe do some more exception handling.
// Open a lock that is waiting after streams.start() call
// to let application exit normally
shutdownLatch.countDown();
});
问题是,如果应用程序在调用 KafkaStreams::close 时由于主题错误引发异常,则应用程序在尝试调用 KafkaStreams::waitOnState 后似乎在 WindowsSelectorImpl::poll 中死锁。
我想这可能是与调用KafkaStreams ::接近异常处理程序内部的问题,但我发现这个SO和评论马蒂亚斯J.萨克斯认为说应该是确定调用KafkaStreams ::关闭在异常处理程序与告诫不从多个线程调用 KafkaStreams::close。
问题是我想实现一个关闭钩子来在请求时优雅地终止 steams 应用程序,以及实现 UnexpectedException 处理程序以在发生异常时优雅地清理和终止。
我想出了以下解决方案,在调用 close 之前检查 KafkaStreams 状态,它确实有效,但似乎有点不确定,因为除了运行(可能是 Pending)之外,我还可以看到其他情况,我们希望确保 KafkaStreams::close它叫。
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.fatal("Caught Shutdown request");
// Do some shutdown cleanup.
if (streams.state().isRunning())
{
If this hook is called due to the Main exiting after handling
an exception we don't want to call close again. It doesn't
cause any errors but logs that the application was closed
a second time.
streams.close(100L, TimeUnit.MILLISECONDS);
}
// Maybe do a little bit more clean up before system exits.
System.exit(0);
}));
streams.setUncaughtExceptionHandler((t, e) -> {
logger.fatal("Caught unhandled Kafka Streams Exception:", e);
// Do some exception handling.
if (streams.state().isRunning())
{
streams.close(100L, TimeUnit.MILLISECONDS);
}
关于为什么在异常处理程序中调用 KafkaSteams:close 会导致此类麻烦的任何建议,或者如果有更好的方法同时实现关闭钩子和异常处理程序,将不胜感激?
相关分类