Re: [whatwg/streams] Handeling backpressure on the Readable side of a transform stream (Issue #1323)

Probably something like StackOverflow is a more appropriate forum for this kind of question, but I will try to answer anyway.

The easiest "solution" is to ignore the whole problem and assume that `streamForReplacement` will always be a reasonably small size. Your `transform()` method won't get called again until all the data you enqueued is used up, so it will mostly work.

Depending on what your consumer is
```js
await Promise.resolve();
```
may not be sufficient to let backpressure be cleared. It doesn't give the browser the opportunity to handle any events from outside JavaScript. A portable approach that will let the browser get work done is
```js
await new Promise(resolve => setTimeout(resolve, 0));
```
Using a non-zero delay on setTimeout is even better if you can afford to wait a bit before you start to write again, because it will avoid consuming 100% CPU and running down your users' batteries.

In this case, abandoning TransformStream altogether may be the best option. `pipeThrough` can accept any pair of a ReadableStream and WritableStream, and writing the transform yourself will give you maximum control.

```js
function MyTransform() {
  let pullCalled;
  let pullCalledResolver;
  function clearPullCalled() {
    pullCalled = new Promise(resolve => { pullCalledResolver = resolve; });
  }
  clearPullCalled();
  let controller;
  let pipePromise;
  let identityTransform = new TransformStream();
  function startPiping() {
    const backpressureSniffingReadable = new ReadableStream({
      start(c) {
        controller = c;
      },
      pull() {
        pullCalledResolver();
      }
    }, { highWaterMark: 0 });
    controller = new AbortController();
    pipePromise = backpressureSniffingReadable.pipeTo(
        identityTransform.writable,
        {signal: controller.signal, preventAbort: true});
  }
  startPiping();
  const readable = identityTransform.readable;
  const writable = new WritableStream({
    async write(chunk, controller) {
      await pullCalled;
      clearPullCalled();

      const foundTextIndex = chunk.indexOf('text to replace');

      if (foundTextIndex === -1) {
        controller.enqueue(chunk);
        return;
      }

      // Enqueue text before the part that needs to be replaced
      controller.enqueue(chunk.slice(0, foundTextIndex));

      // Stop the current pipe.
      controller.signal();
      await pipePromise.catch({} => {});

      await streamForReplacement.pipeTo(identityTransform.writable,
                                        { preventClose: true });

      // Restart our pipe.
      startPiping();
    },

    close() {
      controller.close();
    },

    abort(err) {
      controller.error(err);
    }
  });
  return {readable, writable};
}
```
This is all untested but hopefully you get the idea.

I assume you know that if `'text to replace'` is split between two chunks this won't work. If you need to handle that, see https://streams.spec.whatwg.org/#example-ts-lipfuzz for an example.

-- 
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/1323#issuecomment-2306863082
You are receiving this because you are subscribed to this thread.

Message ID: <whatwg/streams/issues/1323/2306863082@github.com>

Received on Friday, 23 August 2024 11:10:48 UTC