- From: Domenic Denicola <notifications@github.com>
- Date: Fri, 21 Oct 2016 14:59:09 -0700
- To: whatwg/streams <streams@noreply.github.com>
- Message-ID: <whatwg/streams/pull/512/review/5325787@github.com>
domenic commented on this pull request.

> -abstract operation calls, instead of using the JavaScript-exposed readable and writable stream APIs. This will better
> -allow optimization and specialization. See <a href="https://github.com/whatwg/streams/issues/407">#407</a> and <a
> -href="https://github.com/whatwg/streams/issues/97">#97</a> for more information.
> +<emu-alg>
> +  1. If ! IsReadableStream(*this*) is *false*, return a promise rejected with a *TypeError* exception.
> +  1. If ! IsWritableStream(_dest_) is *false*, return a promise rejected with a *TypeError* exception.
> +  1. Set _preventClose_ to ! ToBoolean(_preventClose_), set _preventAbort_ to ! ToBoolean(_preventAbort_), and set
> +     _preventCancel_ to ! ToBoolean(_preventCancel_).
> +  1. If ! IsReadableStreamLocked(*this*) is *true*, return a promise rejected with a *TypeError* exception.
> +  1. If ! IsWritableStreamLocked(_dest_) is *true*, return a promise rejected with a *TypeError* exception.
> +  1. If ! IsReadableByteStreamController(*this*.[[readableStreamController]]) is true, let _reader_ be either !
> +     AcquireReadableStreamBYOBReader(*this*) or ! AcquireReadableStreamDefaultReader(*this*), at the user agent's
> +     discretion.
> +  1. Otherwise, let _reader_ be ! AcquireReadableStreamDefaultReader(*this*).
> +  1. Let _writer_ be ! AcquireWritableStreamDefaultWriter(_dest_).
> +  1. Let _promise_ be a new promise.

done

> +     discretion.
> +  1. Otherwise, let _reader_ be ! AcquireReadableStreamDefaultReader(*this*).
> +  1. Let _writer_ be ! AcquireWritableStreamDefaultWriter(_dest_).
> +  1. Let _promise_ be a new promise.
> +  1. Let _shuttingDown_ be *false*.
> +  1. <a>In parallel</a>, using _reader_ and _writer_, read all <a>chunks</a> from *this* and write them to _dest_. Due
> +     to the locking provided by the reader and writer, the exact manner in which this happens is not observable to
> +     author code, and so there is flexibility in how this is done. The following constraints apply regardless of the
> +     exact algorithm used:
> +     * <strong>Public API must not be used:</strong> while reading or writing, or performing any of the operations
> +       below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes)
> +       must not be used. Instead, the streams must be manipulated directly.
> +     * <strong>Backpressure must be enforced:</strong>
> +       * While WritableStreamDefaultWriterGetDesiredSize(_writer_) is ≤ *0* or is *null*, the user agent must not read
> +         from _reader_.
> +       * If _reader_ is a <a>BYOB reader</a>, WritableStreamDefaultWriterGetDesiredSize(_writer_) should be used to

Sorry, I don't quite understand. What were you thinking of adding?

> +         from _reader_.
> +       * If _reader_ is a <a>BYOB reader</a>, WritableStreamDefaultWriterGetDesiredSize(_writer_) should be used to
> +         determine the size of the chunks read from _reader_.
> +       * Otherwise, WritableStreamDefaultWriterGetDesiredSize(_writer_) may be used to determine the flow rate
> +         heuristically, e.g. by delaying reads while it is judged to be "low" compared to the size of chunks that have
> +         been typically read.
> +     * <strong>Shutdown must stop all activity:</strong> if _shuttingDown_ becomes *true*, the user agent must not
> +       initiate further reads from _reader_ or writes to _writer_. (Ongoing reads and writes may finish.)
> +     * <strong>Errors must be propagated forward:</strong> if *this*.[[state]] is or becomes `"errored"`, then
> +       1. If _preventAbort_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
> +          WritableStreamAbort(_dest_, *this*.[[storedError]]) and with *this*.[[storedError]].
> +       1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with *this*.[[storedError]].
> +     * <strong>Errors must be propagated backward:</strong> if _dest_.[[state]] is or becomes `"errored"`, then
> +       1. If _preventCancel_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
> +          ReadableStreamCancel(*this*, _dest_.[[storedError]]) and with _dest_.[[storedError]].
> +       1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with _dest_.[[storedError]].

fixed

> +     * <strong>Closing must be propagated forward:</strong> if *this*.[[state]] is or becomes `"closed"`, then
> +       1. If _preventClose_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
> +          WritableStreamDefaultWriterCloseWithErrorPropagation(_writer_).
> +       1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a>.
> +     * <strong>Closing must be propagated backward:</strong> if _dest_.[[state]] is `"closing"` or `"closed"`, then
> +       1. Let _destClosed_ be a new *TypeError*.
> +       1. If _preventCancel_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
> +          ReadableStreamCancel(*this*, _destClosed_) and with _destClosed_.
> +       1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with _destClosed_.
> +     * <i id="rs-pipeTo-shutdown-with-action">Shutdown with an action</i>: if any of the above requirements ask to
> +       shutdown with an action _action_, optionally with an error _originalError_, then:
> +       1. If _shuttingDown_ is *true*, abort these substeps.
> +       1. Set _shuttingDown_ to *true*.
> +       1. Wait until any ongoing write finishes (i.e. the corresponding promises settle).
> +       1. Let _p_ be the result of performing _action_.
> +       1. If _p_ fulfills, <a href="#rs-pipeTo-finalize">finalize</a>, passing along _originalError_ if it was given.

fixed

> @@ -3001,6 +3060,22 @@ nothrow>WritableStreamDefaultWriterClose ( <var>writer</var> )</h4>
>    1. Return _promise_.
>  </emu-alg>
>
> +<h4 id="writable-stream-default-writer-close-with-error-propagation" aoid="WritableStreamDefaultWriterCloseWithErrorPropagation"
> +nothrow>WritableStreamDefaultWriterCloseWithErrorPropagation ( <var>writer</var> )</h4>
> +
> +<p class="note">This abstract operation helps implement the error propagation semantics of
> +{{ReadableStream/pipeTo()}}.</p>
> +
> +<emu-alg>
> +  1. Let _stream_ be _writer_.[[ownerWritableStream]].
> +  1. Assert: _stream_ is not *undefined*.
> +  1. Let _state_ be _stream_.[[state]].
> +  1. Assert: _state_ is not `"closing"` or `"closed"`.
> +  1. If _state_ is `"errored"`, return a new promise rejected with _stream_.[[storedError]].
> +  1. Assert: _state_ is `"writable"`.

done

> +  setTimeout(() => rs.controller.close());
> +
> +  return pipePromise.then(value => {
> +    assert_equals(value, undefined, 'the promise must fulfill with undefined');
> +  })
> +  .then(() => {
> +    assert_array_equals(rs.eventsWithoutPulls, []);
> +    assert_array_equals(ws.events, ['close']);
> +
> +    return Promise.all([
> +      rs.getReader().closed,
> +      ws.getWriter().closed
> +    ]);
> +  });
> +
> +}, 'Closing must be propagated forward: becomes closed while empty; preventClose omitted; fulfilled close promise');

Ah yeah, good point, changed

> +
> +  const writer = ws.getWriter();
> +  writer.write('Hello');
> +
> +  return promise_rejects(t, error1, writer.closed, 'writer.closed must reject with the write error')
> +  .then(() => {
> +    writer.releaseLock();
> +
> +    return promise_rejects(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the write error')
> +    .then(() => {
> +      assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
> +      assert_array_equals(ws.events, ['write', 'Hello']);
> +    });
> +  });
> +
> +}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; ' +

I think it is pretty redundant, but probably it already existed in some form in the existing tests, so I thought it would be good to port it and keep the coverage.
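
For readers following the thread, the four propagation rules quoted in this review (errors forward and backward, closing forward and backward) can be summarized as a small decision table. The sketch below is only an illustration and is not part of the PR or the spec: `pipeToShutdownPlan` and the string labels it returns are hypothetical names, and the order in which the rules are checked is an assumption, since the quoted text lists them as independent constraints.

```javascript
// Hypothetical helper, not part of the Streams spec or of this PR.
// Given the observed [[state]] of the source and destination, it returns
// which shutdown action the quoted constraints ask for, and which error
// (if any) is passed along to shutdown. `null` means "keep piping".
// Assumption: rules are checked in the order they appear in the quoted text.
function pipeToShutdownPlan(sourceState, destState, options = {}) {
  const preventClose = Boolean(options.preventClose);
  const preventAbort = Boolean(options.preventAbort);
  const preventCancel = Boolean(options.preventCancel);

  // Errors must be propagated forward.
  if (sourceState === 'errored') {
    return { action: preventAbort ? null : 'abort dest', error: 'source storedError' };
  }
  // Errors must be propagated backward.
  if (destState === 'errored') {
    return { action: preventCancel ? null : 'cancel source', error: 'dest storedError' };
  }
  // Closing must be propagated forward (no error is passed along).
  if (sourceState === 'closed') {
    return { action: preventClose ? null : 'close dest', error: null };
  }
  // Closing must be propagated backward (a new TypeError is passed along).
  if (destState === 'closing' || destState === 'closed') {
    return { action: preventCancel ? null : 'cancel source', error: 'new TypeError' };
  }
  return null;
}
```

For example, `pipeToShutdownPlan('readable', 'errored')` yields a plan that cancels the source with the destination's stored error, which is the situation the "Errors must be propagated backward" test above exercises with `preventCancel` omitted.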
> +      controller.enqueue('c');
> +      controller.close();
> +    }
> +  });
> +
> +  const ws = recordingWritableStream({
> +    write(chunk) {
> +      if (chunk === 'c') {
> +        return Promise.reject(error1);
> +      }
> +      return undefined;
> +    }
> +  });
> +
> +  return promise_rejects(t, error1, rs.pipeTo(ws, { preventCancel: true }),
> +    'pipeTo must reject with the same error')

fixed

> +  return promise_rejects(t, new TypeError(), rs.pipeTo(ws)).then(() => {
> +    assert_false(ws.locked, 'the WritableStream must still be unlocked');
> +  });
> +
> +}, 'pipeTo must fail if the ReadableStream is locked, and not lock the WritableStream');
> +
> +promise_test(t => {
> +
> +  const rs = new ReadableStream();
> +  const ws = new WritableStream();
> +
> +  ws.getWriter();
> +
> +  assert_false(rs.locked, 'sanity check: the ReadableStream does not start locked');
> +  assert_true(ws.locked, 'sanity check: the WritableStream starts locked');
> +

fixed

--
You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/pull/512
Received on Friday, 21 October 2016 21:59:52 UTC