Skip to content

Commit

Permalink
Change to replace Buffer with Uint8Array
Browse files Browse the repository at this point in the history
  • Loading branch information
wooorm committed Aug 22, 2023
1 parent bf96d15 commit 90a2bb9
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 30 deletions.
56 changes: 39 additions & 17 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/**
* @typedef {import('unified').Processor} Processor
* @typedef {import('unified').ProcessCallback} ProcessCallback
* @typedef {import('vfile').Value} Value
*/

/**
Expand All @@ -18,8 +19,6 @@
* Configuration (optional).
* @property {boolean | null | undefined} [end=false]
* Whether to `end` if the other stream ends (default: `false`).
*
* @typedef {Buffer | string} Value
*/

import {EventEmitter} from 'node:events'
Expand All @@ -34,9 +33,11 @@ import {EventEmitter} from 'node:events'
* Duplex stream.
*/
export function stream(processor) {
/** @type {Array<string>} */
let chunks = []
let ended = false
/** @type {Array<string>} */
const chunks = []
const decoder = new TextDecoder()
let flush = false

/** @type {MinimalDuplex} */
// @ts-expect-error: `addEventListener` is fine.
Expand All @@ -46,7 +47,6 @@ export function stream(processor) {
emitter.pipe = pipe
emitter.readable = true
emitter.writable = true
// @ts-expect-error: fine.
emitter.write = write

return emitter
Expand Down Expand Up @@ -93,14 +93,34 @@ export function stream(processor) {
throw new Error('Did not expect `write` after `end`')
}

chunks.push(
typeof chunk === 'string'
? chunk
: chunk
? // @ts-expect-error: to do: uint8array.
chunk.toString(encoding || undefined)
: ''
)
if (typeof chunk === 'string') {
if (flush) chunks.push(decoder.decode())
chunks.push(chunk)
flush = false
} else if (chunk) {
let u8 = chunk

// See: <https://nodejs.org/api/util.html#whatwg-supported-encodings>
if (
encoding === null ||
encoding === 'utf8' ||
// eslint-disable-next-line unicorn/text-encoding-identifier-case
encoding === 'utf-8' ||
encoding === 'unicode-1-1-utf-8'
) {
encoding = undefined
}

if (u8) {
// Another encoding, turn into UTF-8.
if (encoding) {
u8 = new TextEncoder().encode(new TextDecoder(encoding).decode(u8))
}

chunks.push(decoder.decode(u8, {stream: true}))
flush = true
}
}

if (callback) {
callback()
Expand Down Expand Up @@ -163,7 +183,12 @@ export function stream(processor) {

write(chunk, encoding, callback)

processor.process(chunks.join(''), done)
if (flush) chunks.push(decoder.decode())
const combined = chunks.join('')
// Clear memory.
chunks.length = 0

processor.process(combined, done)

emitter.emit('end')
ended = true
Expand All @@ -174,9 +199,6 @@ export function stream(processor) {
const messages = file ? file.messages : []
let index = -1

// @ts-expect-error: clear memory.
chunks = undefined

// Trigger messages as warnings, except for fatal error.
while (++index < messages.length) {
if (messages[index] !== error) {
Expand Down
89 changes: 76 additions & 13 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

import assert from 'node:assert/strict'
import {Buffer} from 'node:buffer'
import nodeStream from 'node:stream'
import test from 'node:test'
import {unified} from 'unified'
Expand Down Expand Up @@ -77,13 +76,71 @@ test('#end and #write', async function (t) {
await t.test('should honour encoding', async function () {
let called = false

// @ts-expect-error: TS is wrong on streams.
stream(proc)
.on('data', function (/** @type {string} */ value) {
assert.equal(value, 'brC!vo')
called = true
})
.end(Buffer.from([0x62, 0x72, 0xc3, 0xa1, 0x76, 0x6f]), 'ascii')
const s = stream(proc)

s.on('data', function (/** @type {string} */ value) {
assert.equal(value, 'abc')
called = true
})

// @ts-expect-error: TS is wrong on encoding.
s.end(new Uint8Array([0x61, 0x00, 0x62, 0x00, 0x63, 0x00]), 'utf-16le')

assert.equal(called, true)
})

await t.test('should support separate typed arrays', async function () {
let called = false

const tr = stream(proc).on('data', function (/** @type {string} */ value) {
assert.equal(value, '€')
called = true
})

tr.write(new Uint8Array([0xe2]))
tr.write(new Uint8Array([0x82]))
tr.write(new Uint8Array([0xac]))
tr.end()

assert.equal(called, true)
})

await t.test('should support separate typed arrays', async function () {
const family = '👨‍👨‍👧‍👦'
let called = false

const tr = stream(proc).on('data', function (/** @type {string} */ value) {
assert.equal(value, family)
called = true
})

let index = -1

while (++index < family.length) {
tr.write(family.slice(index, index + 1))
}

tr.end()

assert.equal(called, true)
})

await t.test('should support mixed data', async function () {
let called = false

const tr = stream(proc).on('data', function (/** @type {string} */ value) {
assert.equal(value, 'abcd')
called = true
})

// @ts-expect-error: TS is wrong on encoding.
tr.write(new Uint8Array([0x61, 0x00]), 'utf-16le')
// @ts-expect-error: TS is wrong on encoding.
tr.write(new Uint8Array([0x00, 0x62]), 'utf-16be')
// @ts-expect-error: TS is wrong on encoding.
tr.write(new Uint8Array([0x63]), 'unicode-1-1-utf-8')
tr.write('d')
tr.end()

assert.equal(called, true)
})
Expand All @@ -96,7 +153,7 @@ test('#end and #write', async function (t) {
assert.equal(value, 'brávo')
called = true
})
.end(Buffer.from([0x62, 0x72, 0xc3, 0xa1, 0x76, 0x6f]))
.end(new Uint8Array([0x62, 0x72, 0xc3, 0xa1, 0x76, 0x6f]))

assert.equal(called, true)
})
Expand Down Expand Up @@ -274,10 +331,16 @@ test('#pipe', async function (t) {
const tr = stream(proc)

tr.pipe(new nodeStream.PassThrough())
.on('data', function (/** @type {Buffer} */ buf) {
assert.equal(String(buf), 'alphabravocharlie')
called = true
})
.on(
'data',
/**
* @param {import('node:buffer').Buffer} buf
*/
function (buf) {
assert.equal(String(buf), 'alphabravocharlie')
called = true
}
)
.on('error', function () {
assert.fail()
})
Expand Down

0 comments on commit 90a2bb9

Please sign in to comment.