我正在使用 Spotify 的 Scio 库在 scala 中编写 apache beam 管道。我想在文件系统(可以是 hdfs、alluxio 或 GCS)上以递归方式搜索目录下的文件。像 *.jar 一样应该找到提供的目录和子目录下的所有文件。
Apache Beam sdk 提供了org.apache.beam.sdk.io.FileIO
用于此类目的的类,我可以使用pipeline.apply(FileIO.match().filepattern(filesPattern))
.
如何使其递归搜索与提供的模式匹配的所有文件?
目前,我正在尝试另一种方法,其中我正在创建提供的模式的resourceId并获取提供的模式的当前目录,然后我尝试使用方法解析当前目录中的所有子目录resourceId.resolve()
。但它抛出了一个例外。
val currentDir = FileSystems.matchNewResource(filesPattern, false).getCurrentDirectory val childDir = currentDir.resolve("{@literal *}", StandardResolveOptions.RESOLVE_DIRECTORY)
请建议使用 apache beam 递归搜索文件的正确方法是什么?
参考文献: https: //beam.apache.org/releases/javadoc/2.11.0/index.html?org /apache/beam/sdk/io/fs/ResourceId.html
心有法竹
相关分类