Skip to content
This repository was archived by the owner on Dec 27, 2019. It is now read-only.

Stop pushing data when push() returns false #41

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class PgQueryStream extends Readable {
this.cursor = new Cursor(text, values)
this._reading = false
this._closed = false
this._buffer = []
this.batchSize = (options || { }).batchSize || 100

// delegate Submittable callbacks to cursor
Expand Down Expand Up @@ -35,7 +36,22 @@ class PgQueryStream extends Readable {
return false
}
this._reading = true
const readAmount = Math.max(size, this.batchSize)
var readAmount = Math.max(size, this.batchSize)
var object;

while ((object = this._buffer.shift()) && readAmount) {
readAmount--;
if (!this.push(object)) {
this._reading = false
return
}
}

if (!readAmount) {
this._reading = false
return
}

this.cursor.read(readAmount, (err, rows) => {
if (this._closed) {
return
Expand All @@ -52,8 +68,11 @@ class PgQueryStream extends Readable {

// push each row into the stream
this._reading = false
for (var i = 0; i < rows.length; i++) {
this.push(rows[i])
while (object = rows.shift()) {
if (!this.push(object)) {
this._buffer = this._buffer.concat(rows)
return
}
}
})
}
Expand Down
26 changes: 26 additions & 0 deletions test/pauses.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var concat = require('concat-stream')
var tester = require('stream-tester')
var JSONStream = require('JSONStream')
var assert = require('assert');

var QueryStream = require('../')

Expand All @@ -15,4 +16,29 @@ require('./helper')('pauses', function (client) {
done()
}))
})

it('keeps a stable internal buffer size when paused/resumed', function (done) {
this.timeout(5000)

var stream = client.query(new QueryStream('SELECT * FROM generate_series(0, $1)', [10000], {batchSize: 100}))
var results = []
var concurrency = 50

stream.on('data', function (result) {
results.push(result)

if (results.length == concurrency) {
stream.pause()

setTimeout(function () {
results = []
stream.resume()
}, 10)
}

assert(stream._readableState.buffer.length <= stream.batchSize)
})

stream.on('end', done);
})
})
31 changes: 31 additions & 0 deletions test/pushes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
var helper = require('./helper')
var QueryStream = require('../')
var Writable = require('stream').Writable
var assert = require('assert')

helper('pushes', function (client) {
it('stops pushing data when push() returns false and resumes on _read()', function (done) {
var readable = client.query(new QueryStream('SELECT * FROM generate_series(0, $1)', [500]))
var writable = new Writable({highWaterMark: 100, objectMode: true})
var shouldPushAgain = true

readable.original_read = readable._read
readable._read = function (size) {
shouldPushAgain = true
return this.original_read(size)
}

readable.originalPush = readable.push
readable.push = function (data) {
assert(shouldPushAgain)
shouldPushAgain = this.originalPush(data)
return shouldPushAgain
}

writable._write = function (chunk, encoding, callback) {
setImmediate(callback)
}

readable.pipe(writable).on('finish', done)
})
})