猿问

节点 - 在管道之后正确关闭流

假设我有以下代码:


try {

    let size = 0;


    await pipeline(

        fs.createReadStream('lowercase.txt'),

        async function* (source) {

            for await (const chunk of source) {

                size += chunk.length;

           

                if (size >= 1000000) {

                    throw new Error('File is too big');

                }


                yield String(chunk).toUpperCase();

            }

        },

        fs.createWriteStream('uppercase.txt')

    );


    console.log('Pipeline succeeded.');

} catch (error) {

    console.log('got error:', error);

}

如何确保在每种情况下都正确关闭流?节点文档没有多大帮助——他们只是告诉我我将有悬空事件侦听器:


stream.pipeline() 将在所有流上调用 stream.destroy(err),除了:


发出“结束”或“关闭”的可读流。


发出“完成”或“关闭”的可写流。


调用回调后,stream.pipeline() 会在流上留下悬空的事件侦听器。在失败后重用流的情况下,这可能会导致事件侦听器泄漏和吞噬错误。


千巷猫影
浏览 190回答 2
2回答

守着星空守着你

因此,我发现许多 node.js 流复合操作,例如pipeline()和.pipe()在错误处理方面非常糟糕/不完整。例如,如果你这样做:fs.createReadStream("input.txt")  .pipe(fs.createWriteStream("output.txt"))  .on('error', err => {      console.log(err);  }).on('finish', () => {      console.log("all done");  });您会期望,如果打开 readStream 时出现错误,您会在此处的错误处理程序中收到该错误,但“否”并非如此。打开该输入文件的错误将未处理。这有一些逻辑,因为.pipe()返回输出流并且输入错误不是输出流上的错误,但是当它没有通过时,很容易错过输入流上的错误。该.pipe()操作可以侦听输入流上的错误并传递错误(即使它是 apipeErr或不同的东西),然后它也可以在读取错误时正确清理 writeStream。但是,.pipe()并没有那么彻底地实施。它似乎想假设输入流永远不会出错。相反,您必须单独保存 readStream 对象并直接为其附加错误处理程序才能看到该错误。所以,我只是不再相信这种复合的东西,而且文档从来没有真正解释过如何进行正确的错误处理。我试图查看代码,pipeline()看看我是否能理解错误处理,但这并没有证明是一项富有成效的努力。因此,您的特定问题似乎可以通过转换流来完成:const fs = require('fs');const { Transform } = require('stream');const myTransform = new Transform({    transform: function(chunk, encoding, callback) {        let str = chunk.toString('utf8');        this.push(str.toUpperCase());        callback();    }});function upperFile(input, output) {    return new Promise((resolve, reject) => {        // common function for cleaning up a partial output file        function errCleanup(err) {            fs.unlink(output, function(e) {                if (e) console.log(e);                reject(err);            });        }        let inputStream = fs.createReadStream(input, {encoding: 'utf8'});        let outputStream = fs.createWriteStream(output, {emitClose: true});        // have to separately listen for read/open errors        inputStream.on("error", err => {            // have to manually close writeStream when there was an error reading            if (outputStream) outputStream.destroy();            errCleanup(err);        });        inputStream.pipe(myTransform)            .pipe(outputStream)            .on("error", errCleanup)            .on("close", resolve);            });}// sample usageupperFile("input.txt", "output.txt").then(() => {    console.log("all done");}).catch(err => {    console.log("got error", err);});正如您所看到的,大约 2/3 的代码以稳健的方式处理错误(内置操作无法正确执行的部分)。

慕工程0101907

pipe有那些问题pipeline旨在解决所有问题,它确实pipeline如果从头到尾都有所有部分,那就太好了,但如果没有:即将推出的节点 17 版本将具有stream.compose解决该问题的功能在此之前,流链库是一个不错的选择长篇大论的回答:公认的答案只是忽略了pipeline,但它是专门为解决这个问题而设计的。pipe绝对受到它的影响(更多下文),但我没有发现pipeline没有正确关闭文件、http 等周围的流的情况。带有随机 npm 包的 YMMV,但如果它具有closeordestroy功能以及on('error'事件,应该没问题。为了演示,这会调用 shell 以查看我们的测试文件是否打开:const listOpenFiles = async () => {  const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");  // only show our test files  const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));  console.log('***** open files:\n', openFiles, '\n-------------');};如果您在上面示例中的循环内调用它:for await (const chunk of source) {  await listOpenFiles();输出将不断重复:***** open files:[  '/path/to/lowercase.txt',  '/path/to/uppercase.txt']如果你在 catch 之后再次调用它,你可以看到一切都关闭了。***** open files: [] 关于引用的文档:文档在前 2 个要点中所指的pipeline是它不会关闭已经关闭的流,因为......好吧,它们已经关闭了。至于悬空的侦听器,它们确实留在传递给的各个流上pipeline。但是,在您的示例(典型案例)中,您无论如何都没有保留对各个流的引用;管道完成后,它们将立即被垃圾收集。例如,如果您经常引用其中一个,它会警告潜在的副作用。// using this same instance over and over will end up with tons of dangling listenersexport const capitalizer = new Transform(// ...相反,最好有“干净”的实例。现在生成器函数很容易链接,甚至根本就没有对转换的引用,但是您可以简单地创建一个返回新实例而不是具有常量实例的函数:export const createCaptilizer = () => new Transform(// ...简而言之,上面的例子在所有 3 点上都很好。更多信息pipepipe,另一方面,确实存在上述传播问题。const csvStream = (file) => {  // does not expose file errors, nor clean up the file stream on parsing errors!!!  return fs.createReadStream(file).pipe(createCsvTransform());};人们普遍认为这很痛苦/不直观,但现在改变它为时已晚。我尽量避免它,我pipeline尽可能推荐。但是,重要的是要注意这pipeline需要将所有部分放在一起。因此,例如,对于上述情况,您还需要最终Writable目标。pipe如果您只想构建链的一部分,您仍然必须在这种情况下使用。解决此问题的方法更容易单独推理:const csvStream = (file) => {  const fileStream = fs.createReadStream(file);  const transform = createCsvTransform();  // pass file errors forward  fileStream.on('error', (error) => transform.emit('error', error));  // close file stream on parsing errors  transform.on('error', () => fileStream.close());  return transform;}不过,也有好消息。它仍然是实验性的,但很快流将公开一个stream.compose功能。它具有 的所有传播/清理优势pipeline,但只是返回一个新流。本质上,这是大多数人认为会pipe做的事情。;)// NO propagation or cleanupreadable.pipe(transform);// automatic propagation and cleanupstream.compose(readable, transform);在此之前,请查看https://www.npmjs.com/package/stream-chain关于pipeline和await请注意,上面的示例使用await pipeline(//...,但链接的文档是同步版本的。这不会返回一个承诺,所以await什么都不做。从节点 15 开始,您通常需要stream/promises这里的 api:https ://nodejs.org/api/stream.html#streams-promises-apiimport { pipeline } from 'stream/promises'; // NOT 'stream'在节点 15 之前,您可以使用 util's 使其成为一个承诺promisify:import { pipeline } from 'stream';import { promisify } from 'util';await promisify(pipeline)(// ...或者,为了简化整个文件:import * as stream from 'stream';import { promisify } from 'util';const pipeline = promisify(stream.pipeline);我之所以提到这一点,是因为如果您使用await同步版本,它实际上不会在 之后完成try/catch,因此可能会给人一种错误的印象,即它实际上尚未完成时清理失败。
随时随地看视频慕课网APP

相关分类

JavaScript
我要回答