- From: Vuk <notifications@github.com>
- Date: Mon, 19 Aug 2024 01:07:22 -0700
- To: whatwg/streams <streams@noreply.github.com>
- Cc: Subscribed <subscribed@noreply.github.com>
- Message-ID: <whatwg/streams/issues/1323@github.com>
### What is the issue with the Streams Standard?
This is actually more of a question than an issue, so sorry in advance.
I want to know how to handle backpressure for the ReadableSide of a transform stream.
For example I am trying to implement a replace text functionality, where I replace a string of text coming from a ReadableStream with text coming from a second Readable stream, so far I couldn't find any example for this using whatwg streams.
This is pseudo code, so for simplicity I'm pretending like the text to replace won't span chunk boundaries
```javascript
class Transformer {
async transform(chunk, controller) {
const foundTextIndex = chunk.indexOf('text to replace');
// Enqueue text before the part that needs to be replaced
controller.enqueue(chunk.slice(0, foundTextIndex));
// Enqueue the text from the second source
// this is the part I'm not sure for which how to handle if we are adding
// chunks to the readable sides internal queue even if it's full
for await (const chunk of streamForReplacement) {
controller.enqueue(chunk.slice(0, foundTextIndex));
console.log(chunk);
}
// Enqueue text after replacement
controller.enqueue(chunk.slice(foundTextIndex + 'text to replace'.length));
}
}
```
One thing I saw thanks to good old `chatGTP` was to use `Promise.resolve()` to wait for the next event loop tick in case the `desiredSize` was negative.
For example
```javascript
class Transformer {
async transform(chunk, controller) {
const foundTextIndex = chunk.indexOf('text to replace');
// Enqueue text before the part that needs to be replaced
controller.enqueue(chunk.slice(0, foundTextIndex));
// Enqueue the text from the second source
// this is the part I'm not sure for which how to handle if we are adding
// chunks to the readable sides internal queue even if it's full
for await (const chunk of streamForReplacement) {
while (controller.desiredSize <= 0) {
await Promise.resolve(); // wait for next tick to check if we can enqueue more chunks
}
controller.enqueue(chunk.slice(0, foundTextIndex));
}
// Enqueue text after replacement
controller.enqueue(chunk.slice(foundTextIndex));
}
}
```
I'm not sure if doing `Promise.resolve();` is valid, as I couldn't find any example dealing with this problem, and I want to know if there are better alternatives?
--
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/1323
You are receiving this because you are subscribed to this thread.
Message ID: <whatwg/streams/issues/1323@github.com>
Received on Monday, 19 August 2024 08:07:26 UTC