Skip to content

Commit da7059b

Browse files
committedJan 14, 2016
Merge pull request #145 from gulpjs/readable-event
Allow things to listen for data or readable events and avoid sinking in those cases
2 parents 941b040 + a7fae5a commit da7059b

File tree

2 files changed

+151
-2
lines changed

2 files changed

+151
-2
lines changed
 

‎lib/sink.js

+37-2
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,52 @@
22

33
var Writable = require('readable-stream/writable');
44

5+
function listenerCount(stream, evt) {
6+
return stream.listeners(evt).length;
7+
}
8+
9+
function hasListeners(stream) {
10+
return !!(listenerCount(stream, 'readable') || listenerCount(stream, 'data'));
11+
}
12+
513
function sink(stream) {
14+
var sinkAdded = false;
615
var sinkStream = new Writable({
716
objectMode: true,
817
write: function(file, enc, cb) {
918
cb();
1019
},
1120
});
1221

13-
return function() {
22+
function addSink() {
23+
if (sinkAdded) {
24+
return;
25+
}
26+
27+
if (hasListeners(stream)) {
28+
return;
29+
}
30+
31+
sinkAdded = true;
1432
stream.pipe(sinkStream);
15-
};
33+
}
34+
35+
function removeSink(evt) {
36+
if (evt !== 'readable' && evt !== 'data') {
37+
return;
38+
}
39+
40+
if (hasListeners(stream)) {
41+
sinkAdded = false;
42+
stream.unpipe(sinkStream);
43+
}
44+
}
45+
46+
stream.on('newListener', removeSink);
47+
stream.on('removeListener', removeSink);
48+
stream.on('removeListener', addSink);
49+
50+
return addSink;
1651
}
1752

1853
module.exports = sink;

‎test/dest.js

+114
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ var realMode = function(n) {
3838
return n & parseInt('777', 8);
3939
};
4040

41+
function noop() {}
42+
4143
describe('dest stream', function() {
4244
beforeEach(wipeOut);
4345
afterEach(wipeOut);
@@ -1337,6 +1339,118 @@ describe('dest stream', function() {
13371339
.pipe(slowCountFiles);
13381340
});
13391341

1342+
it('should respect readable listeners on destination stream', function(done) {
1343+
var srcPath = path.join(__dirname, './fixtures/test.coffee');
1344+
var srcStream = vfs.src(srcPath);
1345+
var destStream = vfs.dest('./out-fixtures/', { cwd: __dirname });
1346+
1347+
srcStream
1348+
.pipe(destStream);
1349+
1350+
var readables = 0;
1351+
destStream.on('readable', function() {
1352+
var data = destStream.read();
1353+
1354+
if (data != null) {
1355+
readables++;
1356+
}
1357+
});
1358+
1359+
destStream.on('error', done);
1360+
1361+
destStream.on('finish', function() {
1362+
readables.should.equal(1);
1363+
done();
1364+
});
1365+
});
1366+
1367+
it('should respect data listeners on destination stream', function(done) {
1368+
var srcPath = path.join(__dirname, './fixtures/test.coffee');
1369+
var srcStream = vfs.src(srcPath);
1370+
var destStream = vfs.dest('./out-fixtures/', { cwd: __dirname });
1371+
1372+
srcStream
1373+
.pipe(destStream);
1374+
1375+
var datas = 0;
1376+
destStream.on('data', function() {
1377+
datas++;
1378+
});
1379+
1380+
destStream.on('error', done);
1381+
1382+
destStream.on('finish', function() {
1383+
datas.should.equal(1);
1384+
done();
1385+
});
1386+
});
1387+
1388+
it('sinks the stream if all the readable event handlers are removed', function(done) {
1389+
fs.mkdirSync(path.join(__dirname, './fixtures/highwatermark'));
1390+
for (var idx = 0; idx < 17; idx++) {
1391+
fs.writeFileSync(path.join(__dirname, './fixtures/highwatermark/', 'file' + idx + '.txt'));
1392+
}
1393+
1394+
var srcPath = path.join(__dirname, './fixtures/highwatermark/*.txt');
1395+
var srcStream = vfs.src(srcPath);
1396+
var destStream = vfs.dest('./out-fixtures/', { cwd: __dirname });
1397+
1398+
var fileCount = 0;
1399+
var countFiles = through.obj(function(file, enc, cb) {
1400+
fileCount++;
1401+
1402+
cb(null, file);
1403+
});
1404+
1405+
destStream.on('readable', noop);
1406+
1407+
destStream.once('finish', function() {
1408+
fileCount.should.equal(17);
1409+
done();
1410+
});
1411+
1412+
srcStream.pipe(countFiles).pipe(destStream);
1413+
1414+
process.nextTick(function() {
1415+
destStream.removeListener('readable', noop);
1416+
});
1417+
});
1418+
1419+
it('sinks the stream if all the data event handlers are removed', function(done) {
1420+
fs.mkdirSync(path.join(__dirname, './fixtures/highwatermark'));
1421+
for (var idx = 0; idx < 17; idx++) {
1422+
fs.writeFileSync(path.join(__dirname, './fixtures/highwatermark/', 'file' + idx + '.txt'));
1423+
}
1424+
1425+
var srcPath = path.join(__dirname, './fixtures/highwatermark/*.txt');
1426+
var srcStream = vfs.src(srcPath);
1427+
var destStream = vfs.dest('./out-fixtures/', { cwd: __dirname });
1428+
1429+
var fileCount = 0;
1430+
function onData() {
1431+
fileCount++;
1432+
}
1433+
1434+
var countFiles = through.obj(function(file, enc, cb) {
1435+
onData();
1436+
1437+
cb(null, file);
1438+
});
1439+
1440+
destStream.on('data', onData);
1441+
1442+
destStream.once('finish', function() {
1443+
fileCount.should.equal(17);
1444+
done();
1445+
});
1446+
1447+
srcStream.pipe(countFiles).pipe(destStream);
1448+
1449+
process.nextTick(function() {
1450+
destStream.removeListener('data', onData);
1451+
});
1452+
});
1453+
13401454
it('should pass options to through2', function(done) {
13411455
var srcPath = path.join(__dirname, './fixtures/test.coffee');
13421456
var content = fs.readFileSync(srcPath);

0 commit comments

Comments
 (0)
Please sign in to comment.