- From: Domenic Denicola <notifications@github.com>
- Date: Fri, 21 Oct 2016 14:59:09 -0700
- To: whatwg/streams <streams@noreply.github.com>
- Message-ID: <whatwg/streams/pull/512/review/5325787@github.com>
domenic commented on this pull request.
> -abstract operation calls, instead of using the JavaScript-exposed readable and writable stream APIs. This will better
-allow optimization and specialization. See <a href="https://github.com/whatwg/streams/issues/407">#407</a> and <a
-href="https://github.com/whatwg/streams/issues/97">#97</a> for more information.
+<emu-alg>
+ 1. If ! IsReadableStream(*this*) is *false*, return a promise rejected with a *TypeError* exception.
+ 1. If ! IsWritableStream(_dest_) is *false*, return a promise rejected with a *TypeError* exception.
+ 1. Set _preventClose_ to ! ToBoolean(_preventClose_), set _preventAbort_ to ! ToBoolean(_preventAbort_), and set
+ _preventCancel_ to ! ToBoolean(_preventCancel_).
+ 1. If ! IsReadableStreamLocked(*this*) is *true*, return a promise rejected with a *TypeError* exception.
+ 1. If ! IsWritableStreamLocked(_dest_) is *true*, return a promise rejected with a *TypeError* exception.
+ 1. If ! IsReadableByteStreamController(*this*.[[readableStreamController]]) is true, let _reader_ be either !
+ AcquireReadableStreamBYOBReader(*this*) or ! AcquireReadableStreamDefaultReader(*this*), at the user agent's
+ discretion.
+ 1. Otherwise, let _reader_ be ! AcquireReadableStreamDefaultReader(*this*).
+ 1. Let _writer_ be ! AcquireWritableStreamDefaultWriter(_dest_).
+ 1. Let _promise_ be a new promise.
done
> + discretion.
+ 1. Otherwise, let _reader_ be ! AcquireReadableStreamDefaultReader(*this*).
+ 1. Let _writer_ be ! AcquireWritableStreamDefaultWriter(_dest_).
+ 1. Let _promise_ be a new promise.
+ 1. Let _shuttingDown_ be *false*.
+ 1. <a>In parallel</a>, using _reader_ and _writer_, read all <a>chunks</a> from *this* and write them to _dest_. Due
+ to the locking provided by the reader and writer, the exact manner in which this happens is not observable to
+ author code, and so there is flexibility in how this is done. The following constraints apply regardless of the
+ exact algorithm used:
+ * <strong>Public API must not be used:</strong> while reading or writing, or performing any of the operations
+ below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes)
+ must not be used. Instead, the streams must be manipulated directly.
+ * <strong>Backpressure must be enforced:</strong>
+ * While WritableStreamDefaultWriterGetDesiredSize(_writer_) is ≤ *0* or is *null*, the user agent must not read
+ from _reader_.
+ * If _reader_ is a <a>BYOB reader</a>, WritableStreamDefaultWriterGetDesiredSize(_writer_) should be used to
Sorry, I don't quite understand. What were you thinking of adding?
> + from _reader_.
+ * If _reader_ is a <a>BYOB reader</a>, WritableStreamDefaultWriterGetDesiredSize(_writer_) should be used to
+ determine the size of the chunks read from _reader_.
+ * Otherwise, WritableStreamDefaultWriterGetDesiredSize(_writer_) may be used to determine the flow rate
+ heuristically, e.g. by delaying reads while it is judged to be "low" compared to the size of chunks that have
+ been typically read.
+ * <strong>Shutdown must stop all activity:</strong> if _shuttingDown_ becomes *true*, the user agent must not
+ initiate further reads from _reader_ or writes to _writer_. (Ongoing reads and writes may finish.)
+ * <strong>Errors must be propagated forward:</strong> if *this*.[[state]] is or becomes `"errored"`, then
+ 1. If _preventAbort_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
+ WritableStreamAbort(_dest_, *this*.[[storedError]]) and with *this*.[[storedError]].
+ 1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with *this*.[[storedError]].
+ * <strong>Errors must be propagated backward:</strong> if _dest_.[[state]] is or becomes `"errored"`, then
+ 1. If _preventCancel_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
+ ReadableStreamCancel(*this*, _dest_.[[storedError]]) and with _dest_.[[storedError]].
+ 1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with _dest_.[[storedError]].
fixed
> + * <strong>Closing must be propagated forward:</strong> if *this*.[[state]] is or becomes `"closed"`, then
+ 1. If _preventClose_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
+ WritableStreamDefaultWriterCloseWithErrorPropagation(_writer_).
+ 1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a>.
+ * <strong>Closing must be propagated backward:</strong> if _dest_.[[state]] is `"closing"` or `"closed"`, then
+ 1. Let _destClosed_ be a new *TypeError*.
+ 1. If _preventCancel_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
+ ReadableStreamCancel(*this*, _destClosed_) and with _destClosed_.
+ 1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with _destClosed_.
+ * <i id="rs-pipeTo-shutdown-with-action">Shutdown with an action</i>: if any of the above requirements ask to
+ shutdown with an action _action_, optionally with an error _originalError_, then:
+ 1. If _shuttingDown_ is *true*, abort these substeps.
+ 1. Set _shuttingDown_ to *true*.
+ 1. Wait until any ongoing write finishes (i.e. the corresponding promises settle).
+ 1. Let _p_ be the result of performing _action_.
+ 1. If _p_ fulfills, <a href="#rs-pipeTo-finalize">finalize</a>, passing along _originalError_ if it was given.
fixed
> @@ -3001,6 +3060,22 @@ nothrow>WritableStreamDefaultWriterClose ( <var>writer</var> )</h4>
1. Return _promise_.
</emu-alg>
+<h4 id="writable-stream-default-writer-close-with-error-propagation" aoid="WritableStreamDefaultWriterCloseWithErrorPropagation"
+nothrow>WritableStreamDefaultWriterCloseWithErrorPropagation ( <var>writer</var> )</h4>
+
+<p class="note">This abstract operation helps implement the error propagation semantics of
+{{ReadableStream/pipeTo()}}.</p>
+
+<emu-alg>
+ 1. Let _stream_ be _writer_.[[ownerWritableStream]].
+ 1. Assert: _stream_ is not *undefined*.
+ 1. Let _state_ be _stream_.[[state]].
+ 1. Assert: _state_ is not `"closing"` or `"closed"`.
+ 1. If _state_ is `"errored"`, return a new promise rejected with _stream_.[[storedError]].
+ 1. Assert: _state_ is `"writable"`.
done
> + setTimeout(() => rs.controller.close());
+
+ return pipePromise.then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ });
+
+}, 'Closing must be propagated forward: becomes closed while empty; preventClose omitted; fulfilled close promise');
Ah yeah, good point, changed
> +
+ const writer = ws.getWriter();
+ writer.write('Hello');
+
+ return promise_rejects(t, error1, writer.closed, 'writer.closed must reject with the write error')
+ .then(() => {
+ writer.releaseLock();
+
+ return promise_rejects(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the write error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+ });
+
+}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; ' +
I think it is pretty redundant, but probably it already existed in some form in the existing tests, so I thought it would be good to port it and keep the coverage.
> + controller.enqueue('c');
+ controller.close();
+ }
+ });
+
+ const ws = recordingWritableStream({
+ write(chunk) {
+ if (chunk === 'c') {
+ return Promise.reject(error1);
+ }
+ return undefined;
+ }
+ });
+
+ return promise_rejects(t, error1, rs.pipeTo(ws, { preventCancel: true }),
+ 'pipeTo must reject with the same error')
fixed
> + return promise_rejects(t, new TypeError(), rs.pipeTo(ws)).then(() => {
+ assert_false(ws.locked, 'the WritableStream must still be unlocked');
+ });
+
+}, 'pipeTo must fail if the ReadableStream is locked, and not lock the WritableStream');
+
+promise_test(t => {
+
+ const rs = new ReadableStream();
+ const ws = new WritableStream();
+
+ ws.getWriter();
+
+ assert_false(rs.locked, 'sanity check: the ReadableStream does not start locked');
+ assert_true(ws.locked, 'sanity check: the WritableStream starts locked');
+
fixed
--
You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/pull/512
Received on Friday, 21 October 2016 21:59:52 UTC