NFS(Netapp 服务器)-> Flink -> s3

我是 flink (java) 的新手,并尝试将作为文件路径安装的 netapp 文件服务器上的 xml 文件移动到安装了 flink 的服务器上。

如何实时进行批处理或流处理以获取进入文件夹的文件并使用 s3 接收它。

我在 flink-starter 中找不到任何从本地文件系统读取文件的示例,flink 至少是这个用例的正确选择吗?如果是这样,我在哪里可以找到资源来监听文件夹和管理检查点/保存点?


Helenr
浏览 151回答 2
2回答

料青山看我应如是

如果您的目标只是将文件复制到 s3,那么有更简单、更合适的工具。也许同步是合适的。假设使用 Flink 有意义(例如,因为您想要对数据执行一些有状态转换),则所有任务管理器(工作人员)都可以使用相同的 URI 访问要处理的文件。为此,您可以使用 file:// URI。您可以执行以下操作来监视目录并在新文件出现时摄取它们:StreamExecutionEnvironment env =      StreamExecutionEnvironment.getExecutionEnvironment();// monitor directory, checking for new files// every 100 millisecondsTextInputFormat format = new TextInputFormat(  new org.apache.flink.core.fs.Path("file:///tmp/dir/"));DataStream<String> inputStream = env.readFile(  format,   "file:///tmp/dir/",  FileProcessingMode.PROCESS_CONTINUOUSLY,   100,   FilePathFilter.createDefaultFilter());请注意文档中的此警告:如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,则修改文件时,将完全重新处理其内容。这可能会破坏“仅一次”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。这意味着您应该自动将准备好摄取的文件移动到正在监视的文件夹中。您可以使用流文件接收器写入S3。Flink 的写入操作(例如 )writeUsingOutputFormat()不参与检查点,因此在这种情况下这不是一个好的选择。

烙印99

此问题的完整工作代码位于以下链接中。您需要启用检查点以将 .inprogress 文件移动到实际文件// 每 1000 毫秒启动一个检查点 env.enableCheckpointing(1000);StreamingFileSink 未将数据提取到 s3
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java