我正在尝试使用 Reactor 的 Flux 并行处理多个文件。主要工作负载发生在对 Flux 的调用中flatMap,然后对 Flux 进行转换和过滤。
每当我尝试订阅生成的 Flux 时,主线程都会在收到任何值之前退出。
Flux.fromStream(Files.list(Paths.get("directory"))
.flatMap(path -> {
return Flux.create(sink -> {
try (
RandomAccessFile file = new RandomAccessFile(new File(path), "r");
FileChannel fileChannel = file.getChannel()
) {
// Process file into tokens
sink.next(new Token(".."));
} catch (IOException e) {
sink.error(e);
} finally {
sink.complete();
}
}).subscribeOn(Schedulers.boundedElastic());
})
.map(token -> /* Transform tokens */)
.filter(token -> /* Filter tokens*/)
.subscribe(token -> /* Store tokens in list */)
我希望在列表中找到处理管道的输出,但程序立即退出。首先我想知道我是否正确使用 Flux 类,其次我如何等待订阅调用完成?
青春有我
相关分类