标签(node
): WebDevelop
上次说了在Node
中两个基本的流:Readable
和Writable
流,接下来说说在Node.js
中更高级的流。
Duplex 流
Duplex
流(双向数据流)结合了可读流和可写的特性,使得Duplex
流可以进行读取和写入操作。
当然,Duplex
流也可以自定义实现,实现方式和可读流(可写流)的方式差不多,使用util模块提供的inherits()
方法:
var util = require('util'); util.inherits(MyDuplexStream,stream.Duplex);
然后创建对象时调用
stream.Duplex.call(this,opt);
创建一个Duplex
流的opt
参数接受一个叫allowHalfOpen
的布尔值属性,若该值为true
,则表示可写端已经结束,可读端也能保持打开状态;反之亦然。若为false
,则表示结束可写入端可读端也会结束;反之亦然。
当然,既然称Duplex
流为可读和可写流的合并,所以一个完整的Duplex
流需要同时实现_read(size)
和_write(data,encoding,callback)
方法。同时,Duplex
流同时拥有可读和可写流方法和事件。具体的例子如下:
var stream = require('stream');var util = require('util'); util.inherits(Duplexer, stream.Duplex);function Duplexer(opt) { stream.Duplex.call(this, opt); this.data = []; } Duplexer.prototype._read = function readItem(size) { var chunk = this.data.shift(); if (chunk == "stop"){ this.push(null); } else{ if(chunk){ this.push(chunk); } else { setTimeout(readItem.bind(this), 500, size); } } }; Duplexer.prototype._write = function(data, encoding, callback) { this.data.push(data); callback(); };var d = new Duplexer(); d.on('data', function(chunk){ console.log('read: ', chunk.toString()); }); d.on('end', function(){ console.log('Message Complete'); }); d.write("I think, "); d.write("therefore "); d.write("I am."); d.write("Rene Descartes"); d.write("stop");//控制台的输出read: I think,read: therefore read: I am. read: Rene Descartes Message Complete
Transform流
Transform
流(转换流)扩展了Deplex
流,但它修改了Writable
流和Readable
流之间的数据。当你需要从一个系统到另一个系统的数据时,这个流就能大显身手。
用于压缩文件的
zlib
流主要用于加密的
crypto
流
Duplex
和Transform
之间的一个主要区别是,在Transform
流中不需要实现_read()
和_write()
。相反的需要实现另外两个方法,_transform(chunk,encoding,callback)
和_flush(callback)
。
_transform
方法接受来自write()
请求的数据,对其修改后,推出修改结果_flush
方法用于刷新数据
下面就自定义实现一个Transform流:
var stream = require("stream");var util = require("util"); util.inherits(JSONObjectStream, stream.Transform);function JSONObjectStream (opt) { stream.Transform.call(this, opt); }; JSONObjectStream.prototype._transform = function (data, encoding, callback) { object = data ? JSON.parse(data.toString()) : ""; this.emit("object", object); object.handled = true; this.push(JSON.stringify(object)); callback(); }; JSONObjectStream.prototype._flush = function(cb) { cb(); };var tc = new JSONObjectStream(); tc.on("object", function(object){ console.log("Name: %s", object.name); console.log("Color: %s", object.color); }); tc.on("data", function(data){ console.log("Data: %s", data.toString()); }); tc.write('{"name":"Carolinus", "color": "Green"}'); tc.write('{"name":"Solarius", "color": "Blue"}'); tc.write('{"name":"Lo Tae Zhao", "color": "Gold"}'); tc.write('{"name":"Ommadon", "color": "Red"}');//控制台的输出Name: Carolinus Color: Green Data: {"name":"Carolinus","color":"Green","handled":true} Name: Solarius Color: Blue Data: {"name":"Solarius","color":"Blue","handled":true} Name: Lo Tae Zhao Color: Gold Data: {"name":"Lo Tae Zhao","color":"Gold","handled":true} Name: Ommadon Color: Red Data: {"name":"Ommadon","color":"Red","handled":true}
把Readable流用管道输送到Writable流
在Readable
流的方法描述中,提到一个方法pipe(writableStream,[options])
,此方法会把一个Readable
流链接到Writable
流,会把Readable
流中的数据直接输入到Writable
流。option
参数接受一个end
属性设置为true
或false
的对象。当end
是true
是,Writable
流随着Readable
流的结束而结束,默认是true
。
一下例子简要的展示了使用的基本过程。
var stream = require('stream');var util = require('util'); util.inherits(Reader, stream.Readable); util.inherits(Writer, stream.Writable);function Reader(opt) { stream.Readable.call(this, opt); this._index = 1; } Reader.prototype._read = function(size) { var i = this._index++; if (i > 10){ this.push(null); } else { this.push("Item " + i.toString()); } };function Writer(opt) { stream.Writable.call(this, opt); this._index = 1; } Writer.prototype._write = function(data, encoding, callback) { console.log(data.toString()); callback(); };var r = new Reader();var w = new Writer(); r.pipe(w);//控制台的输出Item 1Item 2Item 3Item 4Item 5Item 6Item 7Item 8Item 9Item 10
用Zlib压缩和解压数据
在数据量吞吐量大的系统中,数据的存储量将会变得特别的大,这时压缩/解压数据就显的十分的重要。在Node.js
中提供了一个Zlib
的模块,这样就可以非常方便、高效地解压和压缩在缓冲区的数据了。
但是需要的记住的是,压缩数据和解压数据需要花大量的时间,所以在使用压缩/解压模块时,应该确定压缩能带来的好处。同时,压缩操作也应当考虑在程序空闲时使用。
Zlib支持以下几种压缩方式:
gzip/gunzip
标准的gzip
压缩。deflate/inflate
基于Huffman
编码的标准deflate
压缩算法。deflateRaw/inflateRaw
针对原始缓冲区的deflate
压缩算法。
这些方法具有通用的调用方法function(buffer,callback)
,其中function
是压缩/解压方法,buffer
是被压缩/解压的缓冲区,callback
为回掉函数。
以下就是在Zlib
下压缩/解压的方法展示:
var zlib = require("zlib");var input = '...............text...............'; zlib.deflate(input, function(err, buffer) { if (!err) { console.log("deflate (%s): ", buffer.length, buffer.toString('base64')); zlib.inflate(buffer, function(err, buffer) { if (!err) { console.log("inflate (%s): ", buffer.length, buffer.toString()); } }); zlib.unzip(buffer, function(err, buffer) { if (!err) { console.log("unzip deflate (%s): ", buffer.length, buffer.toString()); } }); } }); zlib.deflateRaw(input, function(err, buffer) { if (!err) { console.log("deflateRaw (%s): ", buffer.length, buffer.toString('base64')); zlib.inflateRaw(buffer, function(err, buffer) { if (!err) { console.log("inflateRaw (%s): ", buffer.length, buffer.toString()); } }); } }); zlib.gzip(input, function(err, buffer) { if (!err) { console.log("gzip (%s): ", buffer.length, buffer.toString('base64')); zlib.gunzip(buffer, function(err, buffer) { if (!err) { console.log("gunzip (%s): ", buffer.length, buffer.toString()); } }); zlib.unzip(buffer, function(err, buffer) { if (!err) { console.log("unzip gzip (%s): ", buffer.length, buffer.toString()); } }); } });//控制台显示deflate (18): eJzT00MBJakVJagiegB9Zgcq deflateRaw (12): 09NDASWpFSWoInoA gzip (30): H4sIAAAAAAAAC9PTQwElqRUlqCJ6AIq+x+AiAAAA inflate (34): ...............text............... unzip deflate (34): ...............text............... inflateRaw (34): ...............text............... gunzip (34): ...............text............... unzip gzip (34): ...............text...............
Zlib
模块不仅仅提供了对缓冲区的压缩/解压方法,同时也有对流的压缩/解压方法,可以使用pipe()
函数,通过压缩/解压对象来吧数据从一个流输出到另一个流。以下就是具体的例子:
var zlib = require("zlib");var gzip = zlib.createGzip();var fs = require('fs');var inFile = fs.createReadStream('zlib_file.js');var outFile = fs.createWriteStream('zlib_file.gz'); inFile.pipe(gzip).pipe(outFile); setTimeout(function(){ var gunzip = zlib.createUnzip({flush: zlib.Z_FULL_FLUSH}); var inFile = fs.createReadStream('zlib_file.gz'); var outFile = fs.createWriteStream('zlib_file.unzipped'); inFile.pipe(gunzip).pipe(outFile); }, 3000);
小结:大部分密集的Web应用程序和服务的核心都是从一个系统进入另一个系统的庞大数据流,Node.js
中的数据处理IO
到这里也就差不多,从处理JSON
到二进制的缓冲区,再到数据流的处理都进行了比较深入的了解。
作者:aco阿飞
链接:https://www.jianshu.com/p/c1fe91bc7481