Problem with Dynamic Destinations in Dataflow

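I have a Dataflow job that reads data from Pub/Sub and writes the contents to GCS based on time and file name, with the folder path based on YYYY/MM/DD. This lets files be generated into per-date folders, using Apache Beam's FileIO and Dynamic Destinations.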
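A minimal sketch of the date-based destination logic described above, in plain Java so it runs without Beam. The class and method names are hypothetical (not from the original job), and formatting the folder date in UTC is an assumption. In the pipeline, a function like this would compute the destination key returned from FileIO.writeDynamic().by(...); since the by(...) function is given only the element, the event time has to come from the element itself.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not from the original job): maps an event time to a
// "prefix/YYYY/MM/DD" destination string for dynamic destinations.
final class DatePartitionSketch {
    // DateTimeFormatter is immutable and thread-safe, so one static instance can be
    // shared by all worker threads. UTC is an assumed choice of folder time zone.
    private static final DateTimeFormatter DAY_PATH =
            DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);

    private DatePartitionSketch() {}

    /** Returns e.g. "logs/2019/07/04" for prefix "logs" and an instant on that UTC day. */
    static String destinationFor(String prefix, Instant eventTime) {
        return prefix + "/" + DAY_PATH.format(eventTime);
    }
}
```

For example, destinationFor("logs", Instant.parse("2019-07-04T23:59:59Z")) yields "logs/2019/07/04", while one second later the record moves to "logs/2019/07/05".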


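About two weeks ago, I noticed an unusual build-up of unacknowledged messages. After I restarted the Dataflow job, the error went away and new files were being written to GCS again.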


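A few days later, writing stopped again, except that this time there were errors saying that processing was stuck. After some solid SO research, I found that this may be caused by a deadlock issue in Beam versions before 2.9.0, which use the Conscrypt library as the default security provider. So I upgraded from Beam 2.8 to Beam 2.11.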


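Once again, it worked until it didn't. I looked at the error more closely and noticed that it involved a SimpleDateFormat object, which is not thread-safe. So I switched to java.time and DateTimeFormatter, which are thread-safe. That worked until it didn't. This time, however, the error was slightly different and did not point to anything in my code. The error is shown below.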
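The switch away from SimpleDateFormat matters because a single SimpleDateFormat shared across threads can return corrupted values or throw, while a shared DateTimeFormatter is safe. A small sketch that formats many dates concurrently through one static DateTimeFormatter and counts wrong results; the class name, pool size and date range are arbitrary illustration choices.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical check: one formatter shared by all threads. This is safe because
// DateTimeFormatter is immutable; a shared SimpleDateFormat would not be.
final class SharedFormatterCheck {
    private static final DateTimeFormatter DAY_PATH = DateTimeFormatter.ofPattern("yyyy/MM/dd");

    private SharedFormatterCheck() {}

    /** Formats 'days' consecutive dates on 'threads' threads; returns the number of wrong results. */
    static int countMismatches(LocalDate start, int days, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < days; i++) {
                LocalDate day = start.plusDays(i);
                // Expected value built without the shared formatter, for comparison.
                String expected = String.format("%04d/%02d/%02d",
                        day.getYear(), day.getMonthValue(), day.getDayOfMonth());
                Callable<Boolean> task = () -> expected.equals(DAY_PATH.format(day));
                results.add(pool.submit(task));
            }
            int mismatches = 0;
            for (Future<Boolean> f : results) {
                try {
                    if (!f.get()) {
                        mismatches++;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return mismatches;
        } finally {
            pool.shutdown();
        }
    }
}
```

With DateTimeFormatter, countMismatches(LocalDate.of(2019, 1, 1), 2000, 8) is expected to return 0 on every run.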


Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
  at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)

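This error started about 5 hours after the job was deployed and became more frequent over time. Writing slowed down significantly within 24 hours. I have 60 workers, and I suspect that a worker fails each time the error occurs, which eventually kills the job.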


繁星点点滴滴
2 Answers

慕田峪4524236

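The "Processing stuck ..." error means that a particular operation has taken longer than 5 minutes, not that the job is permanently stuck. However, since FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles is the step that gets stuck and the job ends up cancelled/killed, I would suspect a problem while the job writes its temporary files. I found the BEAM-7689 issue, which is related to the second-granularity timestamp (yyyy-MM-dd_HH-mm-ss) used to name temporary files. The problem occurs because multiple concurrent jobs can share the same temporary directory, so one job may delete it before the others have finished. According to that issue, the mitigation is to upgrade to SDK 2.14. Please let us know whether the error goes away.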
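To make the BEAM-7689 failure mode concrete: a name built from a yyyy-MM-dd_HH-mm-ss timestamp is identical for any two jobs that start within the same second. The sketch below is an illustration only; the ".temp-" directory naming, the bucket path and the random-suffix fix are hypothetical and are not Beam's actual code or the actual change shipped in 2.14.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

// Illustration only: hypothetical temp-directory names, not Beam's real naming scheme.
final class TempDirNaming {
    private static final DateTimeFormatter SECONDS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss").withZone(ZoneOffset.UTC);

    private TempDirNaming() {}

    // Second-granularity name: two jobs starting in the same second get the same directory.
    static String secondsOnly(String base, Instant start) {
        return base + "/.temp-" + SECONDS.format(start);
    }

    // One generic mitigation: append a random UUID so concurrent jobs never share a directory.
    static String withUniqueSuffix(String base, Instant start) {
        return secondsOnly(base, start) + "-" + UUID.randomUUID();
    }
}
```

Two jobs started at 12:00:00.100 and 12:00:00.900 both map to ".temp-2019-07-04_12-00-00" under secondsOnly, so either job's cleanup can delete the other's files; withUniqueSuffix gives them distinct directories.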

饮歌长啸

Since posting this question, I have optimized the Dataflow job to avoid bottlenecks and increase parallelism. As rsantiago explained, "processing stuck" is not an error; it is simply Dataflow's way of reporting that one step is taking much longer than the others, which is essentially a bottleneck that cannot be cleared with the given resources. The changes I made appear to have resolved these problems. The new code is as follows:

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.Contextful;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;

    // Method of the job class. options, LOG, parseDuration(), datePartition and
    // CrowdStrikeFileNaming are defined elsewhere in the author's code.
    public void streamData() {
        try {
            Pipeline pipeline = Pipeline.create(options);
            pipeline
                // Read Pub/Sub messages with their attributes.
                .apply("Read PubSub Events",
                    PubsubIO.readMessagesWithAttributes()
                        .fromSubscription(options.getInputSubscription()))
                // Fixed windows that fire once the watermark passes the end of the
                // window; data up to 24h late is still accepted.
                .apply(options.getWindowDuration() + " Window",
                    Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
                        .triggering(AfterWatermark.pastEndOfWindow())
                        .discardingFiredPanes()
                        .withAllowedLateness(parseDuration("24h")))
                // Dynamic destinations: datePartition computes the destination key.
                .apply(FileIO.<String, PubsubMessage>writeDynamic()
                    .by(new datePartition(options.getOutputFilenamePrefix()))
                    .via(Contextful.fn(
                            (SerializableFunction<PubsubMessage, String>)
                                inputMsg -> new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
                        TextIO.sink())
                    .withDestinationCoder(StringUtf8Coder.of())
                    .to(options.getOutputDirectory())
                    .withNaming(type -> new CrowdStrikeFileNaming(type))
                    // Unbounded input needs an explicit, non-zero shard count.
                    .withNumShards(options.getNumShards())
                    .withTempDirectory(options.getTempLocation()));
            pipeline.run();
        } catch (Exception e) {
            LOG.error("Unable to deploy pipeline");
            LOG.error(e.toString(), e);
        }
    }

The biggest change was removing the extractMsg() function and changing the partitioning to use only the message metadata. Both of those steps forced each message to be deserialized and re-serialized, which hurt performance badly. Also, because my dataset is unbounded, I had to set a non-zero number of shards. I wanted to keep my file-naming strategy simple, so I had set it to 1 without realizing how much that would hurt performance. Since then, I have found a good balance of workers, shards and machine type for my job (unfortunately, mostly by trial and error). Bottlenecks may still appear under a large enough data load, but the pipeline performs well despite heavy load (3-5 TB per day). These changes also significantly improved autoscaling, although I don't know why: the Dataflow job now reacts much faster to peaks and troughs.
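A sketch of what "partitioning on metadata only" can look like: the destination is derived from the message attributes (in Beam, PubsubMessage.getAttributeMap()), so the payload bytes are never decoded in the partition step. The attribute key "eventTimeMillis", its epoch-milliseconds format and the "unknown" fallback folder are all assumptions; the author's datePartition class is not shown in the post.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical partitioner that reads only message attributes, never the payload.
final class AttributePartition {
    private static final DateTimeFormatter DAY_PATH =
            DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);

    // Assumed attribute key holding the event time as epoch milliseconds.
    static final String EVENT_TIME_ATTR = "eventTimeMillis";
    // Assumed fallback folder for messages without a usable timestamp.
    static final String UNKNOWN = "unknown";

    private AttributePartition() {}

    static String destination(String prefix, Map<String, String> attributes) {
        String raw = attributes == null ? null : attributes.get(EVENT_TIME_ATTR);
        if (raw == null) {
            return prefix + "/" + UNKNOWN;
        }
        try {
            long millis = Long.parseLong(raw);
            return prefix + "/" + DAY_PATH.format(Instant.ofEpochMilli(millis));
        } catch (NumberFormatException e) {
            // Malformed attribute: route to the fallback folder instead of failing the bundle.
            return prefix + "/" + UNKNOWN;
        }
    }
}
```

Inside the pipeline, a by(...) function could call destination(prefix, msg.getAttributeMap()); the only per-message work is a map lookup and a number parse, instead of decoding the whole payload.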