[whatwg/streams] Performance optimization: `ReadableStreamDefaultReader.prototype.readMany` (Issue #1236)

Some sources enqueue many small chunks, but `ReadableStreamDefaultReader.prototype.read` only returns one chunk at a time. `ReadableStreamDefaultReader.prototype.read` has non-trivial overhead (at least one tick per call).

When the developer knows they want to read everything in the queue in one call, it is faster to ask for all the values directly instead of reading one value at a time. 

I implemented this in [Bun](https://bun.sh). 

This led to a >30% performance improvement when server-side rendering React via `new Response(await renderToReadableStream(reactElement)).arrayBuffer()`.

Before:
![image](https://user-images.githubusercontent.com/709451/173469385-1842abea-060d-4357-b8a0-389a6287d3a6.png)

After: 
![image](https://user-images.githubusercontent.com/709451/173469446-f5c6a944-76f7-429e-806d-cc63c0179f28.png)

Note: these numbers are out of date; Bun's implementation is faster now, but the delta above can be directly attributed to `ReadableStreamDefaultReader.prototype.readMany`.

<details>

<summary>Here is an example implementation that uses JavaScriptCore builtins</summary>

```js
function readMany()
{
    "use strict";

    if (!@isReadableStreamDefaultReader(this))
        @throwTypeError("ReadableStreamDefaultReader.readMany() should not be called directly");

    const stream = @getByIdDirectPrivate(this, "ownerReadableStream");
    if (!stream)
        @throwTypeError("readMany() called on a reader owned by no readable stream");

    const state = @getByIdDirectPrivate(stream, "state");
    @putByIdDirectPrivate(stream, "disturbed", true);
    if (state === @streamClosed)
        return {value: [], size: 0, done: true};
    else if (state === @streamErrored) {
        throw @getByIdDirectPrivate(stream, "storedError");
    }

    
    var controller = @getByIdDirectPrivate(stream, "readableStreamController");

    const content = @getByIdDirectPrivate(controller, "queue").content;
    var size = @getByIdDirectPrivate(controller, "queue").size;
    var values = content.toArray(false);
    var length = values.length;

    if (length > 0) {

        if (@isReadableByteStreamController(controller)) {
            for (var i = 0; i < value.length; i++) {
                const buf = value[i];
                if (!(@ArrayBuffer.@isView(buf) || buf instanceof @ArrayBuffer)) {
                    value[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
                }
            }
        }
        
        @resetQueue(@getByIdDirectPrivate(controller, "queue"));

        if (@getByIdDirectPrivate(controller, "closeRequested"))
            @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
        else if (@isReadableStreamDefaultController(controller)) 
            @readableStreamDefaultControllerCallPullIfNeeded(controller);
        else if (@isReadableByteStreamController(controller))
            @readableByteStreamControllerCallPullIfNeeded(controller);

        return @createFulfilledPromise({value: values, size, done: false});
    }

    var onPullMany = (result) => {
        if (result.done) {
            return @createFulfilledPromise({value: [], size: 0, done: true});
        }
        var controller = @getByIdDirectPrivate(stream, "readableStreamController");
        
        var queue = @getByIdDirectPrivate(controller, "queue");
        var value = [result.value].concat(queue.content.toArray(false));

        if (@isReadableByteStreamController(controller)) {
            for (var i = 0; i < value.length; i++) {
                const buf = value[i];
                if (!(@ArrayBuffer.@isView(buf) || buf instanceof @ArrayBuffer)) {
                    value[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
                }
            }
        }
        var size = queue.size;
        @resetQueue(queue);

        if (@getByIdDirectPrivate(controller, "closeRequested"))
            @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
        else if (@isReadableStreamDefaultController(controller)) 
            @readableStreamDefaultControllerCallPullIfNeeded(controller);
        else if (@isReadableByteStreamController(controller))
            @readableByteStreamControllerCallPullIfNeeded(controller);
        

        
        return @createFulfilledPromise({value: value, size: size, done: false});
    };
    
    var pullResult = controller.@pull(controller);
    if (pullResult && @isPromise(pullResult)) {
        return pullResult.@then(onPullMany);
    }

    return onPullMany(pullResult);
}
```

</details>






-- 
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/1236
You are receiving this because you are subscribed to this thread.

Message ID: <whatwg/streams/issues/1236@github.com>

Received on Tuesday, 14 June 2022 00:48:20 UTC