Skip to content

Commit

Permalink
Add JSDoc based types
Browse files Browse the repository at this point in the history
  • Loading branch information
wooorm committed Jul 9, 2021
1 parent 0ff5998 commit 25dd07a
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 113 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
coverage/
node_modules/
.DS_Store
*.d.ts
*.log
yarn.lock
255 changes: 166 additions & 89 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,100 +1,164 @@
import events from 'events'
import once from 'once'

/**
* @typedef {import('unified').Processor} Processor
* @typedef {import('unified').ProcessCallback} ProcessCallback
* @typedef {import('vfile').BufferEncoding} Encoding
* @typedef {import('vfile').VFileValue} Value
* @typedef {((error?: Error) => void)} Callback
* @typedef {Omit<NodeJS.ReadableStream & NodeJS.WritableStream, 'read'|'setEncoding'|'pause'|'resume'|'isPaused'|'unpipe'|'unshift'|'wrap'>} MinimalDuplex
*/

import {EventEmitter} from 'events'

/**
* @param {Processor} processor
* @returns {MinimalDuplex}
*/
export function stream(processor) {
/** @type {string[]} */
let chunks = []
const emitter = new events.EventEmitter()
/** @type {boolean|undefined} */
let ended

emitter.processor = processor
emitter.readable = true
emitter.writable = true
emitter.write = write
emitter.end = end
emitter.pipe = pipe

return emitter

// Write a chunk into memory.
function write(chunk, encoding, callback) {
if (typeof encoding === 'function') {
callback = encoding
encoding = null
}

if (ended) {
throw new Error('Did not expect `write` after `end`')
}

chunks.push((chunk || '').toString(encoding || 'utf8'))

if (callback) {
callback()
}

// Signal succesful write.
return true
}

// End the writing.
// Passes all arguments to a final `write`.
// Starts the process, which will trigger `error`, with a fatal error, if any;
// `data`, with the generated document in `string` form, if succesful.
// If messages are triggered during the process, those are triggerd as
// `warning`s.
function end() {
write(...arguments)

ended = true

processor.process(chunks.join(''), done)

return true
/**
* Write a chunk into memory.
*
* @param {Value} chunk
* @param {Encoding} encoding
* @param {Callback} callback
*/
const write =
/**
* @type {(
* ((value?: Value, encoding?: Encoding, callback?: Callback) => boolean) &
* ((value: Value, callback?: Callback) => boolean)
* )}
*/
(
/**
* @param {Value} [chunk]
* @param {Encoding} [encoding]
* @param {Callback} [callback]
*/
function (chunk, encoding, callback) {
if (typeof encoding === 'function') {
callback = encoding
encoding = undefined
}

function done(error, file) {
const messages = file ? file.messages : []
let index = -1
if (ended) {
throw new Error('Did not expect `write` after `end`')
}

chunks = null
// @ts-expect-error: passing `encoding` to string is fine.
chunks.push((chunk || '').toString(encoding || 'utf8'))

// Trigger messages as warnings, except for fatal error.
while (++index < messages.length) {
/* istanbul ignore else - shouldn’t happen. */
if (messages[index] !== error) {
emitter.emit('warning', messages[index])
if (callback) {
callback()
}

// Signal succesful write.
return true
}
)

/**
* End the writing.
* Passes all arguments to a final `write`.
* Starts the process, which will trigger `error`, with a fatal error, if any;
* `data`, with the generated document in `string` form, if succesful.
* If messages are triggered during the process, those are triggerd as
* `warning`s.
*
* @param {Value} chunk
* @param {Encoding} encoding
* @param {Callback} callback
*/
const end =
/**
* @type {(
* ((value?: Value, encoding?: Encoding, callback?: Callback) => boolean) &
* ((value: Value, callback?: Callback) => boolean)
* )}
*/
(
/**
* @param {Value} [chunk]
* @param {Encoding} [encoding]
* @param {Callback} [callback]
*/
function (chunk, encoding, callback) {
write(chunk, encoding, callback)

processor.process(chunks.join(''), done)

if (error) {
// Don’t enter an infinite error throwing loop.
setTimeout(() => {
emitter.emit('error', error)
}, 4)
} else {
emitter.emit('data', file.value)
emitter.emit('end')
ended = true
return true

/** @type {ProcessCallback} */
function done(error, file) {
const messages = file ? file.messages : []
let index = -1

// @ts-expect-error: clear memory.
chunks = undefined

// Trigger messages as warnings, except for fatal error.
while (++index < messages.length) {
/* istanbul ignore else - shouldn’t happen. */
if (messages[index] !== error) {
emitter.emit('warning', messages[index])
}
}

if (error) {
// Don’t enter an infinite error throwing loop.
setTimeout(() => {
emitter.emit('error', error)
}, 4)
} else {
emitter.emit('data', file.value)
emitter.emit('end')
}
}
}
}
}
)

/** @type {MinimalDuplex} */
// @ts-expect-error `addListener` is fine.
const emitter = Object.assign(new EventEmitter(), {
processor,
writable: true,
readable: true,
write,
end,
pipe
})

// Pipe the processor into a writable stream.
// Basically `Stream#pipe`, but inlined and simplified to keep the bundled
// size down.
// See: <https://github.com/nodejs/node/blob/43a5170/lib/internal/streams/legacy.js#L13>.
function pipe(dest, options) {
const settings = options || {}
const onend = once(onended)
return emitter

/**
* Pipe the processor into a writable stream.
* Basically `Stream#pipe`, but inlined and simplified to keep the bundled
* size down.
* See: <https://github.com/nodejs/node/blob/43a5170/lib/internal/streams/legacy.js#L13>.
*
* @template {NodeJS.WritableStream} T
* @param {T} dest
* @param {{end?: boolean}} [options]
* @returns {T}
*/
function pipe(dest, options) {
emitter.on('data', ondata)
emitter.on('error', onerror)
emitter.on('end', cleanup)
emitter.on('close', cleanup)

// If the `end` option is not supplied, `dest.end()` will be called when the
// `end` or `close` events are received
// Only `dest.end()` once.
if (!dest._isStdio && settings.end !== false) {
emitter.on('end', onend)
// If the `end` option is not supplied, `dest.end()` will be
// called when the `end` or `close` events are received.
// @ts-expect-error `_isStdio` is available on `std{err,out}`
if (!dest._isStdio && (!options || options.end !== false)) {
emitter.on('end', onended)
}

dest.on('error', onerror)
Expand All @@ -104,24 +168,37 @@ export function stream(processor) {

return dest

// End destination.
/**
* End destination.
*
* @returns {void}
*/
function onended() {
if (dest.end) {
dest.end()
}
}

// Handle data.
/**
* Handle data.
*
* @param {Value} chunk
* @returns {void}
*/
function ondata(chunk) {
if (dest.writable) {
dest.write(chunk)
}
}

// Clean listeners.
/**
* Clean listeners.
*
* @returns {void}
*/
function cleanup() {
emitter.removeListener('data', ondata)
emitter.removeListener('end', onend)
emitter.removeListener('end', onended)
emitter.removeListener('error', onerror)
emitter.removeListener('end', cleanup)
emitter.removeListener('close', cleanup)
Expand All @@ -130,16 +207,16 @@ export function stream(processor) {
dest.removeListener('close', cleanup)
}

// Close dangling pipes and handle unheard errors.
/**
* Close dangling pipes and handle unheard errors.
*
* @param {Error?} [error]
* @returns {void}
*/
function onerror(error) {
cleanup()

// Cannot use `listenerCount` in node <= 0.12.
if (
!emitter._events.error ||
emitter._events.error.length === 0 ||
emitter._events.error === onerror
) {
if (!emitter.listenerCount('error')) {
throw error // Unhandled stream error in pipe.
}
}
Expand Down
20 changes: 16 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,33 @@
"sideEffects": false,
"type": "module",
"main": "index.js",
"types": "index.d.ts",
"files": [
"index.d.ts",
"index.js"
],
"dependencies": {
"once": "^1.4.0"
"unified": "^10.0.0-beta.1",
"vfile": "^5.0.0"
},
"devDependencies": {
"@types/tape": "^4.0.0",
"c8": "^7.0.0",
"is-function": "^1.0.0",
"prettier": "^2.0.0",
"remark-cli": "^9.0.0",
"remark-preset-wooorm": "^8.0.0",
"rimraf": "^3.0.2",
"tape": "^5.0.0",
"unified": "^10.0.0-beta.1",
"type-coverage": "^2.0.0",
"typescript": "^4.0.0",
"xo": "^0.39.0"
},
"scripts": {
"build": "rimraf \"*.d.ts\" && tsc && type-coverage",
"format": "remark . -qfo && prettier . -w --loglevel warn && xo --fix",
"test-api": "node --conditions development test.js",
"test-coverage": "c8 --check-coverage --branches 100 --functions 100 --lines 100 --statements 100 --reporter lcov node --conditions development test.js",
"test": "npm run format && npm run test-coverage"
"test": "npm run build && npm run format && npm run test-coverage"
},
"prettier": {
"tabWidth": 2,
Expand All @@ -57,5 +63,11 @@
"plugins": [
"preset-wooorm"
]
},
"typeCoverage": {
"atLeast": 100,
"detail": true,
"strict": true,
"ignoreCatch": true
}
}
Loading

0 comments on commit 25dd07a

Please sign in to comment.