Re: [whatwg/streams] Rigorously specify and test pipeTo (#512)

domenic commented on this pull request.

> -abstract operation calls, instead of using the JavaScript-exposed readable and writable stream APIs. This will better
-allow optimization and specialization. See <a href="">#407</a> and <a
-href="">#97</a> for more information.
+  1. If ! IsReadableStream(*this*) is *false*, return a promise rejected with a *TypeError* exception.
+  1. If ! IsWritableStream(_dest_) is *false*, return a promise rejected with a *TypeError* exception.
+  1. Set _preventClose_ to ! ToBoolean(_preventClose_), set _preventAbort_ to ! ToBoolean(_preventAbort_), and set
+     _preventCancel_ to ! ToBoolean(_preventCancel_).
+  1. If ! IsReadableStreamLocked(*this*) is *true*, return a promise rejected with a *TypeError* exception.
+  1. If ! IsWritableStreamLocked(_dest_) is *true*, return a promise rejected with a *TypeError* exception.
+  1. If ! IsReadableByteStreamController(*this*.[[readableStreamController]]) is *true*, let _reader_ be either !
+     AcquireReadableStreamBYOBReader(*this*) or ! AcquireReadableStreamDefaultReader(*this*), at the user agent's
+     discretion.
+  1. Otherwise, let _reader_ be ! AcquireReadableStreamDefaultReader(*this*).
+  1. Let _writer_ be ! AcquireWritableStreamDefaultWriter(_dest_).
+  1. Let _promise_ be a new promise.
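
For illustration, the lock checks in the steps above are observable from author code. This is a minimal sketch, assuming an environment with global `ReadableStream` and `WritableStream` (modern browsers, Node.js 18+); the result strings are only labels made up for the example. Because the readable-side lock check comes before the writer is acquired, a rejected call should leave the destination unlocked.

```javascript
// Assumes global ReadableStream / WritableStream (modern browsers, Node.js 18+).
const rs = new ReadableStream();
const ws = new WritableStream();

// Acquiring a reader locks the readable stream.
rs.getReader();

// pipeTo() returns a promise rather than throwing synchronously,
// so the failure is observed as a rejection.
const outcome = rs.pipeTo(ws).then(
  () => 'fulfilled',
  (e) => (e instanceof TypeError ? 'rejected with TypeError' : 'rejected with something else')
);

outcome.then((result) => {
  console.log(result);    // how the pipe promise settled
  console.log(ws.locked); // whether the destination was locked by the failed call
});
```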


> +     discretion.
+  1. Otherwise, let _reader_ be ! AcquireReadableStreamDefaultReader(*this*).
+  1. Let _writer_ be ! AcquireWritableStreamDefaultWriter(_dest_).
+  1. Let _promise_ be a new promise.
+  1. Let _shuttingDown_ be *false*.
+  1. <a>In parallel</a>, using _reader_ and _writer_, read all <a>chunks</a> from *this* and write them to _dest_. Due
+     to the locking provided by the reader and writer, the exact manner in which this happens is not observable to
+     author code, and so there is flexibility in how this is done. The following constraints apply regardless of the
+     exact algorithm used:
+     * <strong>Public API must not be used:</strong> while reading or writing, or performing any of the operations
+       below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes)
+       must not be used. Instead, the streams must be manipulated directly.
+     * <strong>Backpressure must be enforced:</strong>
+       * While WritableStreamDefaultWriterGetDesiredSize(_writer_) is ≤ *0* or is *null*, the user agent must not read
+         from _reader_.
+       * If _reader_ is a <a>BYOB reader</a>, WritableStreamDefaultWriterGetDesiredSize(_writer_) should be used to

Sorry, I don't quite understand. What were you thinking of adding?

> +         from _reader_.
+       * If _reader_ is a <a>BYOB reader</a>, WritableStreamDefaultWriterGetDesiredSize(_writer_) should be used to
+         determine the size of the chunks read from _reader_.
+       * Otherwise, WritableStreamDefaultWriterGetDesiredSize(_writer_) may be used to determine the flow rate
+         heuristically, e.g. by delaying reads while it is judged to be "low" compared to the size of chunks that have
+         been typically read.
+     * <strong>Shutdown must stop all activity:</strong> if _shuttingDown_ becomes *true*, the user agent must not
+       initiate further reads from _reader_ or writes to _writer_. (Ongoing reads and writes may finish.)
+     * <strong>Errors must be propagated forward:</strong> if *this*.[[state]] is or becomes `"errored"`, then
+       1. If _preventAbort_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
+          WritableStreamAbort(_dest_, *this*.[[storedError]]) and with *this*.[[storedError]].
+       1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with *this*.[[storedError]].
+     * <strong>Errors must be propagated backward:</strong> if _dest_.[[state]] is or becomes `"errored"`, then
+       1. If _preventCancel_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
+          ReadableStreamCancel(*this*, _dest_.[[storedError]]) and with _dest_.[[storedError]].
+       1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with _dest_.[[storedError]].
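
As a rough illustration of the "errors must be propagated forward" requirement quoted above, the sketch below pipes an already-errored readable stream into a sink that records `abort()` calls, once with default options and once with `preventAbort: true`. It assumes global `ReadableStream` and `WritableStream` (modern browsers, Node.js 18+). `erroredReadable` and `recordingSink` are hypothetical helpers written for this example, not the `recordingWritableStream` helper used in the tests.

```javascript
// Assumes global ReadableStream / WritableStream (modern browsers, Node.js 18+).
const error1 = new Error('source failed');

// Hypothetical helper: a readable stream that is errored from the start.
function erroredReadable() {
  return new ReadableStream({
    start(controller) {
      controller.error(error1);
    }
  });
}

// Hypothetical helper: a writable stream whose sink logs abort reasons.
function recordingSink(log) {
  return new WritableStream({
    abort(reason) {
      log.push(reason);
    }
  });
}

const abortsDefault = [];
const abortsPrevented = [];

// Convert each rejection into a value so both outcomes can be inspected.
const results = Promise.all([
  erroredReadable().pipeTo(recordingSink(abortsDefault)).catch((e) => e),
  erroredReadable()
    .pipeTo(recordingSink(abortsPrevented), { preventAbort: true })
    .catch((e) => e)
]);

results.then(([withAbort, withoutAbort]) => {
  console.log(withAbort === error1, abortsDefault.length);
  console.log(withoutAbort === error1, abortsPrevented.length);
});
```

In both cases the pipe promise should reject with the source's error; only the first destination should see an abort.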


> +     * <strong>Closing must be propagated forward:</strong> if *this*.[[state]] is or becomes `"closed"`, then
+       1. If _preventClose_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
+          WritableStreamDefaultWriterCloseWithErrorPropagation(_writer_).
+       1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a>.
+     * <strong>Closing must be propagated backward:</strong> if _dest_.[[state]] is `"closing"` or `"closed"`, then
+       1. Let _destClosed_ be a new *TypeError*.
+       1. If _preventCancel_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
+          ReadableStreamCancel(*this*, _destClosed_) and with _destClosed_.
+       1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with _destClosed_.
+     * <i id="rs-pipeTo-shutdown-with-action">Shutdown with an action</i>: if any of the above requirements ask to
+       shutdown with an action _action_, optionally with an error _originalError_, then:
+       1. If _shuttingDown_ is *true*, abort these substeps.
+       1. Set _shuttingDown_ to *true*.
+       1. Wait until any ongoing write finishes (i.e. the corresponding promises settle).
+       1. Let _p_ be the result of performing _action_.
+       1. If _p_ fulfills, <a href="#rs-pipeTo-finalize">finalize</a>, passing along _originalError_ if it was given.
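
The "shutdown with an action" substeps quoted above can be modeled in plain JavaScript. This is a sketch only, not how an implementation has to do it: `createShutdown`, `pendingWrites`, and `finalize` are hypothetical names for the example. The quoted hunk ends before the rejection branch, so the sketch assumes a rejected action finalizes with the rejection reason. An omitted `originalError` is simply passed along as `undefined`.

```javascript
// Hypothetical model of the "shutdown with an action" substeps.
function createShutdown({ pendingWrites, finalize }) {
  let shuttingDown = false;

  return async function shutdownWithAction(action, originalError) {
    // Step 1: a second shutdown request is ignored.
    if (shuttingDown) return;
    // Step 2: set synchronously, before anything is awaited.
    shuttingDown = true;
    // Step 3: wait until ongoing writes settle, whether they fulfill or reject.
    await Promise.allSettled(pendingWrites());
    // Step 4: perform the action.
    try {
      await action();
    } catch (newError) {
      // Assumption: the rejection branch is not part of the quoted hunk.
      finalize(newError);
      return;
    }
    // Step 5: the action fulfilled, so finalize with the original error (if any).
    finalize(originalError);
  };
}

// Usage: two competing shutdown requests; only the first one runs its action.
const log = [];
const shutdown = createShutdown({
  pendingWrites: () => [Promise.resolve()],
  finalize: (error) => log.push(['finalize', error])
});
const err = new Error('boom');
const done = Promise.all([
  shutdown(async () => { log.push(['action']); }, err),
  shutdown(async () => { log.push(['second action']); }, err)
]);
```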


> @@ -3001,6 +3060,22 @@ nothrow>WritableStreamDefaultWriterClose ( <var>writer</var> )</h4>
   1. Return _promise_.
+<h4 id="writable-stream-default-writer-close-with-error-propagation" aoid="WritableStreamDefaultWriterCloseWithErrorPropagation"
+nothrow>WritableStreamDefaultWriterCloseWithErrorPropagation ( <var>writer</var> )</h4>
+<p class="note">This abstract operation helps implement the error propagation semantics of
+  1. Let _stream_ be _writer_.[[ownerWritableStream]].
+  1. Assert: _stream_ is not *undefined*.
+  1. Let _state_ be _stream_.[[state]].
+  1. Assert: _state_ is not `"closing"` or `"closed"`.
+  1. If _state_ is `"errored"`, return a new promise rejected with _stream_.[[storedError]].
+  1. Assert: _state_ is `"writable"`.


> +  setTimeout(() => rs.controller.close());
+  return pipePromise.then(value => {
+    assert_equals(value, undefined, 'the promise must fulfill with undefined');
+  })
+  .then(() => {
+    assert_array_equals(rs.eventsWithoutPulls, []);
+    assert_array_equals(, ['close']);
+    return Promise.all([
+      rs.getReader().closed,
+      ws.getWriter().closed
+    ]);
+  });
+}, 'Closing must be propagated forward: becomes closed while empty; preventClose omitted; fulfilled close promise');

Ah yeah, good point. Changed.

> +
+  const writer = ws.getWriter();
+  writer.write('Hello');
+  return promise_rejects(t, error1, writer.closed, 'writer.closed must reject with the write error')
+    .then(() => {
+      writer.releaseLock();
+      return promise_rejects(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the write error')
+        .then(() => {
+          assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+          assert_array_equals(, ['write', 'Hello']);
+        });
+    });
+}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; ' +

I think it is pretty redundant, but it probably already existed in some form in the existing tests, so I thought it would be good to port it and keep the coverage.

> +      controller.enqueue('c');
+      controller.close();
+    }
+  });
+  const ws = recordingWritableStream({
+    write(chunk) {
+      if (chunk === 'c') {
+        return Promise.reject(error1);
+      }
+      return undefined;
+    }
+  });
+  return promise_rejects(t, error1, rs.pipeTo(ws, { preventCancel: true }),
+                                      'pipeTo must reject with the same error')


> +  return promise_rejects(t, new TypeError(), rs.pipeTo(ws)).then(() => {
+    assert_false(ws.locked, 'the WritableStream must still be unlocked');
+  });
+}, 'pipeTo must fail if the ReadableStream is locked, and not lock the WritableStream');
+promise_test(t => {
+  const rs = new ReadableStream();
+  const ws = new WritableStream();
+  ws.getWriter();
+  assert_false(rs.locked, 'sanity check: the ReadableStream does not start locked');
+  assert_true(ws.locked, 'sanity check: the WritableStream starts locked');



Received on Friday, 21 October 2016 21:59:52 UTC