@@ -147,7 +147,7 @@ BodyDispatcher.prototype.promise = function dispatchBodyPromise(
147
147
contentType , response
148
148
) {
149
149
var operation = this . operation ;
150
-
150
+
151
151
operation . logger . debug ( 'body promise' ) ;
152
152
var collectObject = function collectPromiseBodyObject ( data ) {
153
153
operation . data = operation . collectBodyObject ( data ) ;
@@ -166,7 +166,7 @@ BodyDispatcher.prototype.chunkedStream = function dispatchBodyChunkedStream(
166
166
contentType , response
167
167
) {
168
168
var operation = this . operation ;
169
-
169
+
170
170
operation . logger . debug ( 'body chunked stream' ) ;
171
171
172
172
response . pipe ( operation . outputStream ) ;
@@ -175,7 +175,7 @@ BodyDispatcher.prototype.objectStream = function dispatchBodyObjectStream(
175
175
contentType , response
176
176
) {
177
177
var operation = this . operation ;
178
-
178
+
179
179
operation . logger . debug ( 'body object stream' ) ;
180
180
181
181
var outputStream = operation . outputStream ;
@@ -345,7 +345,7 @@ MultipartDispatcher.prototype.chunkedStream = function dispatchMultipartChunkedS
345
345
boundary , response
346
346
) {
347
347
var operation = this . operation ;
348
-
348
+
349
349
var errorListener = operation . errorListener ;
350
350
351
351
var outputStream = operation . outputStream ;
@@ -414,7 +414,7 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
414
414
boundary , response
415
415
) {
416
416
var operation = this . operation ;
417
-
417
+
418
418
var errorListener = operation . errorListener ;
419
419
420
420
var rawHeaderQueue = new FifoQueue ( 5 ) ;
@@ -426,31 +426,13 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
426
426
var hasParsed = false ;
427
427
var hasEnded = false ;
428
428
429
- var partTransform = function objectPartQueueTransform (
430
- isLast , data , objectQueue
431
- ) {
429
+ var partTransform = function objectPartQueueTransform ( data ) {
432
430
parsedParts ++ ;
431
+ var objectQueue = queuedReader . getItemQueue ( ) ;
433
432
metadataBuffer = operation . queueDocument (
434
433
( data . length === 0 ) ? null : data , rawHeaderQueue , metadataBuffer , objectQueue
435
434
) ;
436
-
437
- if ( isLast ) {
438
- if ( metadataBuffer !== null ) {
439
- operation . queueMetadata ( metadataBuffer , objectQueue ) ;
440
- metadataBuffer = null ;
441
- }
442
-
443
- rawHeaderQueue = null ;
444
- queuedReader = null ;
445
- parser = null ;
446
- partHeadersListener = null ;
447
- partListener = null ;
448
- parseFinishListener = null ;
449
- responseEndListener = null ;
450
- partTransform = null ;
451
- } else if ( ! hasEnded && parsedParts === parsingParts ) {
452
- parser . emit ( 'drain' ) ;
453
- }
435
+ doneChecker ( ) ;
454
436
} ;
455
437
456
438
var queuedReader = new QueuedReader (
@@ -470,15 +452,44 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
470
452
queuedReader . addReader ( partReadStream ) ;
471
453
} ;
472
454
473
- var parseFinishListener = function promiseParseFinishListener ( ) {
455
+ var parseFinishListener = function objectParseFinishListener ( ) {
474
456
hasParsed = true ;
475
- if ( queuedReader !== null ) {
476
- queuedReader . queuedAll ( ) ;
477
- }
457
+ doneChecker ( ) ;
478
458
} ;
479
459
480
- var responseEndListener = function promiseResponseEndListener ( ) {
460
+ var responseEndListener = function objectResponseEndListener ( ) {
481
461
hasEnded = true ;
462
+ doneChecker ( ) ;
463
+ } ;
464
+
465
+ /**
466
+ * Check if HTTP response has ended, Dicer has finished parsing,
467
+ * and the Queue is empty. If all are true, then we can call end()
468
+ * on output stream.
469
+ */
470
+ var doneChecker = function doneChecker ( ) {
471
+
472
+ if ( queuedReader . isQueueEmpty ( ) && hasParsed && hasEnded ) {
473
+ if ( metadataBuffer !== null ) {
474
+ var objectQueue = queuedReader . getItemQueue ( ) ;
475
+ operation . queueMetadata ( metadataBuffer , objectQueue ) ;
476
+ metadataBuffer = null ;
477
+ }
478
+
479
+ rawHeaderQueue = null ;
480
+ queuedReader = null ;
481
+ parser = null ;
482
+ partHeadersListener = null ;
483
+ partListener = null ;
484
+ parseFinishListener = null ;
485
+ responseEndListener = null ;
486
+ partTransform = null ;
487
+
488
+ operation . outputStream . end ( ) ;
489
+ } else if ( ! hasEnded && parsedParts === parsingParts ) {
490
+ parser . emit ( 'drain' ) ;
491
+ }
492
+
482
493
} ;
483
494
484
495
var parser = new Dicer ( { boundary : boundary } ) ;
@@ -537,13 +548,10 @@ function QueuedReader(options, logger, itemsTransform) {
537
548
538
549
self . logger . debug ( 'concatenated item' ) ;
539
550
540
- var isLast = ( self . queueDone && self . readerQueue . length ( ) === 0 &&
541
- self . writerQueue . length ( ) === 0 ) ;
542
-
543
551
var itemQueue = self . itemQueue ;
544
552
var beforeLength = itemQueue . length ( ) ;
545
553
546
- self . itemsTransform ( isLast , data , itemQueue ) ;
554
+ self . itemsTransform ( data ) ;
547
555
548
556
if ( beforeLength < itemQueue . length ( ) ) {
549
557
if ( beforeLength === 0 ) {
@@ -569,6 +577,12 @@ QueuedReader.prototype.addReader = function queuedAddReader(reader) {
569
577
this . logger . debug ( 'queued item %d' , readerQueue . getTotal ( ) ) ;
570
578
this . nextReader ( ) ;
571
579
} ;
580
+ QueuedReader . prototype . getItemQueue = function getItemQueue ( ) {
581
+ return this . itemQueue ;
582
+ } ;
583
+ QueuedReader . prototype . isQueueEmpty = function isQueueEmpty ( ) {
584
+ return ( this . readerQueue . length ( ) === 0 && this . writerQueue . length ( ) === 0 ) ;
585
+ } ;
572
586
QueuedReader . prototype . nextReader = function queuedReaderNextReader ( ) {
573
587
if ( ! this . isReading ) {
574
588
return ;
@@ -635,12 +649,6 @@ QueuedReader.prototype._read = function queuedReaderRead(/*size*/) {
635
649
this . nextReader ( ) ;
636
650
}
637
651
} ;
638
- QueuedReader . prototype . queuedAll = function queuedReaderAll ( ) {
639
- if ( ! this . queueDone ) {
640
- this . logger . debug ( 'queued all items' ) ;
641
- this . queueDone = true ;
642
- }
643
- } ;
644
652
645
653
function FifoQueue ( min ) {
646
654
if ( ! ( this instanceof FifoQueue ) ) {
@@ -799,13 +807,13 @@ function operationResultPromise(fullfilled, rejected) {
799
807
switch ( operation . outputMode ) {
800
808
case 'none' :
801
809
if ( operation . startedResponse === true ) {
802
- throw new Error ( 'cannot create result promise after receiving response' ) ;
810
+ throw new Error ( 'cannot create result promise after receiving response' ) ;
803
811
}
804
812
break ;
805
813
case 'promise' :
806
- throw new Error ( 'already created result promise' ) ;
814
+ throw new Error ( 'already created result promise' ) ;
807
815
default :
808
- throw new Error ( 'cannot create result promise after creating stream' ) ;
816
+ throw new Error ( 'cannot create result promise after creating stream' ) ;
809
817
}
810
818
operation . outputMode = 'promise' ;
811
819
@@ -847,7 +855,7 @@ function resolvedPromise(operation, resolve) {
847
855
) + ' data' ) ;
848
856
849
857
if ( ! hasData ) {
850
- resolve . call ( operation ) ;
858
+ resolve . call ( operation ) ;
851
859
} else {
852
860
resolve . call ( operation , data ) ;
853
861
}
@@ -932,7 +940,7 @@ function operationResultStream() {
932
940
if ( error != null ) {
933
941
var i = 0 ;
934
942
for ( ; i < error . length ; i ++ ) {
935
- outputStream . emit ( 'error' , error [ i ] ) ;
943
+ outputStream . emit ( 'error' , error [ i ] ) ;
936
944
}
937
945
operation . error = undefined ;
938
946
}
0 commit comments