Re: [whatwg/streams] light weight transformations (#461)

>> @domenic: How would back pressure propagation work in this scheme? If enqueue() does not return a promise, would there be another interface to pace the transform?

> I'm not sure I quite understand the question. It would return a promise, so that's a pretty solid way to pace the transform. So rs would automatically get backpressure applied.

Okay, that's good. As you are probably aware, this differs from [ReadableStreamDefaultControllerEnqueue](https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue). Are you planning to make that return a promise as well? It would be somewhat unfortunate to have two almost identical interfaces with subtly different semantics. It also affects performance, as I'll explain below.

> I'm not sure what you mean by "concurrent" here, since JS doesn't have concurrency except with web workers.

You seem to be thinking of *parallelism*. A single Node process offers plenty of support for concurrency, but none for parallelism. See for example [this explanation of the difference](http://c2.com/cgi/wiki?CategoryConcurrency) between the two.

> Do you mean the transform enqueuing multiple chunks in response to one incoming chunk? Yes, that is definitely supported. But in rs.pipeThrough({ writable, readable }), readable already has a buffer we can use---one which is automatically drained if there are outstanding read requests (which is I think what you are referring to by talking about Promise.prototype.then()). So I guess the cost you are worried about is about putting an element in the array, then taking it out? I'm not really concerned about that, and it can be optimized away if necessary to just directly resolve the pending promise.

The point both @dominictarr and I have made and explored experimentally is that threading a single Promise chain through several transforms is significantly cheaper than allocating a new Promise plus resolve / reject closures per chunk and per transform. Sync transforms just add another link to the promise chain they received from their Reader, and return the result to their respective caller. Once you add mandatory buffering between transforms and return a Promise from the transform enqueue (but not from [ReadableStreamDefaultControllerEnqueue](https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue)?), you typically end up doing `new Promise((resolve, reject) => { ... })` for each chunk and each transform in a pipeline. This adds quite a few allocations.
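
To make the contrast concrete, here is a minimal sketch of the two shapes. All names (`arrayReader`, `mapPull`, `mapWithNewPromise`, `collect`) are hypothetical and exist only for illustration; this is neither the streams spec API nor the code used in the experiments mentioned above, and it makes no claim about actual allocation counts.

```javascript
// Hypothetical pull-style source: each next() returns a promise for one result.
function arrayReader(items) {
  let i = 0;
  return {
    next() {
      return Promise.resolve(
        i < items.length
          ? { done: false, value: items[i++] }
          : { done: true, value: undefined }
      );
    },
  };
}

// Pull model: a sync transform adds one .then() link to the promise it got
// from its upstream reader and hands the result straight to its caller.
function mapPull(reader, fn) {
  return {
    next() {
      return reader.next().then((r) =>
        r.done ? r : { done: false, value: fn(r.value) }
      );
    },
  };
}

// Contrast: a stage that wraps every chunk in a fresh Promise with its own
// resolve / reject closures, as a buffered stage typically ends up doing.
function mapWithNewPromise(reader, fn) {
  return {
    next() {
      return new Promise((resolve, reject) => {
        reader.next().then(
          (r) => resolve(r.done ? r : { done: false, value: fn(r.value) }),
          reject
        );
      });
    },
  };
}

// Drain a reader into an array, one next() call at a time.
async function collect(reader) {
  const out = [];
  for (let r = await reader.next(); !r.done; r = await reader.next()) {
    out.push(r.value);
  }
  return out;
}
```

Both variants produce the same chunks; the difference is only in how many promises and closures each stage creates along the way.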

Now, you *could* make a push strategy work with a reasonable number of allocations if you pass Promises all the way back up through the pipeline, and avoid buffering by only feeding in one chunk at a time. This would require a change in the spec to return a Promise from [ReadableStreamDefaultControllerEnqueue](https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue). A piped ReadableStream would also need to synchronously call its downstream transform if no other enqueues are pending in its queue. For a fully synchronous transform pipeline, this would mean that the TransformStream would return the Promise instance returned by the last stage's `ReadableStream.enqueue()` call.
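
A rough sketch of that idea, assuming a hypothetical `enqueue()` that returns a Promise (which the current spec does not do). `makeSink` and `syncStage` are made-up names for illustration only:

```javascript
// Hypothetical last stage: enqueue() returns a promise that settles once the
// chunk has been accepted (simulated here with a timer).
function makeSink() {
  const sink = {
    received: [],
    lastPromise: null,
    enqueue(chunk) {
      sink.received.push(chunk);
      sink.lastPromise = new Promise((resolve) => setTimeout(resolve, 0));
      return sink.lastPromise;
    },
  };
  return sink;
}

// Hypothetical sync stage: transforms the chunk, calls downstream
// synchronously, and returns downstream's promise unchanged. No new Promise
// is allocated in this stage, and nothing is buffered.
function syncStage(fn, downstream) {
  return {
    enqueue(chunk) {
      return downstream.enqueue(fn(chunk));
    },
  };
}

const sink = makeSink();
const pipeline = syncStage((x) => x + 1, syncStage((x) => x * 2, sink));

// The caller gets back the very promise created by the last stage.
const backpressure = pipeline.enqueue(1);
console.log(backpressure === sink.lastPromise); // true
```

The producer would wait on `backpressure` before feeding in the next chunk, which is what keeps the pipeline unbuffered in this sketch.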

All this also assumes that `transform` in your code example is never called concurrently:
```javascript
new TransformStream({
  transform(chunk, controller) {
    // use controller.enqueue()
    // return promise to succeed/fail asynchronously
    // **IMPORTANT: transform() is only called once the
    // previously returned promise has been resolved / rejected.**
  },
  flush(controller) {
    // use controller.enqueue() if you want
    // return promise to close/error asynchronously
  }
});
```
This is the same no-concurrency guarantee that keeps unbuffered pull transforms simple. 
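
As an illustration of the guarantee (not of how a spec implementation would provide it), a hypothetical driver can simply wait for each returned promise before making the next call. `makeTrackedTransform` and `driveSerially` are assumed names:

```javascript
// Hypothetical async transform that records how many calls overlap in time.
function makeTrackedTransform() {
  const stats = { inFlight: 0, maxInFlight: 0, seen: [] };
  async function transform(chunk) {
    stats.inFlight++;
    stats.maxInFlight = Math.max(stats.maxInFlight, stats.inFlight);
    await new Promise((resolve) => setTimeout(resolve, 1)); // simulated async work
    stats.seen.push(chunk);
    stats.inFlight--;
  }
  return { transform, stats };
}

// Hypothetical driver: the next transform() call is only made after the
// promise returned by the previous call has settled.
async function driveSerially(chunks, transform) {
  for (const chunk of chunks) {
    await transform(chunk);
  }
}
```

With such a driver, the transform never sees overlapping calls, so it needs no internal queue of its own.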

If concurrent `transform` calls were allowed, then you would force async transforms to implement internal buffering, and sync transforms to immediately write out all their results to the next ReadableStream's buffer. Both cases would be worse than in a `pull` model. While the async case is probably obvious, the sync case might be less so. To illustrate, typical synchronous `split` transforms already return their results in an array. In a `pull` model, the elements of that array are then returned one by one ([code example](https://github.com/gwicke/mixmaster/blob/13518bc68644b0c71d6c57e72d2fad9fac02ed7a/index.js#L102)), and when the array is exhausted the next input is pulled in and transformed. In a `push` model, the transform needs to synchronously write out all chunks to the downstream `ReadableStream` to escape the need to buffer incoming calls. This just transfers chunks from one existing buffer to another, adding unnecessary overhead in the process.
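
The pull-side behaviour can be sketched roughly as follows. This is a simplified illustration with hypothetical names (`arrayReader`, `splitPull`), not the linked code, and it assumes `next()` is never called concurrently:

```javascript
// Hypothetical pull-style source: each next() returns a promise for one result.
function arrayReader(items) {
  let i = 0;
  return {
    next() {
      return Promise.resolve(
        i < items.length
          ? { done: false, value: items[i++] }
          : { done: true, value: undefined }
      );
    },
  };
}

// Pull-style split: the array returned by splitFn doubles as the only buffer.
// Its elements are handed out one per next() call; upstream is pulled again
// only once the array is exhausted.
function splitPull(reader, splitFn) {
  let pending = [];
  let idx = 0;
  function next() {
    if (idx < pending.length) {
      return Promise.resolve({ done: false, value: pending[idx++] });
    }
    return reader.next().then((r) => {
      if (r.done) return r;
      pending = splitFn(r.value);
      idx = 0;
      return next(); // also covers a split that yields an empty array
    });
  }
  return { next };
}
```

For example, `splitPull(arrayReader(["a b", "c"]), (s) => s.split(" "))` hands out `"a"`, `"b"` and `"c"` on successive `next()` calls without copying the split results into a second queue.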

---
https://github.com/whatwg/streams/issues/461#issuecomment-237985513

Received on Saturday, 6 August 2016 00:27:37 UTC