[whatwg/streams] Proposal: ReadableStream tee() backpressure (Issue #1235)

The current definition of the `ReadableStreamTee` abstract operation has an inherent flaw in its backpressure signaling: specifically, there isn't any. Whether `stream.[[controller]]` is a `ReadableByteStreamController` or a `ReadableStreamDefaultController`, the problem is the same, so I'll describe it in general terms.

## Current behavior

We start with an original `ReadableStream` `RS0`. When `tee()` is called on `RS0`, a single `ReadableStreamDefaultReader` `R0` is created and `RS0` is marked as locked. That default reader `R0` is then shared by two new `ReadableStream`s, `RS1` and `RS2`.

When either `RS1` or `RS2` pulls from its underlying source, the shared reader `R0` initiates a read on `RS0`. When that read completes, the resulting data, if any, is enqueued to both `RS1` and `RS2`. Unfortunately, if one of the branches has no active reader, or is being read at a significantly slower rate than the other, the enqueued data accumulates in the unread branch's internal queue without any backpressure signaling.

```mermaid
flowchart TD;
  R1 --> U1[[Data flows]]
  RS2 --> U2[(Data accumulates)]
  U2 -- no backpressure --x RS2
  RS1 --> R1
  R0 --> RS1
  R0 --> RS2
  RS0 --> R0
```

While the origin stream `RS0` may implement backpressure, that backpressure is relieved even when only one of the two child branches is calling `read()`. From `RS0`'s perspective, data is flowing because something is calling `read()`, even though the data is accumulating in the unconsumed branch downstream.
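The effect is observable with today's API. The sketch below assumes a runtime that exposes the standard global `ReadableStream` (for example Node.js 18+ or a current browser); the function name `readOnlyOneBranch` is illustrative. It reads from one tee branch only, while the source keeps being pulled and the other branch silently buffers every chunk.

```javascript
// Demonstrates current tee() behavior with the standard Streams API.
async function readOnlyOneBranch(chunksToRead) {
  let pulls = 0;
  // highWaterMark 0: the source is only pulled when someone is waiting to read.
  const source = new ReadableStream({
    pull(controller) {
      controller.enqueue(pulls++);
    },
  }, { highWaterMark: 0 });

  const [fast, slow] = source.tee();
  const fastReader = fast.getReader();
  const fastValues = [];
  for (let i = 0; i < chunksToRead; i++) {
    const { value } = await fastReader.read();
    fastValues.push(value);
  }
  // Nothing has read `slow`, yet every chunk delivered to `fast` was also
  // enqueued into `slow`'s internal queue, and the source was never throttled.
  return { pulls, fastValues, slow };
}
```

Reading `slow` afterwards yields chunks `0` through `chunksToRead - 1` straight from its internal queue: they were buffered the whole time, and nothing told the source to slow down.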

For browser-based uses of `ReadableStream`, this is often acceptable: the number of streams is limited, the impact is confined to a single user's environment, and the memory accumulation has low impact. For server and edge environments, particularly multi-tenant ones, the impact is much more significant at scale. We can and should do better.

## Requirements

First, we want a way to introduce `tee()` backpressure without any observable breaking changes to the public API of the streams specification. The solution does, however, require changes to the streams internals in several important ways, described below. Importantly, the observable behavior of all current uses of `tee()` remains exactly as it is today.

Second, we want a mechanism that allows for reliably signaling backpressure up through an entire tree of tee branches so that buffering at each step can be minimized or avoided entirely. Specifically, backpressure must be controlled by the branch that is consuming data the slowest as opposed to the branch consuming data the fastest.

Third, our solution must not introduce any changes to the underlying source API.

## `ReadableStreamNonExclusiveReader`

There are currently two kinds of readers: `ReadableStreamDefaultReader` and `ReadableStreamBYOBReader`. For the sake of this discussion, we are going to generically refer to these as "exclusive readers".

There is a one-to-one relationship between a `ReadableStream` and an exclusive reader. When a user calls `getReader()` or uses a reader constructor, the `ReadableStream` is locked to that one reader instance, and there is no other way to consume the `ReadableStream` while it is locked. Subsequent attempts to get a reader will fail, as will subsequent attempts to pipe or tee the `ReadableStream`. This property is precisely why we run into the buffering issue we are seeking to address here.

This proposal seeks to add a new kind of non-exclusive reader to the streams standard: `ReadableStreamNonExclusiveReader` (we will call this an N-E-R reader for short). As the name suggests, `ReadableStream` would be modified to allow *multiple* N-E-R readers to be attached to it simultaneously.

In general, the locking mechanism of a `ReadableStream` does not change much under this proposal. If a readable is piped, attached to an exclusive reader, or tee'd, the `ReadableStream` is locked so that it cannot be otherwise consumed (exactly as it is now). When an N-E-R reader is attached, the stream is still marked as locked so that it cannot subsequently be piped, tee'd, or have an exclusive reader attached to it, *but* it is still possible to attach additional N-E-R readers to the same `ReadableStream`. So long as at least one N-E-R reader is attached, the readable remains locked. Once all N-E-R readers are released, the lock on the readable is released.

```mermaid
flowchart TD;
  A[ReadableStream] -- locked --> B[pipeTo]
  C[ReadableStream] -- locked --> D[tee]
  E[ReadableStream] -- locked --> F[exclusive reader]
  G[ReadableStream] -- locked --> H1[N-E-R reader]
  G[ReadableStream] -- locked --> H2[N-E-R reader]
```
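These locking rules reduce to a small amount of state. The following is a hypothetical model, not spec text: `StreamLock` and its method names are illustrative, and throwing a `TypeError` mirrors what the current standard does when `getReader()` is called on a locked stream.

```javascript
// Hypothetical model of the proposed lock bookkeeping (illustrative names).
class StreamLock {
  constructor() {
    this.exclusive = false;     // held by pipeTo(), tee(), or an exclusive reader
    this.nonExclusiveCount = 0; // number of attached N-E-R readers
  }

  get locked() {
    return this.exclusive || this.nonExclusiveCount > 0;
  }

  // pipeTo(), tee(), getReader(), getReader({ mode: 'byob' })
  acquireExclusive() {
    if (this.locked) throw new TypeError('ReadableStream is locked');
    this.exclusive = true;
  }

  releaseExclusive() {
    this.exclusive = false;
  }

  // Proposed getReader({ mode: 'non-exclusive' }): only an exclusive lock blocks it.
  acquireNonExclusive() {
    if (this.exclusive) throw new TypeError('ReadableStream is exclusively locked');
    this.nonExclusiveCount += 1;
  }

  // The stream unlocks only when the last N-E-R reader is released.
  releaseNonExclusive() {
    if (this.nonExclusiveCount === 0) throw new TypeError('No N-E-R reader attached');
    this.nonExclusiveCount -= 1;
  }
}
```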

The addition of an N-E-R reader changes the internal behavior of the `ReadableStream` queue in a couple of critical ways.

Currently, when a controller enqueues data and there is no pending read operation, the chunk is pushed directly into the readable's internal queue, and the `desiredSize` reported by the controller is decremented by the chunk's calculated size. If there is a pending read operation when the controller enqueues the data, and the internal queue is empty, the chunk immediately fulfills the read and is never added to the internal queue. In that case, `desiredSize` is never updated because no backpressure is applied.
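For comparison with the multi-reader rules that follow, here is a hypothetical model of today's single-reader behavior for a default controller. `ExclusiveQueue` is an illustrative name; `highWaterMark` and the per-chunk `size` function play the same roles as in a queuing strategy.

```javascript
// Hypothetical model of a default controller's queue with one exclusive reader.
class ExclusiveQueue {
  constructor(highWaterMark = 1, size = () => 1) {
    this.highWaterMark = highWaterMark;
    this.size = size;
    this.queue = [];        // buffered { chunk, size } records, FIFO
    this.queueTotalSize = 0;
    this.pendingReads = []; // resolvers; only non-empty while the queue is empty
  }

  get desiredSize() {
    return this.highWaterMark - this.queueTotalSize;
  }

  enqueue(chunk) {
    if (this.pendingReads.length > 0 && this.queue.length === 0) {
      // Fulfill the waiting read directly: the chunk never enters the queue,
      // so desiredSize is left unchanged.
      this.pendingReads.shift()(chunk);
      return;
    }
    const size = this.size(chunk);
    this.queue.push({ chunk, size });
    this.queueTotalSize += size; // desiredSize drops by the chunk's size
  }

  read() {
    if (this.queue.length > 0) {
      const { chunk, size } = this.queue.shift();
      this.queueTotalSize -= size; // dequeuing relieves backpressure
      return Promise.resolve(chunk);
    }
    return new Promise((resolve) => this.pendingReads.push(resolve));
  }
}
```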

When multiple N-E-R readers are attached to a readable and the controller enqueues data while none of the attached readers has a pending read, the chunk is pushed into the queue and `desiredSize` is decremented. If some, but not all, of the attached N-E-R readers have pending reads, the chunk is used to fulfill those pending reads *and* is added to the internal queue to await the remaining N-E-R readers; `desiredSize` is decremented by the chunk's calculated size. If all N-E-R readers have pending reads and the queue is empty, the chunk is never added to the internal queue.

While this requires more complicated bookkeeping, the design ensures that backpressure (signaled by `desiredSize`) is relieved only at the pace of the slowest attached N-E-R reader, while other readers can continue consuming data as quickly as it becomes available. It remains up to the underlying source implementation to decide whether or not to pay attention to the backpressure signal.

The mechanism supports both default and BYOB reads, although BYOB reads are certainly more complicated with an N-E-R reader. When an N-E-R reader performs a BYOB read and provides a buffer for the underlying source to fill, once the controller's `byobRequest` is completed (by calling either `respond()` or `respondWithNewView()`), the data provided is copied into the internal queue of the `ReadableStream`, where it awaits reads from the remaining N-E-R readers. For those readers, BYOB reads are treated the same as BYOB reads fulfilled from the queue. A faster reader can continue to submit new BYOB reads, which are passed on to the controller to fulfill if it chooses. The `desiredSize` signal still reflects the backpressure status properly, and it is up to the underlying source implementation to decide whether and how to handle backpressure.
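The copy step is what lets one reader's BYOB buffer serve the others. A hypothetical sketch, assuming byte chunks are stored in the shared queue as `Uint8Array` copies and each lagging reader tracks its own `{ index, offset }` cursor; `onByobRespond` and `fillFromQueue` are illustrative names, not proposed API. Freeing a chunk once every cursor has passed it is left out here.

```javascript
// Hypothetical: called once the controller's byobRequest has been answered.
// The bytes are copied because the fast reader owns `view`'s buffer.
function onByobRespond(view, bytesWritten, sharedQueue) {
  const filled = new Uint8Array(view.buffer, view.byteOffset, bytesWritten);
  sharedQueue.push(filled.slice()); // slice() makes an independent copy
}

// Hypothetical: a lagging reader's BYOB read is fulfilled from the queue,
// starting at its own cursor, without removing data other readers still need.
function fillFromQueue(sharedQueue, cursor, targetView) {
  const target = new Uint8Array(targetView.buffer, targetView.byteOffset, targetView.byteLength);
  let filled = 0;
  while (filled < target.length && cursor.index < sharedQueue.length) {
    const chunk = sharedQueue[cursor.index];
    const n = Math.min(chunk.length - cursor.offset, target.length - filled);
    target.set(chunk.subarray(cursor.offset, cursor.offset + n), filled);
    filled += n;
    cursor.offset += n;
    if (cursor.offset === chunk.length) {
      cursor.index += 1; // this reader has consumed the whole chunk
      cursor.offset = 0;
    }
  }
  return filled; // number of bytes written into targetView
}
```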

### Reading Modes

The `ReadableStreamNonExclusiveReader` adapts to the type of the underlying controller. If the controller is a `ReadableStreamDefaultController`, the N-E-R reader's `read()` method accepts no arguments and reads using the same semantics as the `ReadableStreamDefaultReader`. If the controller is a `ReadableByteStreamController`, the N-E-R reader's `read()` method accepts a `TypedArray` to be filled using the BYOB semantics.

### Queue bookkeeping

Each type of controller maintains an internal queue. When data is enqueued in the controller while there are no pending reads, that data is appended to the end of the queue. Reads that occur while there is data in the queue remove that data from the queue on a first-in-first-out basis. When data is enqueued while there is a pending read, the read is fulfilled directly and the data is never added to the queue.

With N-E-R readers, this accounting changes. Each N-E-R reader maintains its own cursor position in the queue, and data may be freed from the queue once all cursors have advanced beyond that data's position. If there is currently data in the queue, if any attached N-E-R reader's cursor is not at the end of the queue, or if at least one attached N-E-R reader does not have a pending read request, then data pushed into the controller is appended to the end of the queue. If all cursors are positioned at the end of the queue, all N-E-R readers have pending reads, and the queue is empty, then the data is never added to the queue.
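These cursor rules can be modeled directly. This is a hypothetical sketch for a default controller only (no BYOB), assuming each reader has at most one outstanding read at a time; `SharedQueue`, `attach()`, `release()` and the reader record shape are illustrative names.

```javascript
// Hypothetical model of the proposed multi-cursor queue (default controller).
// Cursors are absolute chunk indices into the stream's history.
class SharedQueue {
  constructor(highWaterMark = 1, size = () => 1) {
    this.highWaterMark = highWaterMark;
    this.size = size;
    this.entries = [];  // buffered { chunk, size } records
    this.base = 0;      // absolute index of entries[0]
    this.totalSize = 0; // size of everything still held for the slowest reader
    this.readers = new Set();
  }

  get desiredSize() { return this.highWaterMark - this.totalSize; }
  get end() { return this.base + this.entries.length; }

  attach() {
    const reader = { cursor: this.end, pending: null };
    this.readers.add(reader);
    return reader;
  }

  release(reader) {
    this.readers.delete(reader); // a real design would also settle reader.pending
    this.trim();
  }

  enqueue(chunk) {
    const readers = [...this.readers];
    const waiting = readers.filter((r) => r.pending !== null);
    if (readers.length > 0 && waiting.length === readers.length && this.entries.length === 0) {
      // Every reader is waiting on an empty queue: deliver, never buffer.
      for (const r of waiting) this.deliver(r, chunk);
      return;
    }
    const size = this.size(chunk);
    this.entries.push({ chunk, size });
    this.totalSize += size;
    // Readers with a pending read sit at the old end, so this chunk is theirs.
    for (const r of waiting) {
      r.cursor += 1;
      this.deliver(r, chunk);
    }
    this.trim();
  }

  read(reader) {
    if (reader.cursor < this.end) {
      const { chunk } = this.entries[reader.cursor - this.base];
      reader.cursor += 1;
      this.trim();
      return Promise.resolve(chunk);
    }
    return new Promise((resolve) => { reader.pending = resolve; });
  }

  deliver(reader, chunk) {
    const resolve = reader.pending;
    reader.pending = null;
    resolve(chunk);
  }

  // Free chunks that every attached reader has already consumed.
  trim() {
    if (this.readers.size === 0) return;
    const minCursor = Math.min(...[...this.readers].map((r) => r.cursor));
    while (this.base < minCursor) {
      this.totalSize -= this.entries.shift().size;
      this.base += 1;
    }
  }
}
```

Because `desiredSize` is computed from everything still held for the slowest cursor, a fast reader draining ahead of the others never relieves backpressure by itself.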

### Acquiring a non-exclusive reader

The N-E-R reader would be used internally in the implementation of `tee()` (see the Redefining `tee()` section that follows), but it will also be possible to acquire one directly, using either a constructor or the `getReader()` method.

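```js
// Proposed API: the 'non-exclusive' mode and ReadableStreamNonExclusiveReader
// are part of this proposal and do not exist in the current standard.
const readable = new ReadableStream();
const reader1 = readable.getReader({ mode: 'non-exclusive' });
console.log(readable.locked); // true
const reader2 = readable.getReader({ mode: 'non-exclusive' }); // Succeeds
const reader3 = readable.getReader(); // Fails: exclusive readers are rejected while locked

const reader4 = new ReadableStreamNonExclusiveReader(readable); // Succeeds
const reader5 = new ReadableStreamDefaultReader(readable); // Fails: the stream is locked
```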

## Redefining `tee()`

The `ReadableStream` `tee()` operation is modified such that each branch of the tee uses its own N-E-R reader as opposed to sharing a single exclusive reader.

```mermaid
flowchart TD;
  RS0 --> U2[(Data accumulates)]
  U2 -- signal backpressure --> RS0
  R3 --> U[[Data flows]]
  RS1 --> R3
  R1 --> RS1
  R2 --> RS2
  RS0 --> R1[[N-E-R 1]]
  RS0 --> R2[[N-E-R 2]]
```
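User code cannot reach the `tee()` internals this proposal changes, but the intended pacing can be approximated in userland. The sketch below is an approximation, not the proposed mechanism: each branch still has its own queue (bounded by `highWaterMark`), and the source is read only when every branch that has not been canceled reports `desiredSize > 0`. It assumes a runtime with the standard global `ReadableStream`; `teeWithBackpressure` is a hypothetical name.

```javascript
// Hypothetical userland approximation: read from `source` only when every
// non-canceled branch wants more data, so the slowest branch sets the pace.
function teeWithBackpressure(source, highWaterMark = 1) {
  const reader = source.getReader();
  const controllers = [];
  const canceled = [false, false];
  let reading = false;
  let readAgain = false;

  // desiredSize is 0 for a closed branch and null for an errored one.
  const everyBranchWantsData = () =>
    controllers.every((c, i) => canceled[i] || c.desiredSize > 0);

  function pullFromSource() {
    if (reading) {
      readAgain = true; // re-check once the in-flight read settles
      return;
    }
    if (!everyBranchWantsData()) return; // wait for the slowest branch to read
    reading = true;
    readAgain = false;
    reader.read().then(({ value, done }) => {
      reading = false;
      controllers.forEach((c, i) => {
        if (canceled[i]) return;
        if (done) c.close();
        else c.enqueue(value); // same chunk object in both branches (no cloning)
      });
      if (!done && readAgain) pullFromSource();
    }, (err) => {
      reading = false;
      controllers.forEach((c, i) => {
        if (!canceled[i]) c.error(err);
      });
    });
  }

  const makeBranch = (i) => new ReadableStream({
    start(controller) {
      controllers[i] = controller;
    },
    pull: pullFromSource,
    cancel(reason) {
      canceled[i] = true;
      if (canceled.every(Boolean)) return reader.cancel(reason);
      pullFromSource(); // the remaining branch may no longer be blocked
      return undefined;
    },
  }, { highWaterMark });

  return [makeBranch(0), makeBranch(1)];
}
```

With this approximation the fast branch is hard-blocked until the slow branch catches up. In the proposal, backpressure stays a signal: a source that ignores `desiredSize` can keep producing, and faster N-E-R readers keep receiving its data.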

As a further optimization, secondary to backpressure, if the child `ReadableStream`s (`RS1` and `RS2` in the diagram above) are themselves tee'd, the readers for the new branches are simply attached as additional N-E-R readers to the original `ReadableStream` `RS0`.

```mermaid
flowchart TD;
  R1 --> RS1
  R2 -.-> RS2
  RS2 -.-> R3[[N-E-R 3]]
  RS2 -.-> R4[[N-E-R 4]]
  RS0 --> R1[[N-E-R 1]]
  RS0 -.-> R2[[N-E-R 2]]
  RS0 --> R3[[N-E-R 3]]
  RS0 --> R4[[N-E-R 4]]
```

(In the diagram above, `RS2` and `N-E-R 2` can each be safely garbage collected, as they are no longer an active part of the tree.)
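The flattening step can be modeled with reader bookkeeping alone. This is a hypothetical sketch: `RootStream`, `Branch`, and the `end`/`cursor` fields are illustrative, and one detail is an assumption not stated in the proposal, namely that the two new readers start at the cursor position of the reader they replace, so anything already buffered for `RS2` stays available to both.

```javascript
// Hypothetical model of tee() flattening; only reader bookkeeping is modeled.
class RootStream {
  constructor() {
    this.readers = new Set(); // live N-E-R readers, one per leaf branch
    this.end = 0;             // absolute index of the next chunk to be enqueued
    this.teed = false;
  }

  tee() {
    if (this.teed) throw new TypeError('ReadableStream is locked');
    this.teed = true;
    // New branches start reading at the current end of the queue.
    return [new Branch(this, this.end), new Branch(this, this.end)];
  }
}

class Branch {
  constructor(root, cursor) {
    this.root = root;     // always the original stream (RS0), never a branch
    this.cursor = cursor; // this branch's N-E-R reader position in root's queue
    root.readers.add(this);
  }

  tee() {
    if (!this.root.readers.has(this)) throw new TypeError('Branch is locked');
    // Flatten: swap this branch's reader for two readers attached directly
    // to the root; both inherit this branch's cursor (assumption).
    this.root.readers.delete(this);
    return [new Branch(this.root, this.cursor), new Branch(this.root, this.cursor)];
  }
}
```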

With this design, data is buffered in only one location within the tree, with backpressure signaling that is relieved only at the pace of the slowest reading branch. Faster branches can continue to consume data as quickly as the underlying source makes it available. As mentioned, it remains up to the underlying source whether or not to pay attention to the backpressure signal.

The design has no observable API impact on existing `tee()` uses and no observable impact on underlying source implementations. The way backpressure is signaled to underlying sources does not change, and existing methods of consuming the readable are unchanged. Holding an N-E-R reader reveals no information about the presence or state of other attached N-E-R readers, beyond what can be inferred from observable memory usage.

https://github.com/whatwg/streams/issues/1235

Received on Monday, 13 June 2022 14:38:23 UTC