Re: [whatwg/streams] Utility buffer class for stream transformers (#1097)

> It looks like there's a lot of plumbing going on to turn the push-style `transform()` method into a pull-based `read()` method though (using an intermediate buffer). This is tricky to get right, especially if you need to consider backpressure. I'll have a think about it, see if there's a better way.

I think this might be a slightly better solution, still no idea how to consider backpressure though.

### PullTransformStream
```typescript
export class PullTransformStream {
  constructor(transform) {
    let buffer = new Buffer();

    this.readable = new ReadableStream({
      async pull(controller){
        let arr = await transform(buffer);
        if(arr === undefined) controller.close();
        else controller.enqueue(arr);
      },
      cancel(){
        this.writable.close();
      }
    });

    this.writable = new WritableStream({
      write(chunk){
        chunk.forEach(c => buffer.push(c));
        buffer.resolve();
      },
      close(){
        buffer.resolve();
      },
      abort(){
        buffer.reject();
      }
    });
  }
}
```

```typescript
export class Buffer extends Array {
  constructor(){
    this.resolve = () => {};
    this.reject = () => {};
  }

  async has(length){
    if(this.length > length) return true;

    await new Promise((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });

    return this.length > length;
  }
}
```

### Example
```typescript
export default class PreprocessTransformStream extends PullTransformStream {
  constructor(){
    super(async buffer => {
      if(!await buffer.has(1)) return;

      let c = buffer.shift();

      //replace form feed code point with a line feed
      if(c === "\u000C") return ["\u000A"];

      //replace any carriage return code points...
      if(c === "\u000D"){
        //...or pairs of carriage returns followed by a line feed...
        await buffer.has(2);
        if(buffer[0] === "\u000D" && buffer[1] === "\u000A"){
          buffer.splice(0, 2);
        }
        //...with a single line feed
        return ["\u000A"];
      }
      
      //replace any null or surrogate code points with a replacement character
      if(c === "\u0000" || (c >= "\uD800" && c <= "\uDFFF")){
        return ["\uFFFD"];
      }
      
      return [c];
    });
  }
}
```

> I think it's a bit too early to tell whether this will be useful enough to make it part of the platform. I suggest you start off by first writing it as a userland JavaScript library. 🙂

and sorry, I meant more of an example. I see many people doing my original trail of thought which is something along the line of
```typescript
class Transformation {
  readable: ReadableStream;

  constructor(readable: ReadableStream, writable: WritableStream){
    this.readable = readable;
    let writer = writable.getWriter();
    let token: Token;
    while(token = await this.consumeToken(readable)){
      writer.write([token]);
    }
  }

  async consumeToken(){
    this.consumeWhitespace();

    let [s1, s2] = this.readable.tee();
    let reader = s1.getReader();
    let {done, value} = await reader.read();

    if(done) return;

    switch(value){
      ...
    }
  }

  async consumeWhitespace(){
    let [s1, s2] = this.readable.tee();
    let reader = s1.getReader();
    let {done, value} = await reader.read();
    if(done || !isWhitespace(value)){
      this.readable = s2;
    } else {
      this.readable = s1;
      this.consumeWhitespace();
    }
  }

  ...
}
```


-- 
You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/1097#issuecomment-748553484

Received on Sunday, 20 December 2020 02:22:19 UTC