Re: [streams] Support reading bytes into buffers allocated by user code on platforms where only async read is available (#253)

To list for bad points of (A), I'm writing examples of (A) (almost based on Domenic's PR #288), and I'm becoming fan of it ... Let me share how it's like in my mind:

Simple greedy consuming:

```es6
function consume(rs) {
  return new Promise((resolve, reject) => {
    function pump() {
      rs.read().then(value => {
        if (value === rs.constructor.EOS) {
          handleEOS();
          resolve();
        } else {
          process(value);
        }
      }, reason => {
        const translatedError = handleError(reason);
        reject(translatedError);
      }).catch(reject);
    }
    pump();
  });
}
```

Above works for both object stream and byte stream.

For byte stream, this also works for bring-your-own-buffer style reading.

```es6
function consume(rbs) {
  return new Promise((resolve, reject) => {
    function pump() {
      rbs.read(view).then(filledView => {
        if (filledView.byteLength === 0) {
          handleEOS();
          recycleBuffer(filledView);
          resolve();
        } else {
          handleData(filledView);
          recycleBuffer(filledView);
        }
      }, reason => {
        recycleBuffer(extractBuffer(reason));
        const translatedError = handleError(reason);
        reject(translatedError);
      }).catch(reject);
    }
    pump();
  });
}
```

If consume() sees `rs` becoming closed, or consume() is called on a closed stream, `filledView` with byteLength === 0 is returned.

To watch for any error on `rs` when some other things are not ready (e.g. dest not yet ready in `pipeTo` algorithm):

```es6
rs.closed.catch(reason => {
  const translatedError = handleError(reason);
  terminateTheAlgorithm(translatedError);
});
```

I agree (IIRC you said somewhere) that state can be eliminated for further API simplification if we sacrifice one microtask for probing state.

The following is examples of WritableStream but after making similar simplification on WritableStream like what we did for ReadableStream.

Backpressure caring push onto WritableStream:

```es6
function supply(ws) {
  return new Promise((resolve, reject) => {
    function handleErrorAndReject() {
      const translatedError = handleError(reason);
      reject(translatedError);
    }
    push() {
      const data = generateData();
      if (data === EOF) {
        ws.close().then(result => {
          handleCloseResult(result);
          resolve();
        }, handleErrorAndReject).catch(reject);
      } else {
        ws.write(data).then(result => {
          handleWriteResult(result);
          pushIfNoBackpressure();
        }, handleErrorAndReject).catch(reject);
      }
    }
    function pushIfNoBackpressure() {
      ws.ready.then(() => {
        push();
      }, handleErrorAndReject).catch(reject);
    }
    pushIfNoBackpressure();
  });
}
```

Backpressure caring push onto byte stream:

```es6
function supply(wbs) {
  return new Promise((resolve, reject) => {
    push() {
      const data = generateData();
      if (data === EOF) {
        wbs.close().then(result => {
          handleCloseResult(result);
          resolve();
        }, reason => {
          const translatedError = handleError(reason);
          reject(translatedError);
        }).catch(reject);
      } else {
        wbs.write(data).then(result => {
           recycleBuffer(extractBuffer(result));
           handleWriteResult(result);
           pushIfNoBackpressure();
        }, reason => {
          recycleBuffer(extractBuffer(reason));
          const translatedError = handleError(reason);
          reject(translatedError);
        }).catch(reject);
      }
    }
    function pushIfNoBackpressure() {
      wbs.ready.then(() => {
        push();
        pushIfNoBackpressure();
      }, reason => {
        const translatedError = handleError(reason);
        reject(translatedError);
      }).catch(reject);
    }
    pushIfNoBackpressure();
  });
}
```


---
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/253#issuecomment-77389072

Received on Thursday, 5 March 2015 15:54:09 UTC