- From: Vuk <notifications@github.com>
- Date: Mon, 19 Aug 2024 01:07:22 -0700
- To: whatwg/streams <streams@noreply.github.com>
- Cc: Subscribed <subscribed@noreply.github.com>
- Message-ID: <whatwg/streams/issues/1323@github.com>
### What is the issue with the Streams Standard? This is actually more of a question than an issue, so sorry in advance. I want to know how to handle backpressure for the ReadableSide of a transform stream. For example I am trying to implement a replace text functionality, where I replace a string of text coming from a ReadableStream with text coming from a second Readable stream, so far I couldn't find any example for this using whatwg streams. This is pseudo code, so for simplicity I'm pretending like the text to replace won't span chunk boundaries ```javascript class Transformer { async transform(chunk, controller) { const foundTextIndex = chunk.indexOf('text to replace'); // Enqueue text before the part that needs to be replaced controller.enqueue(chunk.slice(0, foundTextIndex)); // Enqueue the text from the second source // this is the part I'm not sure for which how to handle if we are adding // chunks to the readable sides internal queue even if it's full for await (const chunk of streamForReplacement) { controller.enqueue(chunk.slice(0, foundTextIndex)); console.log(chunk); } // Enqueue text after replacement controller.enqueue(chunk.slice(foundTextIndex + 'text to replace'.length)); } } ``` One thing I saw thanks to good old `chatGTP` was to use `Promise.resolve()` to wait for the next event loop tick in case the `desiredSize` was negative. For example ```javascript class Transformer { async transform(chunk, controller) { const foundTextIndex = chunk.indexOf('text to replace'); // Enqueue text before the part that needs to be replaced controller.enqueue(chunk.slice(0, foundTextIndex)); // Enqueue the text from the second source // this is the part I'm not sure for which how to handle if we are adding // chunks to the readable sides internal queue even if it's full for await (const chunk of streamForReplacement) { while (controller.desiredSize <= 0) { await Promise.resolve(); // wait for next tick to check if we can enqueue more chunks } controller.enqueue(chunk.slice(0, foundTextIndex)); } // Enqueue text after replacement controller.enqueue(chunk.slice(foundTextIndex)); } } ``` I'm not sure if doing `Promise.resolve();` is valid, as I couldn't find any example dealing with this problem, and I want to know if there are better alternatives? -- Reply to this email directly or view it on GitHub: https://github.com/whatwg/streams/issues/1323 You are receiving this because you are subscribed to this thread. Message ID: <whatwg/streams/issues/1323@github.com>
Received on Monday, 19 August 2024 08:07:26 UTC