diff --git a/benchmark/streams/transform-by.js b/benchmark/streams/transform-by.js
new file mode 100644
index 00000000000000..8c2afa0732c22f
--- /dev/null
+++ b/benchmark/streams/transform-by.js
@@ -0,0 +1,29 @@
+'use strict';
+
+const common = require('../common');
+const Transform = require('stream').Transform;
+
+const bench = common.createBenchmark(main, {
+ n: [1e6]
+});
+
+function main({ n }) {
+ const s = Transform.by(async function*(source) {
+ for await (const chunk of source) {
+ yield chunk.toUpperCase();
+ }
+ });
+ s.resume();
+
+ bench.start();
+
+ let k = 0;
+ function run() {
+ while (k++ < n && s.write(b));
+ if (k >= n)
+ s.end();
+ }
+ s.on('drain', run);
+ s.on('finish', () => bench.end(n));
+ run();
+}
diff --git a/doc/api/errors.md b/doc/api/errors.md
index 278e3386923e76..cf21c142d6dd97 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -630,12 +630,6 @@ display if `block` does not throw.
An iterable argument (i.e. a value that works with `for...of` loops) was
required, but not provided to a Node.js API.
-
-### ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE
-
-A function argument that returns an async iterable (i.e. a value that works
-with `for await...of` loops) was required, but not provided to a Node.js API.
-
### ERR_ASSERTION
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 3526f869578c8e..c8857812a1a481 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -1646,14 +1646,14 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
the strings or buffers be iterated to match the other streams semantics
for performance reasons.
-### stream.Transform.by(asyncGeneratorFunction[, options])
+### stream.Transform.by(transform[, options])
-* `asyncGeneratorFunction` {AsyncGeneratorFunction} A mapping function which
-accepts a `source` async iterable which can be used to read incoming data, while
-transformed data is pushed to the stream with the `yield` keyword.
+* `transform` {AsyncGeneratorFunction} A mapping function which accepts a `source`
+async iterable which can be used to read incoming data, while transformed data is
+pushed to the stream with the `yield` keyword.
* `options` {Object} Options provided to `new stream.Transform([options])`.
By default, `Transform.by()` will set `options.objectMode` to `true`,
unless this is explicitly opted out by setting `options.objectMode` to `false`.
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index 0eee13369be307..ed25f7ea069113 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -71,22 +71,12 @@ const {
module.exports = Transform;
const {
- ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_TRANSFORM_ALREADY_TRANSFORMING,
ERR_TRANSFORM_WITH_LENGTH_0
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
-const AsyncIteratorPrototype = ObjectGetPrototypeOf(
- ObjectGetPrototypeOf(async function* () {}).prototype);
-
-const kSourceIteratorPull = Symbol('kSourceIteratorPull');
-const kSourceIteratorResolve = Symbol('kSourceIteratorResolve');
-const kSourceIteratorChunk = Symbol('kSourceIteratorChunk');
-const kSourceIteratorStream = Symbol('kSourceIteratorStream');
-const kSourceIteratorPump = Symbol('kSourceIteratorPump');
-const kSourceIteratorGrabResolve = Symbol('kSourceIteratorGrabResolve');
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);
@@ -232,110 +222,29 @@ function done(stream, er, data) {
return stream.push(null);
}
-function SourceIterator(asyncGeneratorFn, opts) {
- const source = this;
- const result = asyncGeneratorFn(this);
- if (typeof result[Symbol.asyncIterator] !== 'function') {
- throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
- }
- const iter = result[Symbol.asyncIterator]();
- if (typeof iter.next !== 'function') {
- throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
- }
-
- this[kSourceIteratorPull] = null;
- this[kSourceIteratorChunk] = null;
- this[kSourceIteratorResolve] = null;
- this[kSourceIteratorStream] = new Transform({
- objectMode: true,
- ...opts,
- transform(chunk, encoding, cb) {
- source.encoding = encoding;
- if (source[kSourceIteratorResolve] === null) {
- source[kSourceIteratorChunk] = chunk;
- source[kSourceIteratorPull] = cb;
- return;
- }
- source[kSourceIteratorResolve]({ value: chunk, done: false });
- source[kSourceIteratorResolve] = null;
- cb(null);
- }
- });
- this.encoding = this[kSourceIteratorStream]._transformState.writeencoding;
- this[kSourceIteratorGrabResolve] = (resolve) => {
- this[kSourceIteratorResolve] = resolve;
- };
- const first = iter.next();
- this[kSourceIteratorPump](iter, first);
-}
-
-SourceIterator.prototype[Symbol.asyncIterator] = function() {
- return this;
-};
-
-ObjectSetPrototypeOf(SourceIterator.prototype, AsyncIteratorPrototype);
+const from = require('internal/streams/from');
-SourceIterator.prototype.next = function next() {
- if (this[kSourceIteratorPull] === null || this[kSourceIteratorChunk] === null)
- return new Promise(this[kSourceIteratorGrabResolve]);
+Transform.by = function by(transform, opts) {
+ let _resolve;
+ let _promise = new Promise((resolve) => _resolve = resolve);
- this[kSourceIteratorPull](null);
- const result = Promise.resolve({
- value: this[kSourceIteratorChunk],
- done: false
- });
- this[kSourceIteratorChunk] = null;
- this[kSourceIteratorPull] = null;
- return result;
-};
+ if (typeof transform !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('transform', ['function'], iterable);
+ }
-SourceIterator.prototype[kSourceIteratorPump] = async function pump(iter, p) {
- const stream = this[kSourceIteratorStream];
- try {
- stream.removeListener('prefinish', prefinish);
- stream.on('prefinish', () => {
- if (this[kSourceIteratorResolve] !== null) {
- this[kSourceIteratorResolve]({ value: undefined, done: true });
- }
- });
- let next = await p;
+ return from(Duplex, transform(async function*() {
while (true) {
- const { done, value } = next;
- if (done) {
- if (value !== undefined) stream.push(value);
-
- // In the event of an early return we explicitly
- // discard any buffered state
- if (stream._writableState.length > 0) {
- const { length } = stream._writableState;
- const { transforming } = stream._transformState;
- stream._writableState.length = 0;
- stream._transformState.transforming = false;
- prefinish.call(stream);
- stream._writableState.length = length;
- stream._transformState.transforming = transforming;
- } else {
- prefinish.call(stream);
- }
- break;
- }
- stream.push(value);
- next = await iter.next();
+ const { chunk, done, cb } = await _promise;
+ if (done) return cb();
+ yield chunk;
+ _promise = new Promise((resolve) => _resolve = resolve);
+ cb();
}
- } catch (err) {
- process.nextTick(() => stream.destroy(err));
- } finally {
- this[kSourceIteratorPull] = null;
- this[kSourceIteratorChunk] = null;
- this[kSourceIteratorResolve] = null;
- this[kSourceIteratorStream] = null;
- }
-};
-
-
-Transform.by = function by(asyncGeneratorFn, opts) {
- const source = new SourceIterator(asyncGeneratorFn, opts);
- const stream = source[kSourceIteratorStream];
-
- return stream;
+ }()), {
+ objectMode: true,
+ autoDestroy: true,
+ ...opts,
+ write: (chunk, encoding, cb) => _resolve({ chunk, done: false, cb }),
+ final: (cb) => _resolve({ done: true, cb })
+ });
};
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index 39f580a609cdef..ad12d99c7cc49c 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -720,8 +720,6 @@ module.exports = {
// Note: Node.js specific errors must begin with the prefix ERR_
E('ERR_AMBIGUOUS_ARGUMENT', 'The "%s" argument is ambiguous. %s', TypeError);
E('ERR_ARG_NOT_ITERABLE', '%s must be iterable', TypeError);
-E('ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', '%s must return an async iterable',
- TypeError);
E('ERR_ASSERTION', '%s', Error);
E('ERR_ASYNC_CALLBACK', '%s must be a function', TypeError);
E('ERR_ASYNC_TYPE', 'Invalid name for async "type": %s', TypeError);
diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js
index ab6db00a125a0b..34712b3f6c6656 100644
--- a/lib/internal/streams/from.js
+++ b/lib/internal/streams/from.js
@@ -23,11 +23,12 @@ function from(Readable, iterable, opts) {
});
}
- if (iterable && iterable[SymbolAsyncIterator])
+ if (iterable && typeof iterable[SymbolAsyncIterator] === 'function')
iterator = iterable[SymbolAsyncIterator]();
- else if (iterable && iterable[SymbolIterator])
+ else if (iterable && typeof iterable[SymbolIterator] === 'function')
iterator = iterable[SymbolIterator]();
- else
+
+ if (!iterator || typeof iterator.next !== 'function')
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
const readable = new Readable({
diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-transform-by.js
index fe01986b8d1a0b..42991d9537f97b 100644
--- a/test/parallel/test-transform-by.js
+++ b/test/parallel/test-transform-by.js
@@ -5,7 +5,7 @@ const { Readable, Transform } = require('stream');
const { strictEqual } = require('assert');
async function transformBy() {
- const readable = Readable.from('test');
+ const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
@@ -21,7 +21,7 @@ async function transformBy() {
}
async function transformByFuncReturnsObjectWithSymbolAsyncIterator() {
- const readable = Readable.from('test');
+ const readable = Readable.from('test'.split(''));
const mapper = (source) => ({
[Symbol.asyncIterator]() {
return {
@@ -41,26 +41,6 @@ async function transformByFuncReturnsObjectWithSymbolAsyncIterator() {
}
}
-async function
-transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext() {
- const mapper = (source) => ({
- [Symbol.asyncIterator]() {
- return {
- next() {
- const { done, value } = source.next();
- return { done, value: value ? value.toUpperCase() : value };
- }
- };
- }
- });
-
- expectsError(() => Transform.by(mapper), {
- message: 'asyncGeneratorFn must return an async iterable',
- code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
- type: TypeError
- });
-}
-
async function transformByObjReturnedWSymbolAsyncIteratorWithNoNext() {
const mapper = () => ({
[Symbol.asyncIterator]() {
@@ -69,8 +49,7 @@ async function transformByObjReturnedWSymbolAsyncIteratorWithNoNext() {
});
expectsError(() => Transform.by(mapper), {
- message: 'asyncGeneratorFn must return an async iterable',
- code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
+ code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}
@@ -81,8 +60,7 @@ async function transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction() {
});
expectsError(() => Transform.by(mapper), {
- message: 'asyncGeneratorFn must return an async iterable',
- code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
+ code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}
@@ -91,32 +69,13 @@ async function transformByFuncReturnsObjectWithoutSymbolAsyncIterator() {
const mapper = () => ({});
expectsError(() => Transform.by(mapper), {
- message: 'asyncGeneratorFn must return an async iterable',
- code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
+ code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}
-async function transformByEncoding() {
- const readable = Readable.from('test');
- async function * mapper(source) {
- for await (const chunk of source) {
- strictEqual(source.encoding, 'ascii');
- yield chunk.toUpperCase();
- }
- }
- const stream = Transform.by(mapper);
- stream.setDefaultEncoding('ascii');
- readable.pipe(stream);
-
- const expected = ['T', 'E', 'S', 'T'];
- for await (const chunk of stream) {
- strictEqual(chunk, expected.shift());
- }
-}
-
async function transformBySourceIteratorCompletes() {
- const readable = Readable.from('test');
+ const readable = Readable.from('test'.split(''));
const mustReach = mustCall();
async function * mapper(source) {
for await (const chunk of source) {
@@ -134,7 +93,7 @@ async function transformBySourceIteratorCompletes() {
}
async function transformByYieldPlusReturn() {
- const readable = Readable.from('test');
+ const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
@@ -151,7 +110,7 @@ async function transformByYieldPlusReturn() {
}
async function transformByReturnEndsStream() {
- const readable = Readable.from('test');
+ const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
@@ -170,7 +129,7 @@ async function transformByReturnEndsStream() {
}
async function transformByOnData() {
- const readable = Readable.from('test');
+ const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
@@ -191,7 +150,7 @@ async function transformByOnData() {
}
async function transformByOnDataNonObject() {
- const readable = Readable.from('test', { objectMode: false });
+ const readable = Readable.from('test'.split(''), { objectMode: false });
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toString().toUpperCase();
@@ -212,7 +171,7 @@ async function transformByOnDataNonObject() {
}
async function transformByOnErrorAndDestroyed() {
- const stream = Readable.from('test').pipe(Transform.by(
+ const stream = Readable.from('test'.split('')).pipe(Transform.by(
async function * mapper(source) {
for await (const chunk of source) {
if (chunk === 'e') throw new Error('kaboom');
@@ -230,7 +189,7 @@ async function transformByOnErrorAndDestroyed() {
}
async function transformByErrorTryCatchAndDestroyed() {
- const stream = Readable.from('test').pipe(Transform.by(
+ const stream = Readable.from('test'.split('')).pipe(Transform.by(
async function * mapper(source) {
for await (const chunk of source) {
if (chunk === 'e') throw new Error('kaboom');
@@ -250,7 +209,7 @@ async function transformByErrorTryCatchAndDestroyed() {
}
async function transformByOnErrorAndTryCatchAndDestroyed() {
- const stream = Readable.from('test').pipe(Transform.by(
+ const stream = Readable.from('test'.split('')).pipe(Transform.by(
async function * mapper(source) {
for await (const chunk of source) {
if (chunk === 'e') throw new Error('kaboom');
@@ -286,17 +245,15 @@ async function transformByThrowPriorToForAwait() {
strictEqual(err.message, 'kaboom');
}));
- read.pipe(stream);
+ read.pipe(stream).resume();
}
Promise.all([
transformBy(),
transformByFuncReturnsObjectWithSymbolAsyncIterator(),
- transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(),
transformByObjReturnedWSymbolAsyncIteratorWithNoNext(),
transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(),
transformByFuncReturnsObjectWithoutSymbolAsyncIterator(),
- transformByEncoding(),
transformBySourceIteratorCompletes(),
transformByYieldPlusReturn(),
transformByReturnEndsStream(),