Re: [whatwg/streams] Aborting a stream should wait for pending writes (#619)

domenic commented on this pull request.



> @@ -158,7 +162,6 @@ function WritableStreamError(stream, e) {
   }
 
   stream._state = 'errored';

Can we keep this line together with assigning storedError? It seems bad for them to get out of sync.

> @@ -173,11 +176,26 @@ function WritableStreamFinishClose(stream) {
   defaultWriterClosedPromiseResolve(stream._writer);
 }
 
-function WritableStreamFulfillWriteRequest(stream) {
-  assert(stream._writeRequests.length > 0);
+function WritableStreamRejectUnresolvedPromises(stream) {
+  const state = stream._state;
+  assert(state === 'writable' || state === 'closing' || state === 'errored');

This assertion might be tightened if we move the state assignment as discussed above.

> @@ -20,6 +20,12 @@ class WritableStream {
     // producer without waiting for the queued writes to finish.
     this._writeRequests = [];
 
+    // Write requests are removed from _writeRequests when write() is called on the underlying sink. This prevents
+    // them from being erroneously rejected on error. If a write() call is pending, the request is stored here.
+    this._pendingWriteRequest = undefined;
+
+    this._pendingCloseRequest = undefined;

I wonder if this could be consolidated with writer's closedPromise...

> @@ -173,11 +176,26 @@ function WritableStreamFinishClose(stream) {
   defaultWriterClosedPromiseResolve(stream._writer);
 }
 
-function WritableStreamFulfillWriteRequest(stream) {
-  assert(stream._writeRequests.length > 0);
+function WritableStreamRejectUnresolvedPromises(stream) {

I wonder if there's a better name for this that indicates clearly that it will reject write requests + pending close requests + writer closed promise, but not pending write requests.

Also, can it maybe assert that there is no pending write request?

> +  const ws = new WritableStream({
+    write() {
+      return flushAsyncEvents();
+    }
+  });
+  const writer = ws.getWriter();
+  return writer.ready.then(() => {
+    const writePromise = writer.write('a');
+    writer.abort(new Error());
+    let closedResolved = false;
+    return Promise.all([
+      writePromise.then(() => assert_false(closedResolved, '.closed should not resolve before write()')),
+      promise_rejects(t, new TypeError(), writer.closed, '.closed should reject')
+          .then(() => {
+            closedResolved = true;
+          })]);

Style nit: I think this would be cleaner with `.then` on the preceding line and `]);` on its own line.

> +      return flushAsyncEvents();
+    }
+  });
+  const writer = ws.getWriter();
+  return writer.ready.then(() => {
+    const writePromise = writer.write('a');
+    writer.abort(new Error());
+    let closedResolved = false;
+    return Promise.all([
+      writePromise.then(() => assert_false(closedResolved, '.closed should not resolve before write()')),
+      promise_rejects(t, new TypeError(), writer.closed, '.closed should reject')
+          .then(() => {
+            closedResolved = true;
+          })]);
+  });
+}, '.closed should not resolve before write()');

before fulfilled write()

> +    const writePromise = writer.write('a');
+    writer.abort(new Error());
+    let closedResolved = false;
+    return Promise.all([
+      writePromise.then(() => assert_false(closedResolved, '.closed should not resolve before write()')),
+      promise_rejects(t, new TypeError(), writer.closed, '.closed should reject')
+          .then(() => {
+            closedResolved = true;
+          })]);
+  });
+}, '.closed should not resolve before write()');
+
+promise_test(t => {
+  const ws = new WritableStream({
+    write() {
+      return Promise.reject(new Error());

Use `error1` here and below?

> +
+promise_test(t => {
+  const ws = new WritableStream({
+    write() {
+      return Promise.reject(new Error());
+    }
+  });
+  const writer = ws.getWriter();
+  return writer.ready.then(() => {
+    const writePromise = writer.write('a');
+    writer.abort(new Error());
+    let closedResolved = false;
+    return Promise.all([
+      promise_rejects(t, new Error(), writePromise, 'write() should reject')
+          .then(() => assert_false(closedResolved, '.closed should not resolve before write()')),
+      promise_rejects(t, new TypeError(), writer.closed, '.closed should reject')

Hmm. I think the write promise should get the first crack at determining how the stream errors, even if abort() has been called. I guess this is related to https://github.com/whatwg/streams/issues/617#issuecomment-262348791.

> +    writer.abort(new Error());
+    let closedResolved = false;
+    return Promise.all([
+      promise_rejects(t, new Error(), writePromise, 'write() should reject')
+          .then(() => assert_false(closedResolved, '.closed should not resolve before write()')),
+      promise_rejects(t, new TypeError(), writer.closed, '.closed should reject')
+          .then(() => {
+            closedResolved = true;
+          })]);
+  });
+}, '.closed should not resolve before rejected write()');
+
+promise_test(t => {
+  const ws = new WritableStream({
+    write() {
+      return delay(0);

why not flushAsyncEvents?

> +      promise_rejects(t, new TypeError(), writer.closed, '.closed should reject')
+          .then(() => {
+            closedResolved = true;
+          })]);
+  });
+}, '.closed should not resolve before rejected write()');
+
+promise_test(t => {
+  const ws = new WritableStream({
+    write() {
+      return delay(0);
+    }
+  }, new CountQueuingStrategy(4));
+  const writer = ws.getWriter();
+  return writer.ready.then(() => {
+    const resolveOrder = [];

settlementOrder

> +    const resolveOrder = [];
+    return Promise.all([
+      writer.write('1').then(() => resolveOrder.push(1)),
+      promise_rejects(t, new TypeError(), writer.write('2'), 'first queued write should be rejected')
+          .then(() => resolveOrder.push(2)),
+      promise_rejects(t, new TypeError(), writer.write('3'), 'second queued write should be rejected')
+          .then(() => resolveOrder.push(3)),
+      writer.abort(new Error())
+    ]).then(() => assert_array_equals([1, 2, 3], resolveOrder, 'writes should be satisfied in order'));
+  });
+}, 'writes should be satisfied in order when aborting');
+
+promise_test(t => {
+  const ws = new WritableStream({
+    write() {
+      return Promise.reject(new Error());

`error1` here and below

> +  });
+}, 'writes should be satisfied in order when aborting');
+
+promise_test(t => {
+  const ws = new WritableStream({
+    write() {
+      return Promise.reject(new Error());
+    }
+  }, new CountQueuingStrategy(4));
+  const writer = ws.getWriter();
+  return writer.ready.then(() => {
+    const resolveOrder = [];
+    return Promise.all([
+      promise_rejects(t, new Error(), writer.write('1'), 'pending write should be rejected')
+          .then(() => resolveOrder.push(1)),
+      promise_rejects(t, new TypeError(), writer.write('2'), 'first queued write should be rejected')

As before it makes more sense for me that the write() error sets the state, not abort().

-- 
You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/pull/619#pullrequestreview-9778349

Received on Tuesday, 22 November 2016 23:25:57 UTC