流的内部细节


缓冲

无论是可写(Writable)流还是可读(Readable)流,它们都会在内部对象中缓冲数据,这两个内部对象分别可以从 _writableState.getBuffer()_readableState.buffer 中获取。

被缓冲的数据量取决于传递给构造函数的 highWaterMark 选项。

可读(Readable)流的滞留发生于实现调用 stream.push(chunk) 时。如果流的消费者没有调用 stream.read(),那么数据将待在内部队列中,直到被消费。

可写(Writable)流的滞留发生于用户反复调用 stream.write(chunk) 时,即便此时返回 false

流(特别是其中的 stream.pipe() 方法)的初衷是将数据的滞留量限制在一个可接受的水平,以便不同速度的资源和目标不会淹没可用内存。

与 Node.js 早期版本的兼容性

在 v0.10 之前版本的 Node.js 中,可读(Readable)流的接口较为简单,同时功能和实用性也较弱。

  • ‘data’ 事件会在开始时立即发生,而不会等你调用 stream.read() 方法。如果你需要进行某些 I/O 来决定如何处理数据,那么你只能将数据块储存到某种缓冲区中以防它们流失。

  • stream.pause() 方法仅供参考,不保证生效。意味着即便当流处于暂停状态时,你仍需要准备接收 ‘data’ 事件。

在 Node.js v0.10 中新增了 Readable 类。为了向后兼容考虑,可读(Readable)流会在添加了 ‘data’ 事件监听器或调用 stream.resume() 方法时切换至“流动模式”。其效果是,即便你不使用新的 stream.read() 方法和 ‘readable’ 事件,你也不必担心丢失 ‘data’ 事件产生的数据块。

大多数程序会维持正常功能。然而,会在下列条件下引入一种边界情况:

  • 没有添加 ‘data’ 事件处理器。

  • stream.resume() 方法从未被调用。

  • 流未被导流到任何可写目标。

例如,请留意以下代码:

  1. // WARNING! BROKEN!
  2. net.createServer((socket) => {
  3. // we add an 'end' method, but never consume the data
  4. socket.on('end', () => {
  5. // It will never get here.
  6. socket.end('I got your message (but didnt read it)\n');
  7. });
  8. }).listen(1337);

在 Node.js v0.10 之前的版本中,传入消息数据会被简单地丢弃。然而在 Node.js v0.10 及之后的版本中,socket 会一直保持暂停。

在这种情况下的解决方法是调用 stream.resume() 方法来开启数据流:

  1. // Workaround
  2. net.createServer((socket) => {
  3. socket.on('end', () => {
  4. socket.end('I got your message (but didnt read it)\n');
  5. });
  6. // start the flow of data, discarding it.
  7. socket.resume();
  8. }).listen(1337);

除了流动模式下新建可读(Readable)流以外,v0.10 之前风格的流可以通过 stream.wrap() 方法包装成 Readable 类。

对象模式

通常情况下,流只操作字符串和 Buffer。

处于对象模式的流除了 Buffer 和字符串外还能读取普通的 JavaScript 值。

一个处于对象模式的可读(Readable)流调用 stream.read(size) 时总会返回单个项目,无论传入什么 size 参数。

一个处于对象模式的可写(Writable)流总是会忽略传给 stream.write(data, encoding)encoding 参数。

特殊值 null 在对象模式流中依旧保持它的特殊性。也就说,对于对象模式的可读流,stream.read() 返回 null 意味着没有更多数据,同时 stream.push(null) 标志着流数据已到达末端(EOF)。

Node.js 核心不存在对象模式的流。这种设计只被某些用户态流式库所使用。这种模式只用于用户级流库。

你应该在你的流子类构造函数的选项对象中设置 objectMode。在流的过程中设置 objectMode 是不安全的。

对于双工(Duplex)流而言,objectMode 只可以在可读端和可写端分别通过 readableObjectModewritableObjectMode 来设置。这些选项可以被用于在转换(Transform)流中实现解析器和串行器。

  1. const util = require('util');
  2. const StringDecoder = require('string_decoder').StringDecoder;
  3. const Transform = require('stream').Transform;
  4. util.inherits(JSONParseStream, Transform);
  5. // Gets \n-delimited JSON string data, and emits the parsed objects
  6. function JSONParseStream() {
  7. if (!(this instanceof JSONParseStream))
  8. return new JSONParseStream();
  9. Transform.call(this, {
  10. readableObjectMode: true
  11. });
  12. this._buffer = '';
  13. this._decoder = new StringDecoder('utf8');
  14. }
  15. JSONParseStream.prototype._transform = function (chunk, encoding, cb) {
  16. this._buffer += this._decoder.write(chunk);
  17. // split on newlines
  18. var lines = this._buffer.split(/\r?\n/);
  19. // keep the last partial line buffered
  20. this._buffer = lines.pop();
  21. for (var l = 0; l < lines.length; l++) {
  22. var line = lines[l];
  23. try {
  24. var obj = JSON.parse(line);
  25. } catch (er) {
  26. this.emit('error', er);
  27. return;
  28. }
  29. // push the parsed object out to the readable consumer
  30. this.push(obj);
  31. }
  32. cb();
  33. };
  34. JSONParseStream.prototype._flush = function (cb) {
  35. // Just handle any leftover
  36. var rem = this._buffer.trim();
  37. if (rem) {
  38. try {
  39. var obj = JSON.parse(rem);
  40. } catch (er) {
  41. this.emit('error', er);
  42. return;
  43. }
  44. // push the parsed object out to the readable consumer
  45. this.push(obj);
  46. }
  47. cb();
  48. };

stream.read(0)

在某些情况下,你可能需要在不真正消费任何数据的情况下,触发底层可读流机制的刷新。在这种情况下,你可以调用 stream.read(0),它总会返回 null。

如果内部读取缓冲低于 highWaterMark 水位线,并且流当前不在读取状态,那么调用 stream.read(0) 会触发一个低级 stream._read() 调用。

虽然几乎没有必要这么做,但你可以在 Node.js 内部的某些地方看到它确实这么做了,尤其是在可读(Readable)流类的内部。

stream.push(‘’)

推入一个零字节字符串或 Buffer(当不在对象模式时)有一个有趣的副作用。因为是一个对 stream.push() 的调用,它会结束 reading 过程。然而,它没有添加任何数据到可读缓冲中,所以没有东西可以被用户消费。

在极少数情况下,你当时没有提供任何数据,但你的流的消费者(或你的代码的其它部分)会通过调用 stream.read(0) 得知何时需要再次检查。在这中情况下,你可以调用 stream.push('')

到目前为止,这个功能唯一一个使用场景是在 tls.CryptoStream 类中,但在 Node.js/io.js v1.0 中已被废弃。如果你发现你不得不使用 stream.push(''),请考虑另一种方式,因为几乎可以明确表明这是某种可怕的错误。