使用 Flink 将数据集设置为 Kafka?是否可以

我有一个用例,我需要将记录从 hive 移动到 kafka。我找不到可以直接将 kafka sink 添加到 flink 数据集的方法。因此,我使用了一种解决方法,我在 flink 数据集上调用地图转换,并在地图函数内部对给定记录使用 kafkaProducer.send() 命令。

我面临的问题是我没有任何方法可以在每个工作节点上执行 kafkaProducer.flush(),因此用 kafka 写入的记录数总是比数据集中的记录数略少。

有没有一种优雅的方法来处理这个问题?有什么办法可以在 flink 中将 kafka sink 添加到数据集?或者调用 kafkaProducer.flush() 作为终结器的方法?


PIPIONE
浏览 141回答 1
1回答

梦里花落0921

您可以简单地创建一个Sink将KafkaProducer在后台使用并将数据写入 Kafka 的文件。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java