我正在更新模式下运行 Spark 结构化流作业,并且无法确定是否可以获取每个更新的批次 ID。例如,当您以更新模式输出到控制台时,Spark 将在输出时显示每个批次编号:
-------------------------------------------
Batch: 0
-------------------------------------------
...
-------------------------------------------
Batch: 1
-------------------------------------------
...
等等。我需要将相同的信息添加到发送到 Kafka 的每条消息中。为此,我只能使用 Spark 2.3,因此我无法使用 forEachBatch。
我的工作输出一组特定维度的聚合指标。每个触发器,自上次触发器以来指标可能已更新 - 具有更新指标的维度将在下一批中输出,因为我正在更新模式下运行。当我将这些输出到 Kafka 时,我需要知道哪个批次是最新的 - 因此需要批次号。我认为 forEachBatch 可以满足我的需要,但不幸的是我无法访问 Spark 2.4。我可以使用 forEach 来完成这个任务吗?我仅限于使用更新模式,因为后期事件可能会出现并更新之前已输出的指标。
这是我用来测试的控制台模式。此输出分别显示每个批次及其编号:
StreamingQuery query = logs.writeStream()
.format("console")
.outputMode(OutputMode.Update())
.start();
我想做这样的事情
StreamingQuery query = agg.WriteStream()
.format("kafka")
.outputMode(OutputMode.Update())
.option("kafka.bootstrap.servers", "myconnection")
.Option("topic", "mytopic")
.Start();
但仍然保留在mytopic中判断消息来自哪个批次的能力。这可能吗?
12345678_0001
相关分类