diff --git a/README.md b/README.md index 4d63011..5abd889 100755 --- a/README.md +++ b/README.md @@ -600,54 +600,52 @@ const { isInfected, viruses } = await clamscan.scanStream(stream); ## .passthrough() -The `passthrough` method returns a PassthroughStream object which allows you pipe a ReadbleStream through it and on to another output. In the case of this module's passthrough implementation, it's actually forking the data to also go to ClamAV via TCP or Domain Sockets. Each data chunk is only passed on to the output if that chunk was successfully sent to and received by ClamAV. The PassthroughStream object returned from this method has a special event that is emitted when ClamAV finishes scanning the streamed data so that you can decide if there's anything you need to do with the final output destination (ex. delete a file or S3 object). +The `passthrough` method returns a PassthroughStream object which allows you pipe a ReadbleStream through it and on to another output. In the case of this module's passthrough implementation, it's actually forking the data to also go to ClamAV via TCP or Domain Sockets. Each data chunk is only passed on to the output if that chunk was successfully sent to and received by ClamAV. The PassthroughStream object returned from this method contains a 'result' property which will be complete at the end of the pipeline and will contain the elements linked to the scanned file. In the case of an infected file, you can decide if there's anything to be done after the pipeline has been completed (ex. delete a file into I/O disk or S3 object). -In typical, non-passthrough setups, a file is uploaded to the local filesytem and then subsequently scanned. With that setup, you have to wait for the upload to complete _and then wait again_ for the scan to complete. Using this module's `passthrough` method, you could theoretically speed up user uploads intended to be scanned by up to 2x because the files are simultaneously scanned and written to any WriteableStream output (examples: filesystem, S3, gzip, etc...). +With this method, the file is both transmitted via TCP socket to clamav and also piped in an output. However, clamAV waits to collect the file chunks before scanning the entire file. As for these theoretical gains, your mileage my vary and I'd love to hear feedback on this to see where things can still be improved. -Please note that this method is different than all the others in that it returns a PassthroughStream object and does not support a Promise or Callback API. This makes sense once you see the example below (a practical working example can be found in the examples directory of this module): - ### Example ```javascript const NodeClam = require('clamscan'); // You'll need to specify your socket or TCP connection info -const clamscan = new NodeClam().init({ - clamdscan: { - socket: '/var/run/clamd.scan/clamd.sock', - host: '127.0.0.1', - port: 3310, - } -}); - -// For example's sake, we're using the Axios module -const axios = require('Axios'); +(async() => { + const clamscan = new NodeClam().init({ + clamdscan: { + socket: '/var/run/clamd.scan/clamd.sock', + host: '127.0.0.1', + port: 3310, + } + }); + + // For example's sake, we're using the Axios module + const axios = require('Axios'); + + // Get a readable stream for a URL request + const input = axios.get(some_url); + + // Create a writable stream to a local file + const output = fs.createWriteStream(some_local_file); + + // Get instance of this module's PassthroughStream object + const av = await clamscan.passthrough(); + + // Send output of Axios stream to ClamAV. + // Send output of Axios to `some_local_file` if ClamAV receives data successfully + await pipeline(input, av, output) + const { isInfected, viruses, timeout } = av.result; -// Get a readable stream for a URL request -const input = axios.get(some_url); - -// Create a writable stream to a local file -const output = fs.createWriteStream(some_local_file); - -// Get instance of this module's PassthroughStream object -const av = clamscan.passthrough(); - -// Send output of Axios stream to ClamAV. -// Send output of Axios to `some_local_file` if ClamAV receives data successfully -input.pipe(av).pipe(output); - -// What happens when scan is completed -av.on('scan-complete', result => { - const { isInfected, viruses } = result; - // Do stuff if you want -}); + if (isInfected) { + throw new Error(`...${viruses.join("|")}`); + } -// What happens when data has been fully written to `output` -output.on('finish', () => { - // Do stuff if you want -}); + if (timeout) { + throw new Error("..."); + } +})() // NOTE: no errors (or other events) are being handled in this example but standard errors will be emitted according to NodeJS's Stream specifications ``` diff --git a/examples/passthrough.js b/examples/passthrough.js index 68f81ec..f72f705 100755 --- a/examples/passthrough.js +++ b/examples/passthrough.js @@ -1,100 +1,68 @@ // eslint-disable-next-line import/no-extraneous-dependencies const axios = require('axios'); -const fs = require('fs'); -const { promisify } = require('util'); +const { pipeline } = require("stream/promises") +const { Writable, Readable } = require('stream'); -const fsUnlink = promisify(fs.unlink); - -// const fakeVirusUrl = 'https://secure.eicar.org/eicar_com.txt'; -const normalFileUrl = 'https://raw.githubusercontent.com/kylefarris/clamscan/sockets/README.md'; -// const largeFileUrl = 'http://speedtest-ny.turnkeyinternet.net/100mb.bin'; -const passthruFile = `${__dirname}/output`; - -const testUrl = normalFileUrl; -// const testUrl = fakeVirusUrl; -// const testUrl = largeFileUrl; +const testUrl = { + fakeVirusUrl: 'https://raw.githubusercontent.com/fire1ce/eicar-standard-antivirus-test-files/master/eicar-test.txt', + normalFileUrl: 'https://raw.githubusercontent.com/kylefarris/clamscan/master/examples/passthrough.js' +}; // Initialize the clamscan module const NodeClam = require('../index.js'); // Offically: require('clamscan'); -/** - * Removes whatever file was passed-through during the scan. - */ -async function removeFinalFile() { - try { - await fsUnlink(passthruFile); - console.log(`Output file: "${passthruFile}" was deleted.`); - process.exit(1); - } catch (err) { - console.error(err); - process.exit(1); - } -} - /** * Actually run the example code. */ async function test() { const clamscan = await new NodeClam().init({ - debugMode: true, clamdscan: { host: 'localhost', port: 3310, bypassTest: true, + timeout: 30000 // socket: '/var/run/clamd.scan/clamd.sock', }, }); - const input = axios.get(testUrl); - const output = fs.createWriteStream(passthruFile); - const av = clamscan.passthrough(); - - input.pipe(av).pipe(output); - - av.on('error', (error) => { - if ('data' in error && error.data.isInfected) { - console.error('Dang, your stream contained a virus(es):', error.data.viruses); - } else { - console.error(error); + const input = await axios.get(testUrl.fakeVirusUrl); + // output can be a fs.createWriteStream + const output = new Writable({ + write(chunk, _, cb) { + cb(null, chunk); } - removeFinalFile(); }) - .on('timeout', () => { - console.error('It looks like the scanning has timedout.'); - process.exit(1); - }) - .on('finish', () => { - console.log('All data has been sent to virus scanner'); - }) - .on('end', () => { - console.log('All data has been scanned sent on to the destination!'); - }) - .on('scan-complete', (result) => { - console.log('Scan Complete: Result: ', result); - if (result.isInfected === true) { - console.log( - `You've downloaded a virus (${result.viruses.join( - ', ' - )})! Don't worry, it's only a test one and is not malicious...` - ); - } else if (result.isInfected === null) { - console.log(`There was an issue scanning the file you downloaded...`); - } else { - console.log(`The file (${testUrl}) you downloaded was just fine... Carry on...`); - } - removeFinalFile(); - process.exit(0); - }); - - output.on('finish', () => { - console.log('Data has been fully written to the output...'); - output.destroy(); - }); output.on('error', (error) => { console.log('Final Output Fail: ', error); process.exit(1); }); + + try { + const av = await clamscan.passthrough(); + + await pipeline(Readable.from(input.data), av, output) + const { isInfected, viruses, timeout } = av.result; + + if (isInfected === null) { + console.log(`There was an issue scanning the file you downloaded...`); + } + + if (isInfected === true) { + console.log( + `You've downloaded a virus (${viruses.join( + ', ' + )})! Don't worry, it's only a test one and is not malicious...` + ); + } + + if (timeout === true) { + console.error('It looks like the scanning has timedout.'); + } + } catch (error) { + // handle errors + // Can be piped error, or connexion error + } } test(); diff --git a/index.js b/index.js index bdbe0f4..87af077 100755 --- a/index.js +++ b/index.js @@ -14,7 +14,7 @@ const nodePath = require('path'); // renamed to prevent conflicts in `scanDir` const tls = require('tls'); const { promisify } = require('util'); const { execFile } = require('child_process'); -const { PassThrough, Transform, Readable } = require('stream'); +const { Transform, Readable } = require('stream'); const { Socket } = require('dgram'); const NodeClamError = require('./lib/NodeClamError'); const NodeClamTransform = require('./lib/NodeClamTransform'); @@ -1153,302 +1153,176 @@ class NodeClam { /** * Returns a PassthroughStream object which allows you to - * pipe a ReadbleStream through it and on to another output. In the case of this + * pipe a ReadableStream through it and on to another output. In the case of this * implementation, it's actually forking the data to also * go to ClamAV via TCP or Domain Sockets. Each data chunk is only passed on to * the output if that chunk was successfully sent to and received by ClamAV. - * The PassthroughStream object returned from this method has a special event - * that is emitted when ClamAV finishes scanning the streamed data (`scan-complete`) - * so that you can decide if there's anything you need to do with the final output - * destination (ex. delete a file or S3 object). + * The PassthroughStream object returned from this method contains a 'result' property + * which will be complete at the end of the pipeline and will contain the elements linked to the scanned file + * In the case of an infected file, you can decide if there's anything to be done after + * the pipeline has been completed (ex. delete a file into I/O disk or S3 object). * * @returns {Transform} A Transform stream for piping a Readable stream into * @example * const NodeClam = require('clamscan'); + * const { pipeline } = require("stream/promises"); + * + * (async() => { + * // You'll need to specify your socket or TCP connection info + * const clamscan = await new NodeClam().init({ + * clamdscan: { + * socket: '/var/run/clamd.scan/clamd.sock', + * host: '127.0.0.1', + * port: 3310, + * } + * }); * - * // You'll need to specify your socket or TCP connection info - * const clamscan = new NodeClam().init({ - * clamdscan: { - * socket: '/var/run/clamd.scan/clamd.sock', - * host: '127.0.0.1', - * port: 3310, - * } - * }); - * - * // For example's sake, we're using the Axios module - * const axios = require('axios'); + * // For example's sake, we're using the Axios module + * const axios = require('axios'); * - * // Get a readable stream for a URL request - * const input = axios.get(someUrl); + * // Get a readable stream for a URL request + * const input = axios.get(someUrl); * - * // Create a writable stream to a local file - * const output = fs.createWriteStream(someLocalFile); + * // Create a writable stream to a local file + * const output = fs.createWriteStream(someLocalFile); * - * // Get instance of this module's PassthroughStream object - * const av = clamscan.passthrough(); + * // Get instance of this module's PassthroughStream object + * const av = await clamscan.passthrough(); * - * // Send output of Axios stream to ClamAV. - * // Send output of Axios to `someLocalFile` if ClamAV receives data successfully - * input.pipe(av).pipe(output); + * // Send output of Axios stream to ClamAV. + * // Send output of Axios to `someLocalFile` if ClamAV receives data successfully + * await pipeline(input, av, output); * - * // What happens when scan is completed - * av.on('scan-complete', result => { - * const {isInfected, viruses} = result; - * // Do stuff if you want - * }); - * - * // What happens when data has been fully written to `output` - * output.on('finish', () => { - * // Do stuff if you want - * }); + * // What happens when scan is completed + * const {isInfected, viruses} = av.result; + * if (isInfected) { + * // Do something (ex. remove the file and throw a specific error) + * } + * })() * * // NOTE: no errors (or other events) are being handled in this example but standard errors will be emitted according to NodeJS's Stream specifications */ - passthrough() { + async passthrough() { const me = this; - // A chunk counter for debugging - let _scanComplete = false; - let _avWaiting = null; - let _avScanTime = false; - - // DRY method for clearing the interval and counter related to scan times - const clearScanBenchmark = () => { - if (_avWaiting) clearInterval(_avWaiting); - _avWaiting = null; - _avScanTime = 0; - }; - // Return a Transform stream so this can act as a "man-in-the-middle" - // for the streaming pipeline. - // Ex. uploadStream.pipe().pipe(destination_stream) - return new Transform({ - // This should be fired on each chunk received - transform(chunk, encoding, cb) { - // DRY method for handling each chunk as it comes in - const doTransform = () => { - // Write data to our fork stream. If it fails, - // emit a 'drain' event - if (!this._forkStream.write(chunk)) { - this._forkStream.once('drain', () => { - cb(null, chunk); - }); - } else { - // Push data back out to whatever is listening (if anything) - // and let Node know we're ready for more data - cb(null, chunk); - } - }; + let clamAVSocket = null; + try { + // Get a socket + clamAVSocket = await me._initSocket('passthrough'); + clamAVSocket.write('zINSTREAM\0'); + if (me.settings.debugMode) { + console.log(`${me.debugLabel}: ClamAV Socket Initialized...`); + } + } catch (err) { + // If there's an issue connecting to the ClamAV socket, this is where that's handled + if (me.settings.debugMode) { + console.error(`${me.debugLabel}: Error initiating socket to ClamAV: `, err); + } - // DRY method for handling errors when they arise from the - // ClamAV Socket connection - const handleError = (err, isInfected = null, result = null) => { - this._forkStream.unpipe(); - this._forkStream.destroy(); - this._clamavTransform.destroy(); - if (this._clamavSocket) { - this._clamavSocket.end(); - } - clearScanBenchmark(); + throw err; + } - // Finding an infected file isn't really an error... - if (isInfected === true) { - if (_scanComplete === false) { - _scanComplete = true; - this.emit('scan-complete', result); - } - this.emit('stream-infected', result); // just another way to catch an infected stream - } else { - this.emit('error', err || new NodeClamError(result)); - } - }; + // Return a Transform stream so this can act as a "man-in-the-middle" + // for the streaming pipeline. + // Ex. await pipeline(uploadStream, , destination_stream) + class ClamAVStream extends Transform { + constructor() { + super(); + + this.scanResult = ''; + // Used for execution time. + this.firstChunk = true; + this.startTime = 0; + } - // If we haven't initialized a socket connection to ClamAV yet, - // now is the time... - if (!this._clamavSocket) { - // We're using a PassThrough stream as a middle man to fork the input - // into two paths... (1) ClamAV and (2) The final destination. - this._forkStream = new PassThrough(); - // Instantiate our custom Transform stream that coddles - // chunks into the correct format for the ClamAV socket. - this._clamavTransform = new NodeClamTransform({}, me.settings.debugMode); - // Setup an array to collect the responses from ClamAV - this._clamavResponseChunks = []; - - // Get a connection to the ClamAV Socket - me._initSocket('passthrough').then( - (socket) => { - this._clamavSocket = socket; - - if (me.settings.debugMode) console.log(`${me.debugLabel}: ClamAV Socket Initialized...`); - - // Setup a pipeline that will pass chunks through our custom Tranform and on to ClamAV - this._forkStream.pipe(this._clamavTransform).pipe(this._clamavSocket); - - // When the CLamAV socket connection is closed (could be after 'end' or because of an error)... - this._clamavSocket - .on('close', (hadError) => { - if (me.settings.debugMode) - console.log( - `${me.debugLabel}: ClamAV socket has been closed! Because of Error:`, - hadError - ); - this._clamavSocket.end(); - }) - // When the ClamAV socket connection ends (receives chunk) - .on('end', () => { - this._clamavSocket.end(); - if (me.settings.debugMode) - console.log(`${me.debugLabel}: ClamAV socket has received the last chunk!`); - // Process the collected chunks - const response = Buffer.concat(this._clamavResponseChunks); - const result = me._processResult(response.toString('utf8'), null); - this._clamavResponseChunks = []; - if (me.settings.debugMode) { - console.log(`${me.debugLabel}: Result of scan:`, result); - console.log( - `${me.debugLabel}: It took ${_avScanTime} seconds to scan the file(s).` - ); - clearScanBenchmark(); - } - - // If the scan timed-out - if (result.timeout === true) this.emit('timeout'); - - // NOTE: "scan-complete" could be called by the `handleError` method. - // We don't want to to double-emit this message. - if (_scanComplete === false) { - _scanComplete = true; - this._clamavSocket.end(); - this.emit('scan-complete', result); - } - }) - // If connection timesout. - .on('timeout', () => { - this.emit('timeout', new Error('Connection to host/socket has timed out')); - this._clamavSocket.end(); - if (me.settings.debugMode) - console.log(`${me.debugLabel}: Connection to host/socket has timed out`); - }) - // When the ClamAV socket is ready to receive packets (this will probably never fire here) - .on('ready', () => { - if (me.settings.debugMode) - console.log(`${me.debugLabel}: ClamAV socket ready to receive`); - }) - // When we are officially connected to the ClamAV socket (probably will never fire here) - .on('connect', () => { - if (me.settings.debugMode) - console.log(`${me.debugLabel}: Connected to ClamAV socket`); - }) - // If an error is emitted from the ClamAV socket - .on('error', (err) => { - console.error(`${me.debugLabel}: Error emitted from ClamAV socket: `, err); - handleError(err); - }) - // If ClamAV is sending stuff to us (ie, an "OK", "Virus FOUND", or "ERROR") - .on('data', (cvChunk) => { - // Push this chunk to our results collection array - this._clamavResponseChunks.push(cvChunk); - if (me.settings.debugMode) - console.log(`${me.debugLabel}: Got result!`, cvChunk.toString()); - - // Parse what we've gotten back from ClamAV so far... - const response = Buffer.concat(this._clamavResponseChunks); - const result = me._processResult(response.toString(), null); - - // If there's an error supplied or if we detect a virus or timeout, stop stream immediately. - if ( - result instanceof NodeClamError || - (typeof result === 'object' && - (('isInfected' in result && result.isInfected === true) || - ('timeout' in result && result.timeout === true))) - ) { - // If a virus is detected... - if ( - typeof result === 'object' && - 'isInfected' in result && - result.isInfected === true - ) { - handleError(null, true, result); - } - - // If a timeout is detected... - else if ( - typeof result === 'object' && - 'isInfected' in result && - result.isInfected === true - ) { - this.emit('timeout'); - handleError(null, false, result); - } - - // If any other kind of error is detected... - else { - handleError(result); - } - } - // For debugging purposes, spit out what was processed (if anything). - else if (me.settings.debugMode) - console.log( - `${me.debugLabel}: Processed Result: `, - result, - response.toString() - ); - }); - - if (me.settings.debugMode) console.log(`${me.debugLabel}: Doing initial transform!`); - // Handle the chunk - doTransform(); - }, - (err) => { - // Close socket if it's currently valid - if ( - this._clamavSocket && - 'readyState' in this._clamavSocket && - this._clamavSocket.readyState - ) { - this._clamavSocket.end(); - } + // Process result to get the complete object + get result() { + return me._processResult(this.scanResult, null); + } - // If there's an issue connecting to the ClamAV socket, this is where that's handled - if (me.settings.debugMode) - console.error(`${me.debugLabel}: Error initiating socket to ClamAV: `, err); - handleError(err); - } - ); - } else { - // if (me.settings.debugMode) console.log(`${me.debugLabel}: Doing transform: ${++counter}`); - // Handle the chunk - doTransform(); + // This code was inspired by the clamav.js package by "yongtang" + // https://github.com/yongtang/clamav.js + // Specific Transform extension that coddles + // chunks into the correct format for a ClamAV socket. + transform(chunk) { + const chunkSize = Buffer.alloc(4); + chunkSize.writeUInt32BE(chunk ? chunk.length : 0, 0); + clamAVSocket.write(chunkSize); + + if (chunk) { + clamAVSocket.write(chunk); } - }, + } + + // This should be fired on each chunk received + _transform(chunk, encoding, cb) { + if (this.firstChunk) { + // To get an execution time in the _flush method + this.firstChunk = false; + this.startTime = performance.now(); + } + + this.transform(chunk); + cb(null, chunk); + } // This is what is called when the input stream has dried up - flush(cb) { + _flush(cb) { if (me.settings.debugMode) console.log(`${me.debugLabel}: Done with the full pipeline.`); - // Keep track of how long it's taking to scan a file.. - _avWaiting = null; - _avScanTime = 0; - if (me.settings.debugMode) { - _avWaiting = setInterval(() => { - _avScanTime += 1; - if (_avScanTime % 5 === 0) - console.log(`${me.debugLabel}: ClamAV has been scanning for ${_avScanTime} seconds...`); - }, 1000); - } + this.transform(); - // @todo: Investigate why this needs to be done in order - // for the ClamAV socket to be closed (why NodeClamTransform's - // `_flush` method isn't getting called) - // If the incoming stream is empty, transform() won't have been called, so we won't - // have a socket here. - if (this._clamavSocket && this._clamavSocket.writable === true) { - const size = Buffer.alloc(4); - size.writeInt32BE(0, 0); - this._clamavSocket.write(size, cb); - } - }, - }); + clamAVSocket + // 'data' event is triggered once because ClamDScan generates a report after all requested + // scanning has been completed by the daemon. + // https://docs.clamav.net/manual/Usage/Scanning.html?highlight=INSTREAM#clamdscan + .on('data', (cvChunk) => { + this.scanResult = cvChunk.toString(); + + if (me.settings.debugMode) { + console.log( + `${me.debugLabel}: Processed Result: `, + this.result, + this.scanResult + ); + // Keep track of how long it's taking to scan a file.. + const executionTime = ((performance.now() - this.startTime)).toFixed(0); + console.log(`${me.debugLabel}: ClamAV has been scanning for ${executionTime} ms`); + } + cb(); + }) + // When the socket closes prematurely + .on('close', (err) => { + if (me.settings.debugMode) { + console.log(`${me.debugLabel}: ClamAV socket has been closed! Because of Error: ${err}`); + } + cb(err); + }) + // When the ClamAV socket connection ends + .on('end', () => { + if (me.settings.debugMode) { + console.log(`${me.debugLabel}: ClamAV socket has received the last chunk!`); + } + }) + // If connection timeout. + .on('timeout', () => { + this.scanResult = 'COMMAND READ TIMED OUT' + if (me.settings.debugMode) { + console.log(`${me.debugLabel}: Connection to host/socket has timed out`); + } + cb(); + }) + // If an error is emitted from the ClamAV socket + .on('error', (err) => { + if (me.settings.debugMode) { + console.error(`${me.debugLabel}: Error emitted from ClamAV socket: `, err); + } + cb(err) + }); + } + }; + + return new ClamAVStream(); } /** diff --git a/tests/index.js b/tests/index.js index c56bedb..61cfb93 100755 --- a/tests/index.js +++ b/tests/index.js @@ -1,12 +1,14 @@ /* eslint-disable no-unused-vars */ /* eslint-env mocha */ const fs = require('fs'); +const { pipeline } = require("stream/promises") +const fsPromise = require("fs/promises") const path = require('path'); const { Agent } = require('https'); const axios = require('axios'); const chai = require('chai'); const { promisify } = require('util'); -const { PassThrough, Readable } = require('stream'); +const { PassThrough, Readable, Writable } = require('stream'); const chaiAsPromised = require('chai-as-promised'); const eicarGen = require('./eicargen'); @@ -1584,59 +1586,40 @@ describe('passthrough', () => { port: 65535, }, }); - - const input = fs.createReadStream(goodScanFile); - const output = fs.createWriteStream(passthruFile); - const av = clamav.passthrough(); - - input.pipe(av).pipe(output); - if (fs.existsSync(passthruFile)) fs.unlinkSync(passthruFile); - - av.on('error', (err) => { - expect(err).to.be.instanceof(Error); - }); } catch (err) { expect(err).to.be.instanceof(Error); } }); - it('should fire a "scan-complete" event when the stream has been fully scanned and provide a result object that contains "isInfected" and "viruses" properties', (done) => { + it('should get a "result" getter with the required properties after complete file scanning', async () => { const input = eicarGen.getStream(); const output = fs.createWriteStream(passthruFile); - const av = clamscan.passthrough(); + const av = await clamscan.passthrough(); - input.pipe(av).pipe(output); + await pipeline(input, av, output) if (fs.existsSync(passthruFile)) fs.unlinkSync(passthruFile); - av.on('scan-complete', (result) => { - check(done, () => { - expect(result) - .to.be.an('object') - .that.has.all.keys('isInfected', 'viruses', 'file', 'resultString', 'timeout'); - }); - }); + expect(av.result) + .to.be.an('object') + .that.has.all.keys('isInfected', 'viruses', 'file', 'resultString', 'timeout'); }); - it('should indicate that a stream was infected in the "scan-complete" event if the stream DOES contain a virus', (done) => { + it('should indicate that a stream was infected in the "result" property if the stream DOES contain a virus', async () => { const input = eicarGen.getStream(); const output = fs.createWriteStream(passthruFile); - const av = clamscan.passthrough(); + const av = await clamscan.passthrough(); - input.pipe(av).pipe(output); + await pipeline(input, av, output) if (fs.existsSync(passthruFile)) fs.unlinkSync(passthruFile); - av.on('scan-complete', (result) => { - check(done, () => { - const { isInfected, viruses } = result; - expect(isInfected).to.be.a('boolean'); - expect(isInfected).to.eql(true); - expect(viruses).to.be.an('array'); - expect(viruses).to.have.length(1); - }); - }); + const { isInfected, viruses } = av.result + expect(isInfected).to.be.a('boolean'); + expect(isInfected).to.eql(true); + expect(viruses).to.be.an('array'); + expect(viruses).to.have.length(1); }); - it('should indicate that a stream was NOT infected in the "scan-complete" event if the stream DOES NOT contain a virus', async () => { + it('should indicate that a stream was NOT infected in the "result" property if the stream DOES NOT contain a virus', async () => { const agent = new Agent({ rejectUnauthorized: false }); const input = await axios({ method: 'get', @@ -1645,88 +1628,79 @@ describe('passthrough', () => { httpsAgent: agent, }); const output = fs.createWriteStream(passthruFile); - const av = clamscan.passthrough(); + const av = await clamscan.passthrough(); - input.data.pipe(av).pipe(output); + await pipeline(input.data, av, output) if (fs.existsSync(passthruFile)) fs.unlinkSync(passthruFile); - av.on('scan-complete', (result) => { - const { isInfected, viruses } = result; - expect(isInfected).to.be.a('boolean'); - expect(isInfected).to.eql(false); - expect(viruses).to.be.an('array'); - expect(viruses).to.have.length(0); - }); + const { isInfected, viruses } = av.result; + expect(isInfected).to.be.a('boolean'); + expect(isInfected).to.eql(false); + expect(viruses).to.be.an('array'); + expect(viruses).to.have.length(0); }); - it('should (for example) have created the file that the stream is being piped to', (done) => { + it('should (for example) have created the file that the stream is being piped to', async () => { const input = fs.createReadStream(goodScanFile); const output = fs.createWriteStream(passthruFile); - const av = clamscan.passthrough(); + const av = await clamscan.passthrough(); - input.pipe(av).pipe(output); + await pipeline(input, av, output); - output.on('finish', () => { - Promise.all([ - expect(fsState(passthruFile), 'get passthru file stats').to.not.be.rejectedWith(Error), - expect(fsReadfile(passthruFile), 'get passthru file').to.not.be.rejectedWith(Error), - ]).should.notify(() => { - if (fs.existsSync(passthruFile)) fs.unlinkSync(passthruFile); - done(); - }); - }); + await expect(fsPromise.access(passthruFile)).to.not.be.rejected; }); - it('should have cleanly piped input to output', () => { + it('should have cleanly piped input to output', async () => { const input = fs.createReadStream(goodScanFile); const output = fs.createWriteStream(passthruFile); - const av = clamscan.passthrough(); + const av = await clamscan.passthrough(); - input.pipe(av).pipe(output); + await pipeline(input, av, output) - output.on('finish', () => { - const origFile = fs.readFileSync(goodScanFile); - const outFile = fs.readFileSync(passthruFile); - if (fs.existsSync(passthruFile)) fs.unlinkSync(passthruFile); + const [origFile, outFile] = await Promise.all([ + fsPromise.readFile(goodScanFile), + fsPromise.readFile(passthruFile), + ]) - expect(origFile).to.eql(outFile); - }); + expect(origFile).to.eql(outFile); }); // https://github.com/kylefarris/clamscan/issues/82 - it('should not throw multiple callback error', (done) => { + it('should not throw multiple callback error', async () => { // To reliably reproduce the issue in the broken code, it's important that this is an async generator // and it emits some chunks larger than the default highWaterMark of 16 KB. // eslint-disable-next-line require-jsdoc - async function* gen(i = 10) { + let i = 10; + async function* gen() { while (i < 25) { i += 1; yield Buffer.from(new Array(i * 1024).fill()); } } - - const input = Readable.from(gen()); - const av = clamscan.passthrough(); + const av = await clamscan.passthrough(); + const output = new Writable({ + write(chunk, _, callback) { + callback(null, chunk) + } + }) // The failure case will throw an error and not finish - input.pipe(av).on('end', done).resume(); + await pipeline(gen, av, output) }); if (!process.env.CI) { - it('should handle a 0-byte file', () => { + it('should handle a 0-byte file', async () => { const input = fs.createReadStream(emptyFile); const output = fs.createWriteStream(passthruFile); - const av = clamscan.passthrough(); - - input.pipe(av).pipe(output); + const av = await clamscan.passthrough(); - output.on('finish', () => { - const origFile = fs.readFileSync(emptyFile); - const outFile = fs.readFileSync(passthruFile); - if (fs.existsSync(passthruFile)) fs.unlinkSync(passthruFile); - - expect(origFile).to.eql(outFile); - }); + await pipeline(input, av, output) + const [origFile, outFile] = await Promise.all([ + fsPromise.readFile(emptyFile), + fsPromise.readFile(passthruFile), + ]) + + expect(origFile).to.eql(outFile); }); } });