Re: Overlap between StreamReader and FileReader

So, just to be clear, I'm *not* suggesting that browser streams copy
Node streams verbatim.

>>>
In Node.js doing A looks something like:

stream.on('readable', function() {
  var buffer;
  while((buffer = stream.read())) {
    processData(buffer);
  }
});
>>>

Not quite.  In Node.js, doing A looks like:

    stream.on('data', processData);


In my opinion, marrying a browser stream implementation to an
EventEmitter abstraction would be a mistake.  I also think that
marrying it to a Promise implementation would be a mistake.  As
popular as Promises are, they are an additional layer of abstraction
that is not fundamentally related to streaming data, and it is trivial
to turn:

    obj.method(fn)

into:

    obj.method().then(fn)

at the user/library level.  This allows performance-critical
applications to avoid any unnecessary complexity and shorten their
code paths as much as possible, but is easily extended for those who
prefer promises (or generators, or coroutines, or what have you).
Despite what you may see on Twitter or mailing lists, the choice to
use this minimal abstraction for Node's asynchrony has allowed all
these different things to coexist rather peacefully, and I believe
that it has been a great success.
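
For instance, a user-level wrapper might look like this.  This is a
minimal sketch assuming obj.method() takes a node-style (er, data)
callback; methodAsync is just a name I made up:

```
// Minimal sketch: promisifying a callback-taking method at the user
// level.  Assumes method() takes a node-style (er, data) callback.
obj.methodAsync = function() {
  var self = this;
  return new Promise(function(resolve, reject) {
    self.method(function(er, data) {
      if (er) reject(er);
      else resolve(data);
    });
  });
};

// So obj.method(fn) becomes:
obj.methodAsync().then(fn);
```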

Even if you feel that promises or generators are the best thing since
generational garbage collection (and certainly, both have their
merits), I think it is worth exploring where such a constraint would
lead us.



So far in this conversation, I've been mostly just trying to figure
out what the state of things is, and pointing out what I see as
potential hazards.  Here are some pro-active suggestions, but this is
still not anything I'm particularly in love with, so treat it only
as an exploration of the problem space.


1. Drop the "read n bytes" part of the API entirely.  It is hard to do
in a way that makes sense for both binary and string streams (as we
see in Node), and complicates the internal mechanisms.  People think
they need it, but what they really need is readUntil(delimiterChar).
And that's trivial to implement on top of unshift(), as sketched
below.  So let's just add unshift(chunk).
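
Here is a rough sketch of readUntil() on top of unshift(), assuming a
string stream whose synchronous read() returns null when nothing is
buffered (all names here are hypothetical):

```
// Rough sketch: readUntil() built on read() and unshift().
// Assumes a string stream whose read() returns null when the
// internal buffer is empty.
ReadableStream.prototype.readUntil = function(delim) {
  var buffered = '';
  var chunk;
  while ((chunk = this.read()) !== null) {
    buffered += chunk;
    var i = buffered.indexOf(delim);
    if (i !== -1) {
      // Put everything past the delimiter back on the front.
      this.unshift(buffered.slice(i + delim.length));
      return buffered.slice(0, i + delim.length);
    }
  }
  // Delimiter not seen yet; put it all back and report no data.
  this.unshift(buffered);
  return null;
};
```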


2. Reading strings vs ArrayBuffers or other types of things MUST be a
property of the stream, not of the read() call.  Having readBinary()
and readUtf8(), or read(encoding), is a terrible idea which bloats the
API surface and exposes multibyte landmines.  The easiest way to do
this is to make the API agnostic as to the specific data type
returned.  If we ditch read(number of bytes), then this becomes much
simpler, and also allows for things like streaming JSON parsers that
return arbitrary JavaScript objects.
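
To illustrate what I mean (the constructor options and jsonParser
here are hypothetical):

```
// Hypothetical: the chunk type is a property of the stream itself,
// fixed at construction, so read() always returns one kind of thing.
var binary = getStream({ type: 'arraybuffer' }); // read() -> ArrayBuffer
var text   = getStream({ type: 'utf8' });        // read() -> string

// A streaming JSON parser is then just another stream whose chunks
// happen to be JavaScript objects:
var objects = jsonParser(text);
var obj = objects.read(); // a parsed object, or null if none yet
```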


3. Sync vs async read().  Let's dig into the issue of
`var d = s.read()` vs `s.read(function(d) {})` for getting data out of
a stream.

The problem is that this assumes each call to read() will happen
when there is no data buffered, and so will result in a call down to
the underlying system, requiring some async stuff.  However, that's not
always the case, and a lot of times, you actually want a bit of
buffering to occur if you have pipe chains of streams that are
processing at different speeds, where one is bursty and the other is
consistent.

For example, consider a situation where you're interacting with a
local database, and also a 3g network connection. The 3g connection
will be either very fast or completely not moving, and the local
database will be relatively stable.  If you're reading the data from
the network connection, and putting it into the db, you don't want to
pause the 3g connection unnecessarily and miss a potential burst just
because you were waiting for the db.  You *also* don't want those
bursts to overwhelm your buffer, of course.  The solution for this is
to have some pre-defined buffer in the stream implementation, so that
you only pause the bursty stream if the slow-and-steady stream can't
keep up.
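
A minimal sketch of that buffering, with pauseSource() and
resumeSource() as hypothetical hooks into the underlying source:

```
// Minimal sketch of watermark-based buffering.  pauseSource() and
// resumeSource() are hypothetical hooks into the underlying source.
var HIGH_WATER_MARK = 64 * 1024; // bytes; an arbitrary example value

function onSourceData(stream, chunk) {
  stream.buffer.push(chunk);
  stream.buffered += chunk.byteLength;
  // Only push back on the bursty source once the buffer is full.
  if (stream.buffered >= HIGH_WATER_MARK) pauseSource(stream);
}

function read(stream) {
  var chunk = stream.buffer.shift() || null;
  if (chunk) {
    stream.buffered -= chunk.byteLength;
    // The consumer is catching up; let the source flow again.
    if (stream.buffered < HIGH_WATER_MARK) resumeSource(stream);
  }
  return chunk;
}
```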

If you have a readable stream which is buffering its data in memory,
then doing `s.read(cb)` is always going to be strictly more expensive
than doing `var d = s.read()`.

The only way to make an async read no more expensive than a
synchronously returning read is for the callback to be inline-able
and called immediately.  However, this means that it is no longer
possible to reason about any particular read() call, and so this
releases Zalgo.

For example:

    console.log('before');
    stream.read(function(data) { console.log('got data') });
    console.log('after');

The ordering of logs must be predictable, which means that we must
*always* defer the callback's execution until at least the end of the
current run-to-completion.  This isn't free.

This problem could potentially be solved if we used synchronous reads,
but mirrored the epoll-like behavior more closely than Node.js does
today, without the read(n) mistake.

```
// Hypothetical epoll-style API: poll() fires its callback when data
// may be available; read() is synchronous, and stream.state reports
// why it stopped returning data.
stream.poll(function ondata() {
  var d = stream.read();
  while (stream.state === 'OK') {
    processData(d);
    d = stream.read();
  }
  switch (stream.state) {
    case 'EOF': onend(); break;                     // no more data, ever
    case 'EWOULDBLOCK': stream.poll(ondata); break; // wait for more data
    default: onerror(new Error('Stream read error: ' + stream.state));
  }
});
```

Of course, this API is terribly verbose if what you want to do is just
get all the data out as fast as possible, but that can be addressed
with a readAll function:

```
// I'm sure nicer APIs could be imagined, this is still rather ugly
ReadableStream.prototype.readAll = function(onerror, ondata, onend) {
  // `this` would be lost inside the bare onpoll() calls, so capture it
  var self = this;
  onpoll();
  function onpoll() {
    var d = self.read();
    while (self.state === 'OK') {
      ondata(d);
      d = self.read();
    }
    switch (self.state) {
      case 'EOF': onend(); break;
      case 'EWOULDBLOCK': self.poll(onpoll); break;
      default: onerror(new Error('Stream read error: ' + self.state));
    }
  }
};
```

Also, if you really do want a `read(cb)` kind of approach, you can
still do something like this:

```
ReadableStream.prototype.readAsync = function(cb) {
  var sync = true;
  var self = this;
  onpoll();
  sync = false;

  function call(er, data) {
    if (sync)
      // never invoke the callback in the same tick as readAsync()
      return setImmediate(function() {
        cb.call(self, er, data);
      });
    else
      cb.call(self, er, data);
  }

  function onpoll() {
    var d = self.read();
    if (self.state === 'OK')
      call(null, d);
    else switch (self.state) {
      case 'EOF': call(null, null); break;
      case 'EWOULDBLOCK': self.poll(onpoll); break;
      default: call(new Error('Read error: ' + self.state));
    }
  }
};
```
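
Usage would look something like this (handleError and onEnd are
placeholders):

```
stream.readAsync(function(er, data) {
  if (er) return handleError(er);
  if (data === null) return onEnd(); // EOF is signaled with null
  processData(data);
});
```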

4. Passive data listening.  In Node v0.10, it is not possible to
passively "listen" to the data passing through a stream without
affecting the state of the stream.  This is corrected in v0.12, by
making the read() method also emit a 'data' event whenever it returns
data, so v0.8-style APIs work as they used to.

The takeaway here is not to do what Node did, but to learn what Node
learned: the passive-data-listening use-case is relevant.

It could be implemented by wrapping the `read()` method, though that
does feel a bit hacky.

```
// Wrap read() so that every successful read also reports the chunk
// to a passive observer, without changing what the caller sees.
function observeData(stream, observer) {
  var read = stream.read;
  stream.read = function() {
    var ret = read.apply(stream, arguments);
    if (stream.state === 'OK') observer(ret);
    return ret;
  };
}
```
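
For example:

```
// Usage sketch: watch chunks go by without consuming anything extra.
observeData(stream, function(chunk) {
  console.log('observed a chunk:', chunk);
});
```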

I don't have a better solution right now that doesn't rely on event
emission or other heavy abstractions.


5. Piping.  It's important to consider how any proposed readable
stream API will allow one to respond to backpressure, and how it
relates to a *writable* stream API.  Moving data from a source to a
destination is the fundamental raison d'être for streams, after all.

With an epoll-like API, it could work something like this:

```
Readable.prototype.pipe = function(dest, onerror) {
  var self = this;
  self.poll(onpoll);
  function onpoll() {
    var d = self.read();
    var drained = true;
    while (self.state === 'OK' && (drained = dest.write(d))) {
      // write returns false for "time to stop",
      // and true for "yes, more please"
      d = self.read();
    }
    if (!drained)
      // the writer is backed up; resume once it has flushed
      dest.flush(onpoll);
    else switch (self.state) {
      case 'EOF': dest.end(); break;
      case 'EWOULDBLOCK': self.poll(onpoll); break;
      default: onerror(new Error('Stream read error: ' + self.state));
    }
  }
};
```
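
Usage would be along these lines:

```
// Usage sketch: drain a bursty source into a steady destination,
// relying on dest.write()'s return value for backpressure.
source.pipe(destination, function(er) {
  console.error('pipe failed:', er.message);
});
```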


6. Error handling.  If we ditch event emitters, and have asynchrony
that is not handled by world-wrapping promises, coroutines,
generators, or other such things, then there's no single place to
handle errors.  Maybe a `stream.onerror(cb)` type of method?  Throwing
for run-time async errors is a big no-no, but being alerted to these
errors in your program is very important.
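
Something along these lines, purely as a sketch:

```
// Purely illustrative: register a handler that the implementation
// invokes for run-time async errors instead of throwing them.
stream.onerror(function(er) {
  // e.g. tear down, retry, or surface the failure to the app
  console.error('stream failed:', er.message);
});
```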


Well, this is probably all quite a lot to consider, so I'll stop here
and wait for you all to think up some clever alternatives to these
issues.  The code in this message is just a very rough sketch, like I
said above, and I'm not in love with any of these APIs.  But, I think
that the design constraints are definitely worth considering.


On Thu, Aug 22, 2013 at 4:20 AM, Aymeric Vitte <vitteaymeric@gmail.com> wrote:
>
>
> On 22/08/2013 09:28, Jonas Sicking wrote:
>
>> Does anyone have examples of code that uses the Node.js API? I'd love
>> to look at how people practically end up consuming data?
>
> I am doing something like this:
>
> var parse=function() {
>     //process this.stream_
>     this.queue_.shift();
>     if (this.queue_.length) {
>         this.queue_[0]();
>     };
> };
> var process=function(data) {
>     return function() {
>         this.stream_=[this.stream_,data].concatBuffers();
>         parse.call(this);
>     };
> };
> var on_data=function(data) {
>     this.queue_=this.queue_||[];
>     this.queue_.push(process(data).bind(this));
>     if (this.queue_.length===1) {
>         this.queue_[0]();
>     };
> };
> request.on('data',function(data) {
>     on_data.call(this,data);
> });
>
> I don't remember exactly if it's due to my implementation or node (because I
> am using both node's Buffers and Typed Arrays) but I experienced some
> problems where data was modified while it was being processed, that's why
> this.stream_ is freezing the data received (with remaining bytes received
> earlier, see next sentence) until it is processed.
>
> Coming back to my previous TextEncoder/Decoder remark for utf-8 parsing, I
> don't know how to do this with native node functions.
>
> Regards
>
> Aymeric
>
>
> --
> jCore
> Email :  avitte@jcore.fr
> iAnonym : http://www.ianonym.com
> node-Tor : https://www.github.com/Ayms/node-Tor
> GitHub : https://www.github.com/Ayms
> Web :    www.jcore.fr
> Extract Widget Mobile : www.extractwidget.com
> BlimpMe! : www.blimpme.com
>

Received on Thursday, 22 August 2013 17:41:39 UTC