I can't for the life of me figure out how to implement a stream that properly handles backpressure. I have a test script I'm running:
var stream = require('stream')
var util = require('util')
var highWaterMark = 10
var Streamer = function() {
stream.Readable.call(this, {highWaterMark: highWaterMark})
this.count=0
}
util.inherits(Streamer, stream.Readable)
Streamer.prototype._read = function() {
this.startStreamin()
}
Streamer.prototype.startStreamin = function() {
this.pausedYo = false
setTimeout(function() {
if(this.pausedYo) return
// else
this.count++
if(this.count%10 === 0)
var data = "_"
else
var data = "."
if(this.count > 500) {
this.stopStreamin()
this.push(null)
return
}
if(!this.push(data)) this.stopStreamin()
process.stdout.write(data)
this.startStreamin() // keep streamin
}.bind(this),200)
}
Streamer.prototype.stopStreamin = function() {
this.pausedYo = true
}
var StreamPeeker = function(myStream, callback) {
stream.Readable.call(this, {highWaterMark: highWaterMark})
this.stream = myStream
this.reading = true
this.callback = callback
this.stream.on('readable', function() {
//process.stdout.write('reading: '+this.reading)
this.sourceReadable = true
this._readStuff()
}.bind(this))
this.stream.on('end', function() {
this.push(null)
}.bind(this))
this._read()
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
//process.stdout.write('start')
this.reading = true
if(this.sourceReadable) {
this._readStuff()
}
}
StreamPeeker.prototype._readStuff = function() {
if(this.reading) {
var data = this.stream.read()
if(data !== null) {
if(!this.push(data)) {
this.sourceReadable = false
this._stopReading()
}
this.callback(data)
}
}
}
StreamPeeker.prototype._stopReading = function() {
//process.stdout.write('stop')
this.reading = false
}
var a = new Streamer()
var b = new StreamPeeker(a, function(){})
var looping = false
var loop=function() {
var data = b.read(5)
if(data === null) {
looping = false
return
}
// else
process.stdout.write(data.toString().replace(/\./g, 'x').replace(/_/g,'L'))
setTimeout(loop, 1000)
}
b.on('readable', function() {
if(!looping) {
looping = true
loop()
}
})
The output should ideally be something like:
.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL
.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL
.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL
.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL.....xxxxx...._xxxxL
But instead I'm seeing something like this:
....xxxxx...._........._........._........._........._........._........._......
..._........._........._xxxxL_........._........._........._........._........._
........._........._........._........._........._.....xxxxx...._xxxxLxxxxxxxxxL
xxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxL
xxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxLxxxxxxxxxL
A: Why are there 4 dots and then 5 x's in the beggining, but mainly B: What am I doing wrong with backpressure?
Aucun commentaire:
Enregistrer un commentaire