[whatwg/streams] 'Splitter' stream? (#1030)

I'm needing a 'splitter' WritableStream that divvies input based on some criteria, sequentially generating multiple output ReadableStreams, each created by a factory function that gets called when the input calls for it. 

The specific scenario is a log file splitter, which splits a readable based on an input pattern such that anytime the pattern is seen, subsequent chunks are available from a new ReadableStream that after some transforms ultimately pipesTo a writable file stream (output1.log, output2.log, etc.).

A usage example of the API I had in mind looks something like this: 

```
let counter = 0;
readable.pipeTo(new WritableStream(new StreamSplitter(
  (chunk: string) => { return string.includes(thePattern); },
  () => {
    counter++;
    return new ReadableStream()
        .pipeThrough(createMyRemainingTransforms())
        .pipeTo(createMyWritableFileStream(counter);
  }
)));
```

First, is there a better approach to this? And second, I'm scratching my head implementing StreamSplitter. My thought was to create it something like this, but I'm stuck on how to create a writable/readable pair such that writes to a writer are automatically forwarded to the readable:

```
export class StreamSplitter implements UnderlyingSink {
    
    private _writable: WritableStream;

    constructor(readonly shouldSplit: (chunk:any) => boolean, readonly createReadable: () => ReadableStream)
    {
    }
    
    private createNewReadable() {
        if (this._writable) {
            this._writable.getWriter().close();
        }
       const readable = this.createReadable();
        // ???
        // How do we create a writable and readable pair
        // such that every write to the writable goes to the readable?

    }

    start(controller: any) {
        this.createNewReadable();
    }

    write(chunk: any, controller: any) {
        if (this.shouldSplit(chunk)) {
            this.createNewReadable();
        }
        this._writable.getWriter().write(chunk);
    }

    close() {
        this._writable.getWriter().close();
    }

    abort(error: any) {
        this._writable.getWriter().abort();
    }
}
```

Any thoughts? Love the piping metaphor, but still very confused.

-- 
You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/1030

Received on Sunday, 8 March 2020 16:30:21 UTC