Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 62 additions & 17 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ var urllib = require('url');
var debug = require('debug')('http-disk-cache');
var glob = require('glob');

var util = require('util');
var stream = require('stream');

/////////////// CacheEntry ///////////////

function canonicalUrl(url) {
Expand Down Expand Up @@ -178,6 +181,32 @@ CacheWriter.prototype.pipeFrom = function pipeFrom(readable) {
});
};

function ChecksumStream(expectedChecksum, options) {
if (!(this instanceof ChecksumStream)) {
return new ChecksumStream(expectedChecksum, options);
}
stream.Transform.call(this, options);
this.hash = crypto.createHash('md5');
this.expectedChecksum = expectedChecksum;
}
util.inherits(ChecksumStream, stream.Transform);

ChecksumStream.prototype._transform = function (chunk, enc, cb) {
var buffer = Buffer.isBuffer(chunk) ? chunk : new Buffer(chunk, enc);
this.hash.update(buffer); // update hash
this.push(chunk, enc);
cb();
};

ChecksumStream.prototype._flush = function (cb) {
var checksum = this.hash.digest('hex');
if (checksum != this.expectedChecksum) {
return cb(new Error('invalid checksum'));
}
cb();
};


/////////////// HTTPCache ///////////////

// HTTPCache handles HTTP requests, and caches them to disk if allowed by the Cache-Control
Expand Down Expand Up @@ -247,7 +276,7 @@ function deleteEntry(metaPath, cb) {
// 'notcached' - the cache entry is missing, invalid, or expired, but ready to be cached anew.
// 'error' - the cache entry is corrupted, and could not be deleted. This indicates that
// we shouldn't try to cache any responses right now.
HTTPCache.prototype._checkCache = function(cacheEntry, callback) {
HTTPCache.prototype._checkCache = function(cacheEntry, skipVerify, callback) {
var _this = this;
function loadMetadata(cb) {
debug('loading metadata from', cacheEntry.metaPath);
Expand Down Expand Up @@ -339,8 +368,12 @@ HTTPCache.prototype._checkCache = function(cacheEntry, callback) {
});
}

// We now have valid metadata for an un-expired cache entry. Next, we checksum the contents.
validateContents(metadata);
if (skipVerify) {
return callback(null, CACHE_STATE_CACHED, metadata);
} else {
// We now have valid metadata for an un-expired cache entry. Next, we checksum the contents.
validateContents(metadata);
}
});
};

Expand Down Expand Up @@ -404,20 +437,22 @@ HTTPCache.prototype.assertCached = function(url, onProgress, cb) {
options = { url: url };
}

options._skipReadStream = true;

var entry = new CacheEntry(url, options.etagFormat);

