- From: Morgan Barrett <notifications@github.com>
- Date: Sat, 19 Dec 2020 18:22:06 -0800
- To: whatwg/streams <streams@noreply.github.com>
- Cc: Subscribed <subscribed@noreply.github.com>
- Message-ID: <whatwg/streams/issues/1097/748553484@github.com>
> It looks like there's a lot of plumbing going on to turn the push-style `transform()` method into a pull-based `read()` method though (using an intermediate buffer). This is tricky to get right, especially if you need to consider backpressure. I'll have a think about it, see if there's a better way.
I think this might be a slightly better solution, still no idea how to consider backpressure though.
### PullTransformStream
```typescript
export class PullTransformStream {
  constructor(transform) {
    let buffer = new Buffer();
    this.readable = new ReadableStream({
      async pull(controller){
        let arr = await transform(buffer);
        if(arr === undefined) controller.close();
        else controller.enqueue(arr);
      },
      cancel(){
        this.writable.close();
      }
    });
    this.writable = new WritableStream({
      write(chunk){
        chunk.forEach(c => buffer.push(c));
        buffer.resolve();
      },
      close(){
        buffer.resolve();
      },
      abort(){
        buffer.reject();
      }
    });
  }
}
```
```typescript
export class Buffer extends Array {
  constructor(){
    this.resolve = () => {};
    this.reject = () => {};
  }
  async has(length){
    if(this.length > length) return true;
    await new Promise((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
    return this.length > length;
  }
}
```
### Example
```typescript
export default class PreprocessTransformStream extends PullTransformStream {
  constructor(){
    super(async buffer => {
      if(!await buffer.has(1)) return;
      let c = buffer.shift();
      //replace form feed code point with a line feed
      if(c === "\u000C") return ["\u000A"];
      //replace any carriage return code points...
      if(c === "\u000D"){
        //...or pairs of carriage returns followed by a line feed...
        await buffer.has(2);
        if(buffer[0] === "\u000D" && buffer[1] === "\u000A"){
          buffer.splice(0, 2);
        }
        //...with a single line feed
        return ["\u000A"];
      }
      
      //replace any null or surrogate code points with a replacement character
      if(c === "\u0000" || (c >= "\uD800" && c <= "\uDFFF")){
        return ["\uFFFD"];
      }
      
      return [c];
    });
  }
}
```
> I think it's a bit too early to tell whether this will be useful enough to make it part of the platform. I suggest you start off by first writing it as a userland JavaScript library. 🙂
and sorry, I meant more of an example. I see many people doing my original trail of thought which is something along the line of
```typescript
class Transformation {
  readable: ReadableStream;
  constructor(readable: ReadableStream, writable: WritableStream){
    this.readable = readable;
    let writer = writable.getWriter();
    let token: Token;
    while(token = await this.consumeToken(readable)){
      writer.write([token]);
    }
  }
  async consumeToken(){
    this.consumeWhitespace();
    let [s1, s2] = this.readable.tee();
    let reader = s1.getReader();
    let {done, value} = await reader.read();
    if(done) return;
    switch(value){
      ...
    }
  }
  async consumeWhitespace(){
    let [s1, s2] = this.readable.tee();
    let reader = s1.getReader();
    let {done, value} = await reader.read();
    if(done || !isWhitespace(value)){
      this.readable = s2;
    } else {
      this.readable = s1;
      this.consumeWhitespace();
    }
  }
  ...
}
```
-- 
You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/1097#issuecomment-748553484
Received on Sunday, 20 December 2020 02:22:19 UTC