- From: Mattias Buelens <notifications@github.com>
- Date: Wed, 14 Feb 2018 22:54:58 +0000 (UTC)
- To: whatwg/streams <streams@noreply.github.com>
- Cc: Subscribed <subscribed@noreply.github.com>
- Message-ID: <whatwg/streams/issues/882@github.com>
I believe there's a bug in the reference implementation for piping with `preventClose = true`. The implementation does not correctly wait for all writes to finish before completing the pipe operation.
## Test
The following web platform test is derived from the existing test _"Closing must be propagated forward: shutdown must not occur until the final write completes; preventClose = true"_ ([`streams/piping/close-propagation-forward.js`](https://github.com/w3c/web-platform-tests/blob/840362217deb3f9b82fe18a619d0bf21ddc55781/streams/piping/close-propagation-forward.js#L427)).
The only change is that the readable stream is closed is **asynchronously** after enqueuing, whereas in the original test `enqueue()` and `close()` happen in the same tick.
This test fails on the assert _"the pipe must not be complete"_. That is: the pipe completes **before** the write promise is resolved, which violates the specification.
```js
promise_test(() => {
const rs = recordingReadableStream();
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
});
let pipeComplete = false;
const pipePromise = rs.pipeTo(ws, { preventClose: true }).then(() => {
pipeComplete = true;
});
rs.controller.enqueue('a');
return flushAsyncEvents().then(() => {
rs.controller.close();
// Flush async events and verify that no shutdown occurs.
return flushAsyncEvents();
}).then(() => {
assert_array_equals(ws.events, ['write', 'a'],
'the chunk must have been written, but close must not have happened yet');
assert_equals(pipeComplete, false, 'the pipe must not be complete');
resolveWritePromise();
return pipePromise;
});
}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; preventClose = true, delayed close');
```
## Root cause
>From my investigation, it seems the bug is in [`pipeLoop`](https://github.com/whatwg/streams/blob/d91864509e66fc292f8cdba34ca00bfc09e2565e/reference-implementation/lib/readable-stream.js#L125):
```js
// Using reader and writer, read all chunks from this and write them to dest
// - Backpressure must be enforced
// - Shutdown must stop all activity
function pipeLoop() {
currentWrite = Promise.resolve();
if (shuttingDown === true) {
return Promise.resolve();
}
return writer._readyPromise.then(() => {
return ReadableStreamDefaultReaderRead(reader).then(({ value, done }) => {
if (done === true) {
return;
}
currentWrite = WritableStreamDefaultWriterWrite(writer, value).catch(() => {});
});
})
.then(pipeLoop);
}
```
Removing the first line (`currentWrite = Promise.resolve()`) solves the problem, which I believe is a correct fix for the problem. When `pipeLoop` recurses through `.then(pipeLoop)`, only the read has resolved, and `currentWrite` is still pending. Therefore, `currentWrite` **should not** be reset when starting a new iteration, it should only be replaced by a new write (which is guaranteed to resolve after the previous write).
--
You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/882
Received on Wednesday, 14 February 2018 22:55:29 UTC