- From: Doctor <notifications@github.com>
- Date: Thu, 07 Aug 2025 21:01:06 -0700
- To: whatwg/streams <streams@noreply.github.com>
- Cc: Subscribed <subscribed@noreply.github.com>
- Message-ID: <whatwg/streams/issues/1350@github.com>
BlackAsLight created an issue (whatwg/streams#1350)

### What problem are you trying to solve?

The chunks of a stream can contain any type of value, and some of these values need a specific method called on them to be cleaned up properly. When a ReadableStream or a WritableStream is terminated prematurely, the underlying source's `cancel` method (or the underlying sink's `abort` method) is called so that it can clean up any resources it holds. BUT, this is insufficient when the chunks themselves hold resources that need to be freed manually.

For example, a readable stream may produce chunks that are objects carrying further readable streams. If these chunks are sitting in the internal queue, waiting for the next `pull` call, at the moment `cancel` is called, then the readable streams on those chunks never have their `cancel` methods called.

Below is an example of this, where three chunks have been enqueued, but only the first one gets cleaned up. What happens to the other two? We don't know.

```ts
function createSource(i: number): ReadableStream<Uint8Array> {
  return new ReadableStream({
    start(_controller) {
      console.log(`${i} Started!`);
    },
    pull(controller) {
      controller.enqueue(new Uint8Array(100));
    },
    cancel() {
      console.log(`${i} Cancelled!`);
    },
  });
}

type Input = { path: string; readable: ReadableStream<Uint8Array> };

ReadableStream.from<Input>([
  { path: "potato", readable: createSource(1) },
  { path: "cake", readable: createSource(2) },
  { path: "carrot", readable: createSource(3) },
])
  .pipeThrough(
    new TransformStream<Input, unknown>({
      transform(chunk, controller) {
        const reader = chunk.readable.getReader();
        try {
          // Has an error while processing.
          throw "STOP";
        } catch (e) {
          reader.cancel(e);
          controller.error(e);
        }
      },
    }),
  )
  .pipeTo(new WritableStream())
  .catch(() => {});
```

### What solutions exist today?
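A workaround that is possible with the current API can be sketched roughly as follows. This is only a minimal sketch, not a recommended implementation: `processAll`, `handle`, `demo`, and the `log` array are hypothetical names introduced for illustration, and it assumes that `handle` does not leave the chunk's readable locked when it throws (otherwise `cancel` would reject). Instead of erroring the stream on the first failure, the consumer remembers the failure and keeps reading, cancelling the nested readable of every chunk that arrives afterwards.

```ts
type Input = { path: string; readable: ReadableStream<Uint8Array> };

// Nested source that records its path in `log` when it is cancelled.
function createSource(path: string, log: string[]): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    pull(controller) {
      controller.enqueue(new Uint8Array(100));
    },
    cancel() {
      log.push(path);
    },
  });
}

// Hypothetical helper: reads every chunk from `source`. After the first
// failure it stops calling `handle` and only cancels each chunk's readable.
async function processAll(
  source: ReadableStream<Input>,
  handle: (chunk: Input) => Promise<void>,
): Promise<void> {
  const reader = source.getReader();
  let failure: { reason: unknown } | undefined;
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) break;
      const chunk = result.value;
      if (failure) {
        // Already failed: release the chunk's resource without processing it.
        await chunk.readable.cancel(failure.reason);
        continue;
      }
      try {
        await handle(chunk);
      } catch (reason) {
        failure = { reason };
        // Assumes `handle` left the readable unlocked before throwing.
        await chunk.readable.cancel(reason);
      }
    }
  } finally {
    reader.releaseLock();
  }
  if (failure) throw failure.reason;
}

// Demonstration: the handler fails on the first chunk, yet every nested
// readable ends up cancelled. Resolves to the paths that were cancelled.
async function demo(): Promise<string[]> {
  const log: string[] = [];
  const source = new ReadableStream<Input>({
    start(controller) {
      for (const path of ["potato", "cake", "carrot"]) {
        controller.enqueue({ path, readable: createSource(path, log) });
      }
      controller.close();
    },
  });
  await processAll(source, async () => {
    throw "STOP";
  }).catch(() => {});
  return log;
}
```

Note that this drains the whole upstream, so it only terminates when the source is finite, and it has to be written by hand for every consumer.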
The only reliable way to clean up these resources today is to catch the error in the transform step, so that it doesn't propagate to the upstream readable, and then keep asking upstream for each remaining chunk so that its readable can be cancelled manually. This is necessary because we don't know how many chunks are in the queue. In the above example all three sources are in the queue, but realistically we don't know how many are queued or how many more chunks are still to come.

### How would you solve it?

The easiest way to solve it would be to pass the internal queue to `cancel` upon cancellation, so that we can also clean up any resources that exist in it.

```ts
{
  // Proposed signature: `internalQueue` is the suggested new argument.
  async cancel(reason: any, internalQueue: Input[]): Promise<void> {
    for (const chunk of internalQueue) {
      await chunk.readable.cancel(reason);
    }
  }
}
```

### Anything else?

TransformStreams don't have a `cancel` method, so if their transform step holds resources that need releasing, one can't use the `TransformStream` constructor to create the transform stream and must instead stitch a WritableStream and a ReadableStream together. Although, in my experience, one would still use a `TransformStream` internally, merely to convert a writable into a readable.

-- 
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/1350
Received on Friday, 8 August 2025 04:01:10 UTC