Re: [whatwg/streams] ReadableStream.from(asyncIterable) (#1083)

@MattiasBuelens commented on this pull request.



> + 1. Let |startAlgorithm| be the following steps:
+  1. Set |iteratorRecord| to ? [$GetIterator$](|asyncIterable|, async).
+ 1. Let |pullAlgorithm| be the following steps:
+  1. Let |nextResult| be [$IteratorNext$](|iteratorRecord|).
+  1. If |nextResult| is an abrupt completion, return [=a promise rejected with=]
+     |nextResult|.\[[Value]].
+  1. Let |nextPromise| be [=a promise resolved with=] |nextResult|.\[[Value]].
+  1. Return the result of [=reacting=] to |nextPromise| with the following fulfillment steps,
+     given the argument |iterResult|:
+   1. If [$Type$](|iterResult|) is not Object, throw a {{TypeError}}.
+   1. Let |done| be ? [$IteratorComplete$](|iterResult|).
+   1. If |done| is true:
+    1. Perform ! [$ReadableStreamDefaultControllerClose$](stream.[=ReadableStream/[[controller]]=]).
+   1. Otherwise:
+    1. Let |value| be ? [$IteratorValue$](|iterResult|).
+    1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](stream.[=ReadableStream/[[controller]]=],

This may throw if `strategy.size()` throws or returns an invalid chunk size. We need to catch this error, and then close the async iterator.

> +  }
+  if (typeof func !== 'function') {
+    throw new TypeError();
+  }
+  return func;
+};
+
+exports.GetIterator = (obj, hint = 'sync', method) => {
+  assert(hint === 'sync' || hint === 'async');
+  if (method === undefined) {
+    if (hint === 'async') {
+      method = exports.GetMethod(obj, Symbol.asyncIterator);
+      if (method === undefined) {
+        const syncMethod = exports.GetMethod(obj, Symbol.iterator);
+        const syncIterator = exports.GetIterator(obj, 'sync', syncMethod);
+        return syncIterator; // TODO sync to async iterator

It looks like this might not be necessary for correctness? The `ReadableStream.from()` implementation already wraps all results from `next()` and `return()` in new promises, so we don't *really* need the sync-to-async adapter for that. 🤷 

> +  1. Let |nextResult| be [$IteratorNext$](|iteratorRecord|).
+  1. If |nextResult| is an abrupt completion, return [=a promise rejected with=]
+     |nextResult|.\[[Value]].
+  1. Let |nextPromise| be [=a promise resolved with=] |nextResult|.\[[Value]].
+  1. Return the result of [=reacting=] to |nextPromise| with the following fulfillment steps,
+     given the argument |iterResult|:
+   1. If [$Type$](|iterResult|) is not Object, throw a {{TypeError}}.
+   1. Let |done| be ? [$IteratorComplete$](|iterResult|).
+   1. If |done| is true:
+    1. Perform ! [$ReadableStreamDefaultControllerClose$](stream.[=ReadableStream/[[controller]]=]).
+   1. Otherwise:
+    1. Let |value| be ? [$IteratorValue$](|iterResult|).
+    1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](stream.[=ReadableStream/[[controller]]=],
+       |value|).
+ 1. Let |cancelAlgorithm| be the following steps:
+  1. Let |returnMethod| be ? [$GetMethod$](|iteratorRecord|.\[[Iterator]], "return").

We can't use `?` inside `cancelAlgorithm`. It needs to return a rejected promise, instead of throwing.

> +  1. Let |nextResult| be [$IteratorNext$](|iteratorRecord|).
+  1. If |nextResult| is an abrupt completion, return [=a promise rejected with=]
+     |nextResult|.\[[Value]].
+  1. Let |nextPromise| be [=a promise resolved with=] |nextResult|.\[[Value]].
+  1. Return the result of [=reacting=] to |nextPromise| with the following fulfillment steps,
+     given the argument |iterResult|:
+   1. If [$Type$](|iterResult|) is not Object, throw a {{TypeError}}.
+   1. Let |done| be ? [$IteratorComplete$](|iterResult|).
+   1. If |done| is true:
+    1. Perform ! [$ReadableStreamDefaultControllerClose$](stream.[=ReadableStream/[[controller]]=]).
+   1. Otherwise:
+    1. Let |value| be ? [$IteratorValue$](|iterResult|).
+    1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](stream.[=ReadableStream/[[controller]]=],
+       |value|).
+ 1. Let |cancelAlgorithm| be the following steps:
+  1. Let |returnMethod| be [$GetMethod$](|iteratorRecord|.\[[Iterator]], "`return`").

It would be nicer to use [`AsyncIteratorClose()`](https://tc39.es/ecma262/#sec-asynciteratorclose) instead, but I don't think that's possible since it has an [`Await()`](https://tc39.es/ecma262/#await) in the middle. 😕

> +  1. Return the result of [=reacting=] to |nextPromise| with the following fulfillment steps,
+     given the argument |iterResult|:
+   1. If [$Type$](|iterResult|) is not Object, throw a {{TypeError}}.
+   1. Let |done| be ? [$IteratorComplete$](|iterResult|).
+   1. If |done| is true:
+    1. Perform ! [$ReadableStreamDefaultControllerClose$](stream.[=ReadableStream/[[controller]]=]).
+   1. Otherwise:
+    1. Let |value| be ? [$IteratorValue$](|iterResult|).
+    1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](stream.[=ReadableStream/[[controller]]=],
+       |value|).
+ 1. Let |cancelAlgorithm| be the following steps:
+  1. Let |returnMethod| be ? [$GetMethod$](|iteratorRecord|.\[[Iterator]], "return").
+  1. If |returnMethod| is undefined, return [=a promise resolved with=] undefined.
+  1. Let |returnResult| be ? [$Call$](|returnMethod|, |iteratorRecord|.\[[Iterator]]).
+  1. Return [=a promise resolved with=] |returnResult|.
+ 1. Set |stream| to ! [$CreateReadableStream$](|startAlgorithm|, |pullAlgorithm|, |cancelAlgorithm|).

Hmm, good point.

Would you prefer moving the current *startAlgorithm* steps to the "main steps", and keep the `!` here? The result should the same, I think.

> +  1. Return the result of [=reacting=] to |nextPromise| with the following fulfillment steps,
+     given the argument |iterResult|:
+   1. If [$Type$](|iterResult|) is not Object, throw a {{TypeError}}.
+   1. Let |done| be ? [$IteratorComplete$](|iterResult|).
+   1. If |done| is true:
+    1. Perform ! [$ReadableStreamDefaultControllerClose$](stream.[=ReadableStream/[[controller]]=]).
+   1. Otherwise:
+    1. Let |value| be ? [$IteratorValue$](|iterResult|).
+    1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](stream.[=ReadableStream/[[controller]]=],
+       |value|).
+ 1. Let |cancelAlgorithm| be the following steps:
+  1. Let |returnMethod| be ? [$GetMethod$](|iteratorRecord|.\[[Iterator]], "return").
+  1. If |returnMethod| is undefined, return [=a promise resolved with=] undefined.
+  1. Let |returnResult| be ? [$Call$](|returnMethod|, |iteratorRecord|.\[[Iterator]]).
+  1. Return [=a promise resolved with=] |returnResult|.
+ 1. Set |stream| to ! [$CreateReadableStream$](|startAlgorithm|, |pullAlgorithm|, |cancelAlgorithm|).

Ah, right, that would also fix the edge case where `[Symbol.asyncIterator]()` returns a promise. I'll move those steps out. 🙂

> +     given the argument |iterResult|:
+   1. If [$Type$](|iterResult|) is not Object, throw a {{TypeError}}.
+   1. Let |done| be ? [$IteratorComplete$](|iterResult|).
+   1. If |done| is true:
+    1. Perform ! [$ReadableStreamDefaultControllerClose$](stream.[=ReadableStream/[[controller]]=]).
+   1. Otherwise:
+    1. Let |value| be ? [$IteratorValue$](|iterResult|).
+    1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](stream.[=ReadableStream/[[controller]]=],
+       |value|).
+ 1. Let |cancelAlgorithm| be the following steps:
+  1. Let |returnMethod| be ? [$GetMethod$](|iteratorRecord|.\[[Iterator]], "return").
+  1. If |returnMethod| is undefined, return [=a promise resolved with=] undefined.
+  1. Let |returnResult| be ? [$Call$](|returnMethod|, |iteratorRecord|.\[[Iterator]]).
+  1. Return [=a promise resolved with=] |returnResult|.
+ 1. Set |stream| to ! [$CreateReadableStream$](|startAlgorithm|, |pullAlgorithm|, |cancelAlgorithm|).
+ 1. Return |stream|.

I need *stream* to call `ReadableStreamDefaultControllerEnqueue` and `ReadableStreamDefaultControllerClose` in *pullAlgorithm*... 😕 

-- 
You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/pull/1083#pullrequestreview-527623440

Received on Tuesday, 10 November 2020 22:16:50 UTC