Re: [streams] rsReader.readBatch(numberOfChunks)? rbsReader.read(view, { waitUntilDone: true })? (#320)

Here's an example of a super simple message queue that runs in io.js:
```javascript
'use strict';
const SlowBuffer = require('buffer').SlowBuffer;
// Total number of push/read round trips the benchmark performs.
const ITER = 1e7;


/**
 * Minimal message queue used by the benchmark below.
 * Messages are buffered in `_messages`; on the empty -> non-empty
 * transition the callback registered via `onreadable()` is invoked
 * (with any extra arguments captured at registration time).
 */
function Inst() {
  this._messages = [];
  this._onreadable = undefined;
  this._onreadable_args = undefined;
}

/**
 * Enqueue `data` and notify the readable listener when the queue
 * transitions from empty to non-empty.
 * @param {*} data - payload to queue (may be undefined).
 */
Inst.prototype.push = function push(data) {
  this._messages.push(data);
  // Notify only on the empty -> non-empty edge, and only when a
  // listener exists (the original threw a TypeError if push() ran
  // before onreadable() had registered a callback).
  if (this._messages.length === 1 && this._onreadable) {
    if (this._onreadable_args)
      this._onreadable.apply(this, this._onreadable_args);
    else
      this._onreadable();
  }
};

/**
 * Dequeue and return the oldest message, or undefined when empty.
 * Uses shift() for FIFO order — the original used pop(), which made
 * this LIFO despite being described as a message queue. (With at most
 * one buffered message, as in the benchmark, the two are equivalent.)
 */
Inst.prototype.read = function read() {
  if (this._messages.length > 0)
    return this._messages.shift();
};

/**
 * Register the readable callback. Arguments after `cb` are saved and
 * forwarded to `cb` via apply() on every notification.
 * @param {Function} cb - invoked when the queue becomes readable.
 */
Inst.prototype.onreadable = function onreadable(cb) {
  this._onreadable = cb;
  if (arguments.length > 1)
    this._onreadable_args = parseArgs(arguments, 1);
};


/* actual benchmark */

var queue = new Inst();
var start = process.hrtime();
var count = 0;

queue.onreadable(function onReadable(tag) {
  // The extra registration argument must arrive on every notification.
  if (tag !== 'foo')
    throw new Error('where is foo?');

  // Stop after ITER round trips and report the timings.
  if (++count > ITER)
    return printTime(process.hrtime(start));

  // Drain the queue, then re-arm it asynchronously on the next tick so
  // each iteration crosses the event loop exactly once.
  this.read();
  var self = this;
  process.nextTick(function refill() {
    self.push(new SlowBuffer(64));
  });
}, 'foo');
queue.push();


/* utility functions */

/**
 * Copy the elements of an array-like, beginning at index `start`,
 * into a new real array.
 * @param {Object} args - array-like (e.g. an `arguments` object).
 * @param {number} start - first index to copy.
 * @returns {Array} the copied tail.
 */
function parseArgs(args, start) {
  var out = [];
  var idx = start;
  while (idx < args.length) {
    out.push(args[idx]);
    idx++;
  }
  return out;
}

/**
 * Print per-operation latency and throughput derived from a
 * process.hrtime() delta.
 * @param {Array} t - [seconds, nanoseconds] pair from process.hrtime().
 */
function printTime(t) {
  var totalNs = t[0] * 1e9 + t[1];
  var totalSec = t[0] + t[1] / 1e9;
  console.log((totalNs / ITER).toFixed(1) + ' ns/op');
  console.log((ITER / totalSec).toFixed(1) + ' op/sec');
}
```
Output:
```
$ iojs --nouse-osr --nouse-inlining bench.js
1451.8 ns/op
688801.2 op/sec
```
A few key points of things intentionally implemented:
* The use of `SlowBuffer()` that forces a JS -> C++ round trip.
* Allocating a fresh `SlowBuffer` per message to place appropriate strain on GC — note the snippet as written uses `SlowBuffer(64)` (64 bytes); raise it to 65536 for a full 64KB, the maximum TCP packet size.
* Force the pushing of new data to be "async" using `process.nextTick()`.
* Emitting the actual `onreadable` callback event so the data can be read.
* Allow extra arguments to be passed to propagate to `onreadable` callback to force the use of `.apply()`.
* Disabled OSR and inlining for the benchmark.

As you can see I am still able to process over 600k messages per second. Performance-wise I'd like to see the final result take no more than 2us per operation. "Good enough" isn't really my thing. IMO the implementation should be no more than a footnote in total measured CPU time.

BTW, if `SlowBuffer()` allocations are taken down to the very small size of 32 bytes performance increases to 2 million ops/sec.

---
Reply to this email directly or view it on GitHub:
https://github.com/whatwg/streams/issues/320#issuecomment-91996256

Received on Sunday, 12 April 2015 06:04:15 UTC