Re: [whatwg/streams] Add a proof of concept of optimized pipe (#511)

OK.

I forgot to mention this, but the PR is only meant to demonstrate that our API enables this kind of optimization. It is not necessarily intended to become part of the specification.

### PipeRequest

First, an instance of the PipeRequest class corresponds to each pipeTo() invocation. It holds the instances and parameters given to the method:
- destination writable stream instance
- source readable stream instance
- parameters such as preventClose given to pipeTo()

It also has:
- the `pipe` slot, which points to the Pipe instance representing the ongoing piping (if any) that is going to satisfy the PipeRequest, wholly or partially.
- the `done` flag, which is initially false.
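
To make the shape of a PipeRequest concrete, here is a minimal sketch. The field names `source`, `destination` and `options`, and the `Pipe` placeholder, are assumptions made for illustration; only `pipe`, `done` and `preventClose` come from the description above.

```typescript
// Hypothetical shapes for illustration only; not part of the Streams spec.
interface PipeOptions {
  preventClose?: boolean;
}

// Placeholder for the object that represents an ongoing piping.
interface Pipe {
  readonly id: number;
}

// S and D stand for the readable and writable stream types.
class PipeRequest<S, D> {
  readonly source: S;
  readonly destination: D;
  readonly options: PipeOptions;
  // The Pipe going to satisfy this request (wholly or partially), if any.
  pipe: Pipe | null = null;
  // Initially false.
  done = false;

  constructor(source: S, destination: D, options: PipeOptions = {}) {
    this.source = source;
    this.destination = destination;
    this.options = options;
  }
}
```

Generic parameters are used instead of the real stream classes so the sketch does not depend on any particular streams implementation.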

Every time pipeTo() is invoked, a PipeRequest instance describing the invocation is created, and:
- the PipeRequest is registered with the global pipe manager.
- the PipeRequest retrieves the current **(skipped) piping candidates** of the destination writable stream from `.pipeCandidates` and passes them to the source readable stream by calling `.notifyPipeCandidates()`. This is called **pipe candidates propagation**.
- the PipeRequest subscribes to the destination writable stream by calling `.waitPipeCandidatesChange()` so that it gets notified when the destination's piping candidates change. Every time new candidates are received, they are propagated to the source readable stream.
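
The propagation step could look roughly like the sketch below. `FakeWritable`, `FakeReadable`, `onPipeCandidatesChange()` and `update()` are hypothetical stand-ins; in particular, the callback-based subscription is a simplification of `.waitPipeCandidatesChange()`, whose exact signature is not fixed here.

```typescript
type Candidates = readonly string[];

// Hypothetical stand-in for the destination writable stream.
class FakeWritable {
  pipeCandidates: Candidates;
  private listeners: Array<(c: Candidates) => void> = [];

  constructor(initial: Candidates) {
    this.pipeCandidates = initial;
  }

  // Simplified, callback-based substitute for waitPipeCandidatesChange().
  onPipeCandidatesChange(listener: (c: Candidates) => void): void {
    this.listeners.push(listener);
  }

  // Replaces the candidates and notifies every subscriber.
  update(next: Candidates): void {
    this.pipeCandidates = next;
    for (const listener of this.listeners) listener(next);
  }
}

// Hypothetical stand-in for the source readable stream.
class FakeReadable {
  received: Candidates[] = [];

  notifyPipeCandidates(c: Candidates): void {
    this.received.push(c);
  }
}

// Pipe candidates propagation: pass the current candidates upstream,
// then forward every later change as well.
function propagate(dest: FakeWritable, source: FakeReadable): void {
  source.notifyPipeCandidates(dest.pipeCandidates);
  dest.onPipeCandidatesChange((c) => source.notifyPipeCandidates(c));
}
```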

The global pipe manager holds:
- a list of all readable stream instances being piped.
- lists representing piping candidates for each of the piped readable streams.
  - Each is a list of references to PipeRequest instances, sorted so that the ones that went through the fewest "pipe candidates propagation" steps come first. I.e. the destinations closest to the readable stream come first.

The global pipe manager chooses the best subset of triplets of:
- a readable stream
- a pipe candidate (i.e. the destination writable stream)
- the transfer method to use. E.g. between a TCP socket and a file descriptor, either the default pipeTo() algorithm operating over the public ReadableStream.*Reader/WritableStream.*Writer interfaces or offloading via sendfile() could be used. The global pipe manager enumerates the available transfer methods by checking the underlying sources/sinks of the readable/writable stream pair.

The subset must cover all the active PipeRequests. E.g. given the list of skipped piping candidates `[a, b, c]`, a skipped piping to `b` covers PipeRequests `a` and `b`; `c` must be covered by some other pipe.
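
The coverage requirement can be checked with something like the following sketch. It assumes a simplified model in which the PipeRequests form a single linear chain and each chosen pipe covers a contiguous run of them; the `Segment` type and `coversAll()` are hypothetical names.

```typescript
// A chosen pipe covers the PipeRequests at indices from..to (inclusive)
// of the nearest-first candidate list. Hypothetical model for illustration.
interface Segment {
  from: number;
  to: number;
}

function coversAll(chain: readonly string[], pipes: readonly Segment[]): boolean {
  const covered = new Array<boolean>(chain.length).fill(false);
  for (const p of pipes) {
    const start = Math.max(0, p.from);
    for (let i = start; i <= p.to && i < chain.length; i++) {
      covered[i] = true;
    }
  }
  return covered.every(Boolean);
}

const chain = ["a", "b", "c"];
// A skipped piping to `b` covers `a` and `b` only, so `c` is left uncovered.
const skippedToBOnly = coversAll(chain, [{ from: 0, to: 1 }]);
// Adding a separate pipe for `c` makes the subset valid.
const withPipeForC = coversAll(chain, [{ from: 0, to: 1 }, { from: 2, to: 2 }]);
```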

### Reorganizing pipes

Ongoing pipes can be stopped so that they can be reorganized into a longer or faster pipe. The duration (number of bytes to transfer) of a long piping is limited to the minimum of the numbers of bytes requested by the PipeRequests it covers.
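
As a small worked example of the limit, assuming each covered PipeRequest exposes the number of bytes it asked for (with `Infinity` standing for "no limit", which is an assumption of this sketch):

```typescript
// Hypothetical helper: the long piping may transfer at most as many bytes
// as the most restrictive PipeRequest it covers.
function longPipeByteLimit(requestedBytes: readonly number[]): number {
  if (requestedBytes.length === 0) {
    throw new RangeError("a long piping must cover at least one PipeRequest");
  }
  return Math.min(...requestedBytes);
}

// Three covered requests asking for 1024 bytes, 256 bytes and no limit.
const limit = longPipeByteLimit([1024, 256, Infinity]);
```

Once that many bytes have been transferred, the long piping would have to stop and the remaining PipeRequests be covered by other pipes.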

### Candidate propagation

The propagation of pipe candidates may happen asynchronously; IdentityTransformStream, for example, does that. This is useful when an IdentityTransformStream already has some chunks enqueued in it. It may exert backpressure on the source readable stream of an existing pipe whose destination is the writable side of the IdentityTransformStream, so that the pipe temporarily stops write()-ing new chunks. Once all the chunks are drained, it can announce the piping candidates received at the readable side to the writable side so that the faster one can be chosen.

I think this backpressure should not be the normal backpressure signal. Instead, it should be made by announcing an empty pipeCandidates to completely stop the piping (from the readable to the writable side of the IdentityTransformStream) and then waiting for the ongoing piping to stop. We need some mechanism to realize this. Without strict backpressure like this, it's possible that the ongoing pipe write()s new chunks before seeing the updated pipe candidates, which include the long piping, and then switches to it before the queued chunks are processed.
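
The drain-then-announce idea could be sketched as below. `IdentityBuffer` and its methods are hypothetical, and the sketch leaves out the step of waiting for the ongoing piping to actually stop, which is the mechanism still to be designed.

```typescript
type CandidateList = readonly string[];

// Hypothetical model of an IdentityTransformStream's internal queue.
class IdentityBuffer {
  private queue: string[] = [];
  private pending: CandidateList | null = null;
  // Candidate lists announced on the writable side, in order.
  announced: CandidateList[] = [];

  enqueue(chunk: string): void {
    this.queue.push(chunk);
  }

  // Called when the readable side receives candidates from downstream.
  notifyPipeCandidates(candidates: CandidateList): void {
    if (this.queue.length === 0) {
      // Nothing buffered: safe to propagate right away.
      this.announced.push(candidates);
      return;
    }
    // Chunks are still queued: hold the real candidates back and announce
    // an empty list so the upstream pipe stops writing.
    this.pending = candidates;
    this.announced.push([]);
  }

  // Called when downstream consumes one chunk.
  dequeue(): string | undefined {
    const chunk = this.queue.shift();
    if (this.queue.length === 0 && this.pending !== null) {
      // Fully drained: now the held candidates can be announced.
      this.announced.push(this.pending);
      this.pending = null;
    }
    return chunk;
  }
}
```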


Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/pull/511#issuecomment-243676501

Received on Wednesday, 31 August 2016 07:03:39 UTC