Commit 1b02a46 1 parent 3a0cf5b commit 1b02a46 Copy full SHA for 1b02a46
File tree 2 files changed +24
-10
lines changed
2 files changed +24
-10
lines changed Original file line number Diff line number Diff line change @@ -11,33 +11,43 @@ function hasListeners(stream) {
11
11
}
12
12
13
13
function sink ( stream ) {
14
+ var sinkAdded = false ;
14
15
var sinkStream = new Writable ( {
15
16
objectMode : true ,
16
17
write : function ( file , enc , cb ) {
17
18
cb ( ) ;
18
19
} ,
19
20
} ) ;
20
21
22
+ function addSink ( ) {
23
+ if ( sinkAdded ) {
24
+ return ;
25
+ }
26
+
27
+ if ( hasListeners ( stream ) ) {
28
+ return ;
29
+ }
30
+
31
+ sinkAdded = true ;
32
+ stream . pipe ( sinkStream ) ;
33
+ }
34
+
21
35
function removeSink ( evt ) {
22
36
if ( evt !== 'readable' && evt !== 'data' ) {
23
37
return ;
24
38
}
25
39
26
40
if ( hasListeners ( stream ) ) {
27
- stream . unpipe ( sinkStream ) ;
41
+ sinkAdded = false ;
42
+ return stream . unpipe ( sinkStream ) ;
28
43
}
29
44
}
30
45
31
46
stream . on ( 'newListener' , removeSink ) ;
32
47
stream . on ( 'removeListener' , removeSink ) ;
48
+ stream . on ( 'removeListener' , addSink ) ;
33
49
34
- return function ( ) {
35
- if ( hasListeners ( stream ) ) {
36
- return ;
37
- }
38
-
39
- stream . pipe ( sinkStream ) ;
40
- } ;
50
+ return addSink ;
41
51
}
42
52
43
53
module . exports = sink ;
Original file line number Diff line number Diff line change @@ -1411,7 +1411,9 @@ describe('dest stream', function() {
1411
1411
1412
1412
srcStream . pipe ( countFiles ) . pipe ( destStream ) ;
1413
1413
1414
- destStream . removeListener ( 'readable' , noop ) ;
1414
+ process . nextTick ( function ( ) {
1415
+ destStream . removeListener ( 'readable' , noop ) ;
1416
+ } ) ;
1415
1417
} ) ;
1416
1418
1417
1419
it ( 'sinks the stream if all the data event handlers are removed' , function ( done ) {
@@ -1444,7 +1446,9 @@ describe('dest stream', function() {
1444
1446
1445
1447
srcStream . pipe ( countFiles ) . pipe ( destStream ) ;
1446
1448
1447
- destStream . removeListener ( 'data' , onData ) ;
1449
+ process . nextTick ( function ( ) {
1450
+ destStream . removeListener ( 'data' , onData ) ;
1451
+ } ) ;
1448
1452
} ) ;
1449
1453
1450
1454
it ( 'should pass options to through2' , function ( done ) {
You can’t perform that action at this time.
0 commit comments