[whatwg/streams] Pipe completes before all writes are finished with preventClose = true (#882)

I believe there's a bug in the reference implementation for piping with `preventClose = true`. The implementation does not correctly wait for all writes to finish before completing the pipe operation.

## Test
The following web platform test is derived from the existing test _"Closing must be propagated forward: shutdown must not occur until the final write completes; preventClose = true"_ ([`streams/piping/close-propagation-forward.js`](https://github.com/w3c/web-platform-tests/blob/840362217deb3f9b82fe18a619d0bf21ddc55781/streams/piping/close-propagation-forward.js#L427)).

The only change is that the readable stream is closed is **asynchronously** after enqueuing, whereas in the original test `enqueue()` and `close()` happen in the same tick.

This test fails on the assert _"the pipe must not be complete"_. That is: the pipe completes **before** the write promise is resolved, which violates the specification.
```js
promise_test(() => {

  const rs = recordingReadableStream();

  let resolveWritePromise;
  const ws = recordingWritableStream({
    write() {
      return new Promise(resolve => {
        resolveWritePromise = resolve;
      });
    }
  });

  let pipeComplete = false;
  const pipePromise = rs.pipeTo(ws, { preventClose: true }).then(() => {
    pipeComplete = true;
  });

  rs.controller.enqueue('a');

  return flushAsyncEvents().then(() => {
    rs.controller.close();

    // Flush async events and verify that no shutdown occurs.
    return flushAsyncEvents();
  }).then(() => {
    assert_array_equals(ws.events, ['write', 'a'],
      'the chunk must have been written, but close must not have happened yet');
    assert_equals(pipeComplete, false, 'the pipe must not be complete');

    resolveWritePromise();

    return pipePromise;
  });

}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; preventClose = true, delayed close');
```

## Root cause
>From my investigation, it seems the bug is in [`pipeLoop`](https://github.com/whatwg/streams/blob/d91864509e66fc292f8cdba34ca00bfc09e2565e/reference-implementation/lib/readable-stream.js#L125):
```js
// Using reader and writer, read all chunks from this and write them to dest
// - Backpressure must be enforced
// - Shutdown must stop all activity
function pipeLoop() {
  currentWrite = Promise.resolve();

  if (shuttingDown === true) {
    return Promise.resolve();
  }

  return writer._readyPromise.then(() => {
    return ReadableStreamDefaultReaderRead(reader).then(({ value, done }) => {
      if (done === true) {
        return;
      }

      currentWrite = WritableStreamDefaultWriterWrite(writer, value).catch(() => {});
    });
  })
  .then(pipeLoop);
}
```
Removing the first line (`currentWrite = Promise.resolve()`) solves the problem, which I believe is a correct fix for the problem. When `pipeLoop` recurses through `.then(pipeLoop)`, only the read has resolved, and `currentWrite` is still pending. Therefore, `currentWrite` **should not** be reset when starting a new iteration, it should only be replaced by a new write (which is guaranteed to resolve after the previous write).

-- 
You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/882

Received on Wednesday, 14 February 2018 22:55:29 UTC