- From: Adam Rice <notifications@github.com>
- Date: Fri, 23 Aug 2024 04:10:44 -0700
- To: whatwg/streams <streams@noreply.github.com>
- Cc: Subscribed <subscribed@noreply.github.com>
- Message-ID: <whatwg/streams/issues/1323/2306863082@github.com>
Something like Stack Overflow is probably a more appropriate forum for this kind of question, but I will try to answer anyway.
The easiest "solution" is to ignore the whole problem and assume that `streamForReplacement` will always be a reasonably small size. Your `transform()` method won't get called again until all the data you enqueued is used up, so it will mostly work.
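As a rough illustration of that approach, the sketch below copies the whole replacement stream into the output from inside `transform()` and pays no attention to backpressure. `createNaiveReplacer` and `makeReplacementStream` are hypothetical names (the latter stands in for however `streamForReplacement` is produced), chunks are assumed to be strings, and only the first match in each chunk is handled.

```js
// Hypothetical helper, not part of the Streams API. Assumes string chunks.
function createNaiveReplacer(textToReplace, makeReplacementStream) {
  return new TransformStream({
    async transform(chunk, controller) {
      const foundTextIndex = chunk.indexOf(textToReplace);
      if (foundTextIndex === -1) {
        controller.enqueue(chunk);
        return;
      }
      // Text before the match goes out unchanged.
      controller.enqueue(chunk.slice(0, foundTextIndex));
      // Copy the replacement in full, ignoring controller.desiredSize.
      const reader = makeReplacementStream().getReader();
      for (;;) {
        const { done, value } = await reader.read();
        if (done) break;
        controller.enqueue(value);
      }
      // Text after the match goes out unchanged too.
      controller.enqueue(chunk.slice(foundTextIndex + textToReplace.length));
    }
  });
}
```

Everything enqueued here sits in the readable side's queue until the consumer reads it, which is why this only makes sense when the replacement is small.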
Depending on what your consumer is
```js
await Promise.resolve();
```
may not be sufficient to let backpressure clear. It only waits for a microtask, so it doesn't give the browser the opportunity to handle any events from outside JavaScript. A portable approach that will let the browser get work done is
```js
await new Promise(resolve => setTimeout(resolve, 0));
```
Using a non-zero delay with `setTimeout()` is even better if you can afford to wait a bit before you start writing again, because it avoids consuming 100% CPU and running down your users' batteries.
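Put together, a polling wait might look something like the sketch below. It assumes the wait is a loop that checks `controller.desiredSize` inside `transform()`; `waitForCapacity` and the 10 ms default are made up for illustration.

```js
// Hypothetical helper: resolves once the controller reports spare capacity.
// desiredSize is null when the readable side has errored, so stop waiting then.
async function waitForCapacity(controller, delayMs = 10) {
  while (controller.desiredSize !== null && controller.desiredSize <= 0) {
    // A timer task (unlike a microtask) lets the browser handle other events.
    await new Promise(resolve => setTimeout(resolve, delayMs));
  }
}
```

Inside `transform()` this would be used as `await waitForCapacity(controller);` before each `controller.enqueue()` of replacement data.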
In this case, abandoning TransformStream altogether may be the best option. `pipeThrough()` accepts any object with a `readable` property holding a ReadableStream and a `writable` property holding a WritableStream, and writing the transform yourself will give you maximum control.
```js
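function MyTransform() {
  // Resolves each time the pipe below asks for a chunk, which means the
  // downstream side has capacity.
  let pullCalled;
  let pullCalledResolver;
  function clearPullCalled() {
    pullCalled = new Promise(resolve => { pullCalledResolver = resolve; });
  }
  clearPullCalled();

  let readableController;  // Controller of backpressureSniffingReadable.
  let abortController;     // Used to stop the current pipe.
  let pipePromise;
  const identityTransform = new TransformStream();

  function startPiping() {
    const backpressureSniffingReadable = new ReadableStream({
      start(c) {
        readableController = c;
      },
      pull() {
        pullCalledResolver();
      }
    }, { highWaterMark: 0 });
    abortController = new AbortController();
    pipePromise = backpressureSniffingReadable.pipeTo(
        identityTransform.writable,
        { signal: abortController.signal, preventAbort: true });
  }
  startPiping();

  const readable = identityTransform.readable;
  const writable = new WritableStream({
    async write(chunk) {
      await pullCalled;
      clearPullCalled();
      const textToReplace = 'text to replace';
      const foundTextIndex = chunk.indexOf(textToReplace);
      if (foundTextIndex === -1) {
        readableController.enqueue(chunk);
        return;
      }
      // Enqueue text before the part that needs to be replaced.
      readableController.enqueue(chunk.slice(0, foundTextIndex));
      // Stop the current pipe. pipeTo() rejects when aborted, so swallow that.
      abortController.abort();
      await pipePromise.catch(() => {});
      await streamForReplacement.pipeTo(identityTransform.writable,
                                        { preventClose: true });
      // Restart our pipe, then pass on the text after the replaced part.
      startPiping();
      readableController.enqueue(
          chunk.slice(foundTextIndex + textToReplace.length));
    },
    close() {
      readableController.close();
    },
    async abort(reason) {
      readableController.error(reason);
      // preventAbort stops the pipe from forwarding the error, so once the
      // pipe has released the writable, abort it by hand.
      await pipePromise.catch(() => {});
      await identityTransform.writable.abort(reason);
    }
  });
  return { readable, writable };
}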
```
This is all untested but hopefully you get the idea.
I assume you know that if `'text to replace'` is split between two chunks this won't work. If you need to handle that, see https://streams.spec.whatwg.org/#example-ts-lipfuzz for an example.
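For a rough idea of what handling split matches involves, here is a minimal sketch that holds back the tail of each chunk in case it is the start of a match. To keep it short it swaps in a fixed replacement string rather than a replacement stream, and it is not the approach from the linked example; `createBoundarySafeReplacer` is a made-up name and chunks are assumed to be strings.

```js
// Hypothetical helper. Replaces every occurrence of `needle` with the string
// `replacement`, including occurrences split across chunk boundaries.
function createBoundarySafeReplacer(needle, replacement) {
  let carry = '';  // Unemitted tail of the input seen so far.
  return new TransformStream({
    transform(chunk, controller) {
      const text = carry + chunk;
      let out = '';
      let pos = 0;
      let index;
      while ((index = text.indexOf(needle, pos)) !== -1) {
        out += text.slice(pos, index) + replacement;
        pos = index + needle.length;
      }
      // A match that continues into the next chunk can start at most
      // needle.length - 1 characters before the end, so hold those back.
      const keep = Math.min(needle.length - 1, text.length - pos);
      out += text.slice(pos, text.length - keep);
      carry = text.slice(text.length - keep);
      if (out !== '') controller.enqueue(out);
    },
    flush(controller) {
      // No more input is coming, so the held-back tail cannot be a match.
      if (carry !== '') controller.enqueue(carry);
    }
  });
}
```

The cost is that up to `needle.length - 1` characters of output are delayed until the next chunk arrives or the stream closes.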
--
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/1323#issuecomment-2306863082
Received on Friday, 23 August 2024 11:10:48 UTC