- From: Domenic Denicola <notifications@github.com>
- Date: Fri, 27 Oct 2017 11:59:19 -0700
- To: whatwg/streams <streams@noreply.github.com>
- Cc: Subscribed <subscribed@noreply.github.com>
- Message-ID: <whatwg/streams/pull/847/review/72598728@github.com>
domenic commented on this pull request. > @@ -4572,6 +4572,55 @@ and similarly, closing or aborting the <code>writable</code> side will implicitl </div> +<h3 id="example-ts-promise-to-readable">Using an identity transform stream as a primitive when manipulating streams</h3> + +Combining an identity transform stream with {{pipeTo()}} is a powerful way to manipulate streams. For example, +concatenating multiple readable streams into one: + +<pre><code class="lang-javascript"> +function concatenateReadables(readables) { Although we haven't used async functions much so far, in this case they give a pretty clear benefit: ```js async function concatenateReadables(readables) { const ts = new TransformStream(); for (const readable of readables) { try { await readable.pipeTo(ts.writable, { preventClose: true }); } catch (e) { await Promise.all([ ts.writable.abort(e), readable.cancel(e) ]); return; } } await ts.writable.getWriter().close(); return ts.readable; } ``` This departs from yours slightly in some error cases in ways that I think are probably improvements? I'm also unsure that the catch() is necessary. Perhaps reading the tests will illuminate why you included it. > + .then(() => readable.pipeTo(ts.writable, { preventClose: true }), + reason => { + return Promise.all([ + ts.writable.abort(reason), + readable.cancel(reason)]); + }); + } + promise.then(() => ts.writable.getWriter().close(), + reason => ts.writable.abort(reason)) + .catch(() => {}); + return ts.readable; +} +</code></pre> + +The error handling here is subtle because cancelling the concatenated stream has to cancel all the intput +streams. However, the success case is simple enough. We just pipe each stream in the <code>readables</code> array one at I think it might be better to not cancel them, but leave them for someone else to consume. Which may eliminate the catch block, or at least part of it. Especially in an example. > +</code></pre> + +The error handling here is subtle because cancelling the concatenated stream has to cancel all the intput +streams. However, the success case is simple enough. We just pipe each stream in the <code>readables</code> array one at +a time to the transform stream's <a>writable side</a>, and then close it when we are done. The <a>readable side</a> is +then a concatenation of all the chunks from all of of the streams. We return it from the function. Backpressure is +applied as usual. + +It's sometimes natural to treat a promise for a readable as if it were a readable. A simple adaptor function is all +that's needed: + +<pre><code class="lang-javascript"> +function promiseToReadable(promiseForReadable) { + const ts = new TransformStream(); + promiseForReadable.then(readable => readable.pipeTo(ts.writable)) + .catch(reason => ts.writable.abort(reason)) Since we didn't pass preventAbort: true, why is this necessary? > + return readableStreamToArray(readable).then(array => assert_array_equals(array, [0, 1, 1, 2], 'array should match')); +}, 'concanateReadables should concatenate readables'); + +promise_test(t => { + const erroredReadable = new ReadableStream({ + start(controller) { + controller.enqueue(2); + controller.error(); + } + }); + const readable = concatenateReadables([arrayToReadableStream([1]), erroredReadable]); + const reader = readable.getReader(); + return reader.read().then(({ value, done }) => { + assert_false(done, 'done should be false'); + assert_equals(value, 1, 'value should be 1'); + // Error is not passed through due to WritableStreamAbort algorithm step 4. Now I'm questioning whether abort not passing through the error is a good idea :(. I think this kind of example was one of the primary reasons I originally designed it to pass through. Oh well... > + +promise_test(t => { + const promiseForString = Promise.resolve('foo'); + const readable = promiseToReadable(promiseForString); + return promise_rejects(t, new TypeError(), readable.getReader().read(), 'read() should reject'); +}, 'promiseToReadable should error the stream it returned when the promise doesn\'t resolve to a readable stream'); + +promise_test(t => { + const erroredReadable = new ReadableStream({ + pull() { + throw new Error('badness'); + } + }); + const promise = new Promise(resolve => resolve(erroredReadable)); + const readable = promiseToReadable(promise); + return promise_rejects(t, new TypeError(), readable.getReader().read(), 'read() should reject'); Again sad we lost the error :( -- You are receiving this because you are subscribed to this thread. Reply to this email directly or view it on GitHub: https://github.com/whatwg/streams/pull/847#pullrequestreview-72598728
Received on Friday, 27 October 2017 18:59:45 UTC