vendredi 27 mars 2015

How do you implement a stream that properly handles backpressure in node.js?

I can't for the life of me figure out how to implement a stream that properly handles backpressure. I have a test script I'm running:



var stream = require('stream')
var util = require('util')

var highWaterMark = 10

var Streamer = function() {
stream.Readable.call(this, {highWaterMark: highWaterMark})
this.count=0
}
util.inherits(Streamer, stream.Readable)
Streamer.prototype._read = function() {
this.startStreamin()
}
Streamer.prototype.startStreamin = function() {
this.pausedYo = false
setTimeout(function() {
if(this.pausedYo) return
// else

this.count++
if(this.count%10 === 0)
var data = "_"
else
var data = "."

if(this.count > 500) {
this.stopStreamin()
this.push(null)
return
}

if(!this.push(data)) this.stopStreamin()
process.stdout.write(data)

this.startStreamin() // keep streamin
}.bind(this),200)
}
Streamer.prototype.stopStreamin = function() {
this.pausedYo = true
}

var StreamPeeker = function(myStream, callback) {
stream.Readable.call(this, {highWaterMark: highWaterMark})
this.stream = myStream
this.reading = true
this.callback = callback

this.stream.on('readable', function() {
//process.stdout.write('reading: '+this.reading)
this.sourceReadable = true
this._readStuff()
}.bind(this))

this.stream.on('end', function() {
this.push(null)
}.bind(this))

this._read()
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
//process.stdout.write('start')
this.reading = true
if(this.sourceReadable) {
this._readStuff()
}
}
StreamPeeker.prototype._readStuff = function() {
if(this.reading) {
var data = this.stream.read()
if(data !== null) {
if(!this.push(data)) {
this.sourceReadable = false
this._stopReading()
}
this.callback(data)
}
}
}
StreamPeeker.prototype._stopReading = function() {
//process.stdout.write('stop')
this.reading = false
}



var a = new Streamer()
var b = new StreamPeeker(a, function(){})

var looping = false
var loop=function() {
var data = b.read(5)
if(data === null) {
looping = false
return
}
// else
process.stdout.write(data.toString().replace(/\./g, 'x').replace(/_/g,'L'))
setTimeout(loop, 1000)
}

b.on('readable', function() {
if(!looping) {
looping = true
loop()
}
})


The output should ideally be something like:



.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL
.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL
.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL
.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL


But instead I'm seeing something like this:



....xxxxx...._........._........._........._........._........._........._......
..._........._........._xxxxL_........._........._........._........._........._
........._........._........._........._........._.....xxxxx...._xxxxLxxxxxxxxxL
xxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxL
xxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxL


A: Why are there 4 dots and then 5 x's in the beggining, but mainly B: What am I doing wrong with backpressure?


Aucun commentaire:

Enregistrer un commentaire