this._checkCache(entry, function(err, cacheStatus) {
if (cacheStatus === CACHE_STATE_CACHED) {
debug('assert cache hit', url);
return cb();
} else {
debug('assert cache miss', url);
_this.openReadStream(options, onProgress, function(err, _, path) {
cb(err);
});
this.openReadStream(options, onProgress, function (err, readStream, path) {
if (err != null) {
return cb(err);
}
if (readStream == null) { throw new Error("HAY"); }
readStream.on('error', function(err) {
readStream.removeAllListeners();
cb(err);
});
readStream.on('end', function() {
readStream.removeAllListeners();
cb();
});
readStream.resume();
});
};

Expand Down Expand Up @@ -464,14 +499,21 @@ HTTPCache.prototype.openReadStream = function(url, onProgress, cb) {
var cacheWriter = this._createCacheWriter(entry);

// Check if the entry is available in the cache.
this._checkCache(entry, function(err, cacheStatus) {
this._checkCache(entry, true, function(err, cacheStatus, metadata) {

debug("cache entry", entry.url, "status=", cacheStatus);
if (cacheStatus === CACHE_STATE_CACHED) {
// The cache contents are present and valid, so serve the request from cache.
cacheWriter.end();
var readStream = options._skipReadStream ? null : _this._createContentReadStream(entry);
return cb(null, readStream, _this._absPath(entry.contentPath));
var checksumStream = new ChecksumStream(metadata.contentMD5);
checksumStream.on('error', function (err) {
if (err === 'invalid checksum') {
deleteEntry(_this._absPath(entry.metaPath), function(err) {});
}
});
readStream.pipe(checksumStream);
return cb(null, checksumStream, _this._absPath(entry.contentPath));
} else if (cacheStatus == CACHE_STATE_ERROR) {
// Some kind of error occurred and we can't access the cache.
return cb("Error: There was a problem with the asset cache and we can't write files");
Expand Down Expand Up @@ -582,6 +624,8 @@ HTTPCache.prototype.getContents = function(url, cb) {
}
debug("getContents start", options.url);

options._skipVerify = true;

this.openReadStream(options, function(err, readStream, path) {
if (err) { return cb(err); }

Expand Down Expand Up @@ -754,7 +798,7 @@ HTTPCache.prototype.repair = function(cb) {
return;
}

_this._checkCache(entry, function (err, status) {
_this._checkCache(entry, false, function (err, status) {
if (err != null) {
deleteEntry(metaPath, deleteCb);
return;
Expand Down Expand Up @@ -861,3 +905,4 @@ HTTPCache.prototype.clean = function (shouldClean, cb) {
};

exports.HTTPCache = HTTPCache;
exports.ChecksumStream = ChecksumStream;
75 changes: 38 additions & 37 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var execSync = require('child_process').execSync;
var debug = require('debug')('http-disk-cache');
var async = require('artillery-async');
var glob = require('glob');
var stream = require('stream');

var httpcache = require('./index');

Expand All @@ -36,16 +37,19 @@ function newUrlReply(contents, status, headers, defer) {

function catStream(stream, cb) {
chunks = [];
stream.on('error', function (err) {
cb(err);
});
stream.on('data', function (chunk) {
chunks.push(chunk);
});
stream.on('end', function () {
if (chunks.length === 0) {
cb(null);
cb(null, null);
} else if (typeof chunks[0] === 'string') {
cb(chunks.join(''));
cb(null, chunks.join(''));
} else { // Buffer
cb(Buffer.concat(chunks));
cb(null, Buffer.concat(chunks));
}
});
}
Expand Down Expand Up @@ -166,7 +170,7 @@ exports.tests = {
var _this = this;
this.cache.openReadStream(this.createUrl('/url1'), function(err, stream, path) {
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url1 contents');
test.done();
});
Expand All @@ -182,7 +186,7 @@ exports.tests = {
test.equal(_this.requests.length, 1);
test.equal(_this.requests[0], '/url5');
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url5 contents');
test.done();
});
Expand All @@ -192,7 +196,7 @@ exports.tests = {
_this.cache.openReadStream({ url: _this.createUrl('/url5'), etagFormat: 'md5' }, function(err, stream, path) {
test.equal(_this.requests.length, 1); // request is handled from cache.
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url5 contents');
test.done();
});
Expand All @@ -213,7 +217,7 @@ exports.tests = {
test.equal(_this.requests.length, 1);
test.equal(_this.requests[0], '/url7');
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url7 contents');
test.done();
});
Expand All @@ -223,7 +227,7 @@ exports.tests = {
_this.cache.openReadStream({ url: _this.createUrl('/url7'), etagFormat: 'md5' }, function(err, stream, path) {
test.equal(_this.requests.length, 1); // request is handled from cache.
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url7 contents');
test.done();
});
Expand Down Expand Up @@ -256,7 +260,7 @@ exports.tests = {
_this.cache.openReadStream({ url: _this.createUrl('/url1'), etagFormat: 'md5' }, function(err, stream, path) {
test.equal(_this.requests.length, 1);
test.equal(_this.requests[0], '/url1');
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url1 contents');
cb();
});
Expand All @@ -268,7 +272,7 @@ exports.tests = {
_this.cache.openReadStream({ url: _this.createUrl('/url1'), etagFormat: 'md5' }, function(err, stream, path) {
test.equal(_this.requests.length, 2);
test.equal(_this.requests[1], '/url1');
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url1 contents');
cb();
});
Expand Down Expand Up @@ -358,7 +362,7 @@ exports.tests = {
},

testConcurrentRequests: function(test) {
test.expect(4);
test.expect(2);
var _this = this;
var count = 2;

Expand All @@ -367,8 +371,7 @@ exports.tests = {
if (count === 0) { test.done(); }
};
var cb = function(err, stream, path) {
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url1 contents');
barrier();
});
Expand All @@ -379,34 +382,34 @@ exports.tests = {


testBasicCaching: function(test) {
test.expect(8);
test.expect(6);
doTest(this, test, '/url1', 'url1 contents', false, true, 0, test.done);
},

testExplicitNoCache: function(test) {
test.expect(8);
test.expect(6);
doTest(this, test, '/url2', 'url2 contents', false, false, 0, test.done);
},

testUnparseableCacheControl: function(test) {
test.expect(8);
test.expect(6);
doTest(this, test, '/url4', 'url4 contents', false, false, 0, test.done);
},

testNoCache: function(test) {
// URLs without a Cache-Control header don't get cached.
test.expect(8);
test.expect(6);
doTest(this, test, '/url3', 'url3 contents', false, false, 0, test.done);
},

testUnexpiredCache: function(test) {
test.expect(8);
test.expect(6);
// 200 is the maximum allowable age.
doTest(this, test, '/url1', 'url1 contents', false, true, 200, test.done);
},

testExpiredCache: function(test) {
test.expect(8);
test.expect(6);
doTest(this, test, '/url1', 'url1 contents', false, false, 201, test.done);
},

Expand Down Expand Up @@ -691,29 +694,27 @@ exports.tests = {
function doTest(_this, test, url, contents, firstCached, secondCached, deltaT, cb) {
var count = 0;
if (!deltaT) { deltaT = 0; }
_this.cache.openReadStream(_this.createUrl(url), function(err, stream, path) {

_this.cache.getContents(_this.createUrl(url), function(err, buffer, path) {
if (!firstCached) { count++; }
if (!stream) {
test.ok(err, "if stream is null there had better be an error");
if (!buffer) {
test.ok(err, "if buffer is null there had better be an error");
return cb();
}
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
test.ok(fs.existsSync(path));
catStream(stream, function (contents) {
test.equal(contents.toString('utf8'), contents);

test.equal(buffer.toString('utf8'), contents);
test.equal(_this.serverUrls[url].fetchCount, count);
_this.nowSeconds += deltaT;
_this.cache.reset();

_this.cache.getContents(_this.createUrl(url), function(err, buffer, path) {
if (!secondCached) { count++; }

test.ok(fs.existsSync(path));
test.equal(_this.serverUrls[url].fetchCount, count);
_this.nowSeconds += deltaT;
_this.cache.reset();
_this.cache.openReadStream(_this.createUrl(url), function(err, stream, path) {
if (!secondCached) { count++; }
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
test.ok(fs.existsSync(path));
test.equal(_this.serverUrls[url].fetchCount, count);
catStream(stream, function (contents) {
test.equal(contents.toString('utf8'), contents);
cb();
});
});
test.equal(buffer.toString('utf8'), contents);
cb();
});
});

Expand Down