Skip to content

Commit 193d1d8

Browse files
committed
Fix: Use custom implementation of fs.WriteStream that supports flush (fixes #189)
1 parent d905bcd commit 193d1d8

File tree

5 files changed

+339
-42
lines changed

5 files changed

+339
-42
lines changed
+31-38
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,58 @@
11
'use strict';
22

3-
var fs = require('graceful-fs');
4-
53
var fo = require('../../file-operations');
64
var readStream = require('../../src/get-contents/read-stream');
75

86
function writeStream(file, onWritten) {
97
var opt = {
108
mode: file.stat.mode,
9+
// TODO: need to test this (node calls this `flags` property)
1110
flag: file.flag,
1211
};
1312

14-
var outStream = fs.createWriteStream(file.path, opt);
15-
var fd = null;
16-
17-
file.contents.once('error', complete);
18-
file.contents.once('end', readStreamEnd);
19-
outStream.once('error', complete);
20-
outStream.once('finish', complete);
21-
outStream.once('open', onOpen);
13+
// TODO: is this the best API?
14+
var outStream = fo.createWriteStream(file.path, opt, onFlush);
2215

23-
// Streams are piped with end disabled, this prevents the
24-
// WriteStream from closing the file descriptor after all
25-
// data is written.
26-
file.contents.pipe(outStream, { end: false });
16+
file.contents.once('error', onComplete);
17+
outStream.once('error', onComplete);
18+
outStream.once('finish', onComplete);
2719

28-
// Obtain the file descriptor from the "open" event.
29-
function onOpen(openFd) {
30-
fd = openFd;
31-
}
20+
// TODO: should this use a clone?
21+
file.contents.pipe(outStream);
3222

33-
function readStreamEnd() {
34-
readStream(file, complete);
35-
}
23+
function onComplete(streamErr) {
24+
// Cleanup event handlers before closing
25+
file.contents.removeListener('error', onComplete);
26+
outStream.removeListener('error', onComplete);
27+
outStream.removeListener('finish', onComplete);
3628

37-
function end(propagatedErr) {
38-
outStream.end(onEnd);
29+
// Need to guarantee the fd is closed before forwarding the error
30+
outStream.once('close', onClose);
31+
outStream.end();
3932

40-
function onEnd(endErr) {
41-
onWritten(propagatedErr || endErr);
33+
function onClose(closeErr) {
34+
onWritten(streamErr || closeErr);
4235
}
4336
}
4437

4538
// Cleanup
46-
function complete(streamErr) {
47-
file.contents.removeListener('error', complete);
48-
file.contents.removeListener('end', readStreamEnd);
49-
outStream.removeListener('error', complete);
50-
outStream.removeListener('finish', complete);
51-
outStream.removeListener('open', onOpen);
39+
function onFlush(fd, callback) {
40+
// TODO: removing this before readStream because it replaces the stream
41+
file.contents.removeListener('error', onComplete);
5242

53-
if (streamErr) {
54-
return end(streamErr);
55-
}
43+
// TODO: this is doing sync stuff & the callback seems unnecessary
44+
// TODO: do we really want to replace the contents stream or should we use a clone
45+
readStream(file, complete);
5646

57-
if (typeof fd !== 'number') {
58-
return end();
59-
}
47+
function complete() {
48+
if (typeof fd !== 'number') {
49+
return callback();
50+
}
6051

61-
fo.updateMetadata(fd, file, end);
52+
fo.updateMetadata(fd, file, callback);
53+
}
6254
}
55+
6356
}
6457

6558
module.exports = writeStream;

lib/file-operations.js

+101
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
'use strict';
22

3+
var util = require('util');
4+
35
var fs = require('graceful-fs');
46
var path = require('path');
57
var assign = require('object-assign');
68
var isEqual = require('lodash.isequal');
79
var isValidDate = require('vali-date');
10+
var FlushWriteStream = require('flush-write-stream');
811

912
var constants = require('./constants');
1013

@@ -324,6 +327,103 @@ function mkdirp(dirpath, customMode, callback) {
324327
}
325328
}
326329

330+
function createWriteStream(path, options, flush) {
331+
return new WriteStream(path, options, flush);
332+
}
333+
334+
// Taken from node core and altered to receive a flush function and simplified
335+
// To be used for cleanup (like updating times/mode/etc)
336+
function WriteStream(path, options, flush) {
337+
// Not exposed so we can avoid the case where someone doesn't use `new`
338+
339+
if (typeof options === 'function') {
340+
flush = options;
341+
options = null;
342+
}
343+
344+
options = options || {};
345+
346+
FlushWriteStream.call(this, options, worker, cleanup);
347+
348+
this.flush = flush;
349+
this.path = path;
350+
351+
this.mode = options.mode || constants.DEFAULT_FILE_MODE;
352+
this.flag = options.flag || 'w';
353+
this.pos = APPEND_MODE_REGEXP.test(this.flag) ? null : 0;;
354+
355+
// Used by node's `fs.WriteStream`
356+
this.fd = null;
357+
this.start = null;
358+
359+
this.open();
360+
361+
// Dispose on finish.
362+
this.once('finish', this.close);
363+
}
364+
365+
util.inherits(WriteStream, FlushWriteStream);
366+
367+
WriteStream.prototype.open = function() {
368+
var self = this;
369+
370+
fs.open(this.path, this.flag, this.mode, onOpen);
371+
372+
function onOpen(openErr, fd) {
373+
if (openErr) {
374+
self.destroy();
375+
self.emit('error', openErr);
376+
return;
377+
}
378+
379+
self.fd = fd;
380+
self.emit('open', fd);
381+
}
382+
};
383+
384+
// Use our `end` method since it is patched for flush
385+
WriteStream.prototype.destroySoon = WriteStream.prototype.end;
386+
// Use node's `fs.WriteStream` methods
387+
WriteStream.prototype.destroy = fs.WriteStream.prototype.destroy;
388+
WriteStream.prototype.close = fs.WriteStream.prototype.close;
389+
390+
function worker(data, encoding, callback) {
391+
var self = this;
392+
393+
// This is from node core but I have no idea how to get code coverage on it
394+
if (!Buffer.isBuffer(data)) {
395+
return this.emit('error', new Error('Invalid data'));
396+
}
397+
398+
if (typeof this.fd !== 'number') {
399+
return this.once('open', onOpen);
400+
}
401+
402+
fs.write(this.fd, data, 0, data.length, this.pos, onWrite);
403+
404+
function onOpen() {
405+
self._write(data, encoding, callback);
406+
}
407+
408+
function onWrite(writeErr) {
409+
if (writeErr) {
410+
self.destroy();
411+
callback(writeErr);
412+
return;
413+
}
414+
415+
callback();
416+
}
417+
}
418+
419+
function cleanup(callback) {
420+
if (typeof this.flush !== 'function') {
421+
return callback();
422+
}
423+
424+
this.flush(this.fd, callback);
425+
}
426+
327427
module.exports = {
328428
closeFd: closeFd,
329429
isValidUnixId: isValidUnixId,
@@ -334,4 +434,5 @@ module.exports = {
334434
updateMetadata: updateMetadata,
335435
writeFile: writeFile,
336436
mkdirp: mkdirp,
437+
createWriteStream: createWriteStream,
337438
};

package.json

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
],
1313
"dependencies": {
1414
"duplexify": "^3.2.0",
15+
"flush-write-stream": "^1.0.0",
1516
"glob-stream": "^5.3.2",
1617
"graceful-fs": "^4.0.0",
1718
"gulp-sourcemaps": "^1.5.2",
@@ -36,6 +37,7 @@
3637
"eslint": "^1.10.3",
3738
"eslint-config-gulp": "^2.0.0",
3839
"expect": "^1.14.0",
40+
"from2": "^2.1.1",
3941
"github-changes": "^1.0.1",
4042
"istanbul": "^0.3.0",
4143
"istanbul-coveralls": "^1.0.1",

test/dest.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ var expect = require('expect');
1818
var bufEqual = require('buffer-equal');
1919
var through = require('through2');
2020
var File = require('vinyl');
21+
var from = require('from2');
2122

2223
var should = require('should');
2324
require('mocha');
@@ -1146,7 +1147,9 @@ describe('dest stream', function() {
11461147
var inputPath = path.join(__dirname, './fixtures/test.coffee');
11471148
var inputBase = path.join(__dirname, './fixtures/');
11481149

1149-
var contentStream = through.obj();
1150+
var contentStream = from(function(size, cb) {
1151+
cb(new Error('mocked error'));
1152+
});
11501153
var expectedFile = new File({
11511154
base: inputBase,
11521155
cwd: __dirname,
@@ -1156,9 +1159,6 @@ describe('dest stream', function() {
11561159

11571160
var stream = vfs.dest('./out-fixtures/', { cwd: __dirname });
11581161
stream.write(expectedFile);
1159-
setTimeout(function() {
1160-
contentStream.emit('error', new Error('mocked error'));
1161-
}, 100);
11621162
stream.on('error', function(err) {
11631163
expect(err).toExist();
11641164
done();

0 commit comments

Comments
 (0)