如果您的目标只是将文件复制到 s3,那么有更简单、更合适的工具。也许同步是合适的。假设使用 Flink 有意义(例如,因为您想要对数据执行一些有状态转换),则所有任务管理器(工作人员)都可以使用相同的 URI 访问要处理的文件。为此,您可以使用 file:// URI。您可以执行以下操作来监视目录并在新文件出现时摄取它们:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// monitor directory, checking for new files// every 100 millisecondsTextInputFormat format = new TextInputFormat( new org.apache.flink.core.fs.Path("file:///tmp/dir/"));DataStream<String> inputStream = env.readFile( format, "file:///tmp/dir/", FileProcessingMode.PROCESS_CONTINUOUSLY, 100, FilePathFilter.createDefaultFilter());请注意文档中的此警告:如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,则修改文件时,将完全重新处理其内容。这可能会破坏“仅一次”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。这意味着您应该自动将准备好摄取的文件移动到正在监视的文件夹中。您可以使用流文件接收器写入S3。Flink 的写入操作(例如 )writeUsingOutputFormat()不参与检查点,因此在这种情况下这不是一个好的选择。