diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..c6c8b36 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,9 @@ +root = true + +[*] +indent_style = space +indent_size = 2 +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6b2d41a --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.DS_Store +*.log +.nyc_output/ +coverage/ +node_modules/ diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e7d7f65 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,13 @@ +language: node_js +node_js: +- '4.0' +- '7.0' +after_script: bash <(curl -s https://codecov.io/bash) +deploy: + provider: npm + email: tituswormer@gmail.com + api_key: + secure: ZPg2hxMgoFgL9ByaALk+oCOa3EV6z1I6xrqMHhLEx5JVyRcLZYKbz6hbv/wOwizh+LiFN9Lg49bkvbA5c0ouQ82/ghSpinrvgL8hmNucueZ/LPIKbfXn0sKbroJORnzMsA+ycBfZ6w1XS8BVSRpDUpbBqN2tO9JnzofcyUbL7QHSUiWp08umOcMGy2K05mOlx11gGWbS6RKuvkL0RcGyRRL7RO/YN4By7ky4lc6J5ZyQex/T/B2gPWlIhmoUfSbk0vHs6xeXHW8+Cgfjk5Wvb3nEu4z+hpeG3KbgvYyd1G8hYqwd6eS6p6Ntk8jd8/UHJUOmNAFCEk+c4piKk4VOClL48GmbgWkerN2xinSXhg1DOt190YzZeKo3oivYLWPsejs+MRYJ/zCEd2X3hVL62eR57ZMynFp233TQdjHiq4l3XwwZtnFrynh2slTAswJl5SSu1Sye0krlY814BoeWBSrPAeBESXVqV6rtNmd0oMvAqTuqqzA4n4bxFYSJvm+zkqF9AhXXlUm/a5FompNPzIgZDIchBDKBRl9tWHIYsDmU8K46S2D7lApoasgou8Ez5A7PDVKMu4gOY/9QqQlCM3N1FWfpaNFtAsPjHjqJs5Pt5/Hl05yeQ1ao0zLUOwtlQkvcHpuK7tgKuzW6BYSpjvYiQmxi8wT+XarvwIgSRC0= + on: + tags: true + node: '4.0' diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..045ffe0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +(The MIT License) + +Copyright (c) 2017 Titus Wormer + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +'Software'), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/index.js b/index.js new file mode 100644 index 0000000..97712cf --- /dev/null +++ b/index.js @@ -0,0 +1,152 @@ +'use strict'; + +var events = require('events'); +var once = require('once'); +var func = require('is-function'); + +module.exports = stream; + +function stream(processor) { + var chunks = []; + var emitter = new events.EventEmitter(); + var ended = false; + + emitter.processor = processor; + emitter.readable = emitter.writable = true; + emitter.write = write; + emitter.end = end; + emitter.pipe = pipe; + + return emitter; + + /* Write a chunk into memory. */ + function write(chunk, encoding, callback) { + if (func(encoding)) { + callback = encoding; + encoding = null; + } + + if (ended) { + throw new Error('Did not expect `write` after `end`'); + } + + chunks.push((chunk || '').toString(encoding || 'utf8')); + + if (callback) { + callback(); + } + + /* Signal succesful write. */ + return true; + } + + /* End the writing. Passes all arguments to a final + * `write`. Starts the process, which will trigger + * `error`, with a fatal error, if any; `data`, with + * the generated document in `string` form, if + * succesful. If messages are triggered during the + * process, those are triggerd as `warning`s. */ + function end() { + write.apply(null, arguments); + + ended = true; + + processor.process(chunks.join(''), done); + + return true; + + function done(err, file) { + var messages = file ? file.messages : []; + var length = messages.length; + var index = -1; + + chunks = null; + + /* Trigger messages as warnings, except for fatal error. */ + while (++index < length) { + /* istanbul ignore else - shouldn’t happen. */ + if (messages[index] !== err) { + emitter.emit('warning', messages[index]); + } + } + + if (err) { + /* Don’t enter an infinite error throwing loop. */ + global.setTimeout(function () { + emitter.emit('error', err); + }, 4); + } else { + emitter.emit('data', file.contents); + emitter.emit('end'); + } + } + } + + /* Pipe the processor into a writable stream. + * + * Basically `Stream#pipe`, but inlined and + * simplified to keep the bundled size down. + * + * See https://github.com/nodejs/node/blob/master/lib/stream.js#L26. */ + function pipe(dest, options) { + var settings = options || {}; + var onend = once(onended); + + emitter.on('data', ondata); + emitter.on('error', onerror); + emitter.on('end', cleanup); + emitter.on('close', cleanup); + + /* If the 'end' option is not supplied, dest.end() will be + * called when the 'end' or 'close' events are received. + * Only dest.end() once. */ + if (!dest._isStdio && settings.end !== false) { + emitter.on('end', onend); + } + + dest.on('error', onerror); + dest.on('close', cleanup); + + dest.emit('pipe', emitter); + + return dest; + + /* End destination. */ + function onended() { + if (dest.end) { + dest.end(); + } + } + + /* Handle data. */ + function ondata(chunk) { + if (dest.writable) { + dest.write(chunk); + } + } + + /* Clean listeners. */ + function cleanup() { + emitter.removeListener('data', ondata); + emitter.removeListener('end', onend); + emitter.removeListener('error', onerror); + emitter.removeListener('end', cleanup); + emitter.removeListener('close', cleanup); + + dest.removeListener('error', onerror); + dest.removeListener('close', cleanup); + } + + /* Close dangling pipes and handle unheard errors. */ + function onerror(err) { + var handlers = emitter._events.error; + + cleanup(); + + /* Cannot use `listenerCount` in node <= 0.12. */ + if (!handlers || handlers.length === 0 || handlers === onerror) { + throw err; /* Unhandled stream error in pipe. */ + } + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..d11c908 --- /dev/null +++ b/package.json @@ -0,0 +1,51 @@ +{ + "name": "unified-stream", + "version": "0.0.0", + "description": "Streaming interface for unified processors", + "license": "MIT", + "keywords": [ + "unified", + "stream" + ], + "repository": "https://github.com/unifiedjs/unified-stream", + "bugs": "https://github.com/unifiedjs/unified-stream/issues", + "author": "Titus Wormer (http://wooorm.com)", + "contributors": [ + "Titus Wormer (http://wooorm.com)" + ], + "files": [ + "index.js" + ], + "dependencies": { + "is-function": "^1.0.1", + "once": "^1.4.0" + }, + "devDependencies": { + "nyc": "^10.1.0", + "remark-cli": "^2.0.0", + "remark-preset-wooorm": "^1.0.0", + "tape": "^4.6.3", + "unified": "^6.0.0", + "xo": "^0.17.1" + }, + "scripts": { + "build-md": "remark . -qfo", + "build": "npm run build-md", + "lint": "xo", + "test-api": "node test", + "test-coverage": "nyc --reporter lcov tape test.js", + "test": "npm run build && npm run lint && npm run test-coverage" + }, + "nyc": { + "check-coverage": true, + "lines": 100, + "functions": 100, + "branches": 100 + }, + "xo": { + "space": true + }, + "remarkConfig": { + "presets": "wooorm" + } +} diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..23fce95 --- /dev/null +++ b/readme.md @@ -0,0 +1,55 @@ +# unified-stream [![Build Status][travis-badge]][travis] [![Coverage Status][codecov-badge]][codecov] + +Module to add a streaming interface to [unified][] processors. + +Note that the interface is streaming, but the code buffers. + +## Installation + +[npm][]: + +```bash +npm install unified-stream +``` + +## Usage + +The below example pipes stdin, into an HTML formatter, to stdout. + +```js +var stream = require('unified-stream'); +var rehype = require('rehype'); +var format = require('rehype-format'); + +process.stdin + .pipe(stream(rehype().use(format))) + .pipe(process.stdout); +``` + +## API + +### `createStream(processor)` + +Create a readable/writable stream that transforms with `processor`. + +## License + +[MIT][license] © [Titus Wormer][author] + + + +[travis-badge]: https://img.shields.io/travis/unifiedjs/unified-stream.svg + +[travis]: https://travis-ci.org/unifiedjs/unified-stream + +[codecov-badge]: https://img.shields.io/codecov/c/github/unifiedjs/unified-stream.svg + +[codecov]: https://codecov.io/github/unifiedjs/unified-stream + +[npm]: https://docs.npmjs.com/cli/install + +[license]: LICENSE + +[author]: http://wooorm.com + +[unified]: https://github.com/unifiedjs/unified diff --git a/test.js b/test.js new file mode 100644 index 0000000..c7a6bde --- /dev/null +++ b/test.js @@ -0,0 +1,199 @@ +'use strict'; + +var stream = require('stream'); +var test = require('tape'); +var unified = require('unified'); +var func = require('is-function'); +var createStream = require('./'); + +test('createStream', function (t) { + var proc = unified().use(parse).use(stringify); + + t.test('interface', function (st) { + var tr = createStream(proc); + st.equal(tr.readable, true, 'should be readable'); + st.equal(tr.writable, true, 'should be writable'); + st.ok(func(tr.write), 'should have a `write` method'); + st.ok(func(tr.end), 'should have an `end` method'); + st.ok(func(tr.pipe), 'should have a `pipe` method'); + st.end(); + }); + + t.test('#end and #write', function (st) { + var phase; + var exception; + + st.plan(10); + + st.equal(createStream(proc).end(), true, 'should return true'); + + st.throws( + function () { + var tr = createStream(proc); + tr.end(); + tr.end(); + }, + /^Error: Did not expect `write` after `end`$/, + 'should throw on end after end' + ); + + createStream(proc) + .on('data', function (value) { + st.equal(value, '', 'should emit processed `data`'); + }) + .end(); + + createStream(proc) + .on('data', function (value) { + st.equal(value, 'alpha', 'should emit given `data`'); + }) + .end('alpha'); + + createStream(proc) + .on('data', function (value) { + st.equal(value, 'brC!vo', 'should honour encoding'); + }) + .end(new Buffer([0x62, 0x72, 0xc3, 0xa1, 0x76, 0x6f]), 'ascii'); + + phase = 0; + + createStream(proc) + .on('data', function () { + st.equal(phase, 1, 'should trigger data after callback'); + phase++; + }) + .end('charlie', function () { + st.equal(phase, 0, 'should trigger callback before data'); + phase++; + }); + + exception = new Error('alpha'); + + createStream(proc().use(function () { + return transformer; + function transformer() { + return exception; + } + })) + .on('error', function (err) { + st.equal( + err, + exception, + 'should trigger `error` if an error occurs' + ); + }) + .on('data', /* istanbul ignore next */ function () { + st.fail('should not trigger `data` if an error occurs'); + }) + .end(); + + createStream(proc().use(function () { + return transformer; + function transformer(tree, file) { + file.message(exception); + } + })) + .on('warning', function (err) { + st.equal( + err.reason, + 'alpha', + 'should trigger `warning` if an messages are emitted' + ); + }) + .on('data', function (data) { + st.equal(data, '', 'should not fail if warnings are emitted'); + }) + .end(); + }); + + t.test('#pipe', function (st) { + var tr; + var s; + + st.plan(5); + + st.doesNotThrow( + function () { + /* Not writable. */ + var tr = createStream(proc); + tr.pipe(new stream.Readable()); + tr.end('foo'); + }, + 'should not throw when piping to a non-writable stream' + ); + + tr = createStream(proc); + s = new stream.PassThrough(); + s._isStdio = true; + + tr.pipe(s); + + tr.write('alpha'); + tr.write('bravo'); + tr.end('charlie'); + + st.doesNotThrow( + function () { + s.write('delta'); + }, + 'should not `end` stdio streams' + ); + + tr = createStream(proc).on('error', function (err) { + st.equal( + err.message, + 'Whoops!', + 'should pass errors' + ); + }); + + tr.pipe(new stream.PassThrough()); + tr.emit('error', new Error('Whoops!')); + + tr = createStream(proc); + tr.pipe(new stream.PassThrough()); + + st.throws( + function () { + tr.emit('error', new Error('Whoops!')); + }, + /Whoops!/, + 'should throw if errors are not listened to' + ); + + tr = createStream(proc); + + tr + .pipe(new stream.PassThrough()) + .on('data', function (buf) { + st.equal( + String(buf), + 'alphabravocharlie', + 'should trigger `data` with the processed result' + ); + }) + .on('error', /* istanbul ignore next */ function () { + st.fail('should not trigger `error`'); + }); + + tr.write('alpha'); + tr.write('bravo'); + tr.end('charlie'); + }); +}); + +function parse() { + this.Parser = parser; + + function parser(doc) { + return {type: 'root', value: doc}; + } +} + +function stringify() { + this.Compiler = compiler; + + function compiler(tree) { + return tree.value; + } +}