From 606cfec4cc136a5221912705727d7a9af00b63fa Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Sun, 23 Aug 2015 15:22:09 -0600 Subject: [PATCH 01/22] Initial es6 / promise refactor of channel class --- src/channel.js | 89 +++++++++++++++++++++++++++++++++++++++++++++++++ src/deferred.js | 30 +++++++++++++++++ 2 files changed, 119 insertions(+) create mode 100644 src/channel.js create mode 100644 src/deferred.js diff --git a/src/channel.js b/src/channel.js new file mode 100644 index 0000000..d0cedf4 --- /dev/null +++ b/src/channel.js @@ -0,0 +1,89 @@ +import {DeferredTake, DeferredPut} from './deferred' + +const CLOSED_ERROR_MSG = 'Cannot add to closed channel' + +export default class Channel { + static lastDeferredTake = null + + pendingPuts = [] + pendingTakes = [] + values = [] + isClosed = false + isDone = false + empty = {} + + constructor (bufferSize) { + this.bufferSize = parseInt(bufferSize, 10) || 0 + } + + then (onFulfilled, onRejected) { + return this.take.then(onFulfilled, onRejected) + } + + take () { + const deferred = new DeferredTake() + if (this.done()) { + this.resolveEmpty(deferred) + } else if (this.values.length > 0 || this.pendingPuts.length > 0) { + this.resolve(deferred, this.nextValue()) + } else { + this.pendingTakes.push(deferred) + } + return deferred.promise + } + + removeTake (deferred) { + const idx = this.pendingTakes.indexOf(deferred) + if (idx > -1) { + this.pendingTakes.splice(idx, 1) + } + } + + nextValue () { + if (this.pendingPuts.length > 0) { + this.values.push(this.pendingPuts.shift().add()) + } + return this.values.shift() + } + + put (value) { + var deferred = new DeferredPut(value) + if (this.isClosed) { + deferred.reject(new Error(CLOSED_ERRROR_MSG)) + } else if (this.pendingTakes.length > 0) { + this.resolve(this.pendingTakes.shift(), deferred.put()) + } else if (this.values.length < this.bufferSize) { + this.values.push(deferred.put()) + } else { + this.pendingPuts.push(deferred) + } + return deferred.promise + } + + resolve (deferred, value) { + Channel.lastDeferredTake = deferred + deferred.take(value) + this.done() + } + + resolveEmpty (deferred) { + this.resolve(deferred, this.empty) + } + + close () { + this.isClosed = true + let receiver + while (receiver = this.pendingPuts.shift()) { + receiver.error(new Error(CLOSED_ERROR_MSG)) + } + return this.done() + } + + done () { + if (!this.isDone && this.isClosed && this.values.length === 0) { + this.isDone = true + this.pendingTakes.forEach(this.resolveEmpty, this) + } + return this.isDone + } +} diff --git a/src/deferred.js b/src/deferred.js new file mode 100644 index 0000000..6aa1c25 --- /dev/null +++ b/src/deferred.js @@ -0,0 +1,30 @@ +export class DeferredBase { + constructor () { + this.promise = new Promise((resolve, reject) => { + this.resolve = resolve + this.reject = reject + }) + } +} + +export class DeferredTake extends DeferredBase { + take (value) { + if (value instanceof Error) { + this.reject(value) + } else { + this.resolve(value) + } + } +} + +export class DeferredPut extends DeferredBase { + constructor (value) { + super() + this.value = value + } + + put () { + this.resolve() + return this.value + } +} From 1884c9925c40d924a05da31e001adde9bef6c5ad Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Sun, 23 Aug 2015 15:30:22 -0600 Subject: [PATCH 02/22] Add support for dropping and sliding buffers --- src/buffer.js | 41 +++++++++++++++++++++++++++++++++++++++++ src/chan.js | 20 ++++++++++++++++++++ src/channel.js | 17 +++++++---------- 3 files changed, 68 insertions(+), 10 deletions(-) create mode 100644 src/buffer.js create mode 100644 src/chan.js diff --git a/src/buffer.js b/src/buffer.js new file mode 100644 index 0000000..7b57904 --- /dev/null +++ b/src/buffer.js @@ -0,0 +1,41 @@ +export class BufferBase { + values = [] + + constructor (size) { + this.size = parseInt(size, 10) + } + + shift () { + return this.values.shift() + } +} + +export class BufferBlocking extends BufferBase { + push (getValue) { + if (this.values.length < this.size) { + this.values.push(getValue()) + return true + } + return false + } +} + +export class BufferDropping extends BufferBase { + push (getValue) { + const value = getValue() + if (this.values.length < this.size) { + this.values.push(value) + } + return true + } +} + +export class BufferSliding extends BufferBase { + push (getValue) { + this.values.push(getValue()) + if (this.values.length > this.size) { + this.values.unshift() + } + return true + } +} diff --git a/src/chan.js b/src/chan.js new file mode 100644 index 0000000..159032b --- /dev/null +++ b/src/chan.js @@ -0,0 +1,20 @@ +import Channel from './channel' +import {BufferBlocking, BufferSliding, BufferDropping} from './buffer' + +function chan (size) { + return new Channel(new BufferBlocking(size)) +} + +function sliding (size) { + return new Channel(new BufferSliding(size)) +} + +function dropping (size) { + return new Channel(new BufferDropping(size)) +} + +chan.sliding = sliding +chan.dropping = dropping +chan.Channel = Channel + +export default chan diff --git a/src/channel.js b/src/channel.js index d0cedf4..dbc952a 100644 --- a/src/channel.js +++ b/src/channel.js @@ -7,13 +7,12 @@ export default class Channel { pendingPuts = [] pendingTakes = [] - values = [] isClosed = false isDone = false empty = {} - constructor (bufferSize) { - this.bufferSize = parseInt(bufferSize, 10) || 0 + constructor (buffer) { + this.buffer = buffer } then (onFulfilled, onRejected) { @@ -24,7 +23,7 @@ export default class Channel { const deferred = new DeferredTake() if (this.done()) { this.resolveEmpty(deferred) - } else if (this.values.length > 0 || this.pendingPuts.length > 0) { + } else if (this.buffer.length > 0 || this.pendingPuts.length > 0) { this.resolve(deferred, this.nextValue()) } else { this.pendingTakes.push(deferred) @@ -41,9 +40,9 @@ export default class Channel { nextValue () { if (this.pendingPuts.length > 0) { - this.values.push(this.pendingPuts.shift().add()) + this.buffer.push(this.pendingPuts.shift().add()) } - return this.values.shift() + return this.buffer.shift() } put (value) { @@ -52,9 +51,7 @@ export default class Channel { deferred.reject(new Error(CLOSED_ERRROR_MSG)) } else if (this.pendingTakes.length > 0) { this.resolve(this.pendingTakes.shift(), deferred.put()) - } else if (this.values.length < this.bufferSize) { - this.values.push(deferred.put()) - } else { + } else if (!this.buffer.push(deferred.put.bind(deferred))) { this.pendingPuts.push(deferred) } return deferred.promise @@ -80,7 +77,7 @@ export default class Channel { } done () { - if (!this.isDone && this.isClosed && this.values.length === 0) { + if (!this.isDone && this.isClosed && this.buffer.length === 0) { this.isDone = true this.pendingTakes.forEach(this.resolveEmpty, this) } From 80010f871b4a228e68a9b45c8b6edf0013e70105 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Sun, 23 Aug 2015 16:03:52 -0600 Subject: [PATCH 03/22] Add timeout channel --- src/chan.js | 21 ++++++--------------- src/factory.js | 14 ++++++++++++++ src/timeout.js | 12 ++++++++++++ 3 files changed, 32 insertions(+), 15 deletions(-) create mode 100644 src/factory.js create mode 100644 src/timeout.js diff --git a/src/chan.js b/src/chan.js index 159032b..eb77b3f 100644 --- a/src/chan.js +++ b/src/chan.js @@ -1,20 +1,11 @@ +import {blockingChannel, slidingChannel, droppingChannel} from './factory' import Channel from './channel' -import {BufferBlocking, BufferSliding, BufferDropping} from './buffer' +import timeout from './timeout' -function chan (size) { - return new Channel(new BufferBlocking(size)) -} - -function sliding (size) { - return new Channel(new BufferSliding(size)) -} - -function dropping (size) { - return new Channel(new BufferDropping(size)) -} - -chan.sliding = sliding -chan.dropping = dropping +const chan = blockingChannel +chan.sliding = slidingChannel +chan.dropping = droppingChannel chan.Channel = Channel +chan.timeout = timeout export default chan diff --git a/src/factory.js b/src/factory.js new file mode 100644 index 0000000..6b02710 --- /dev/null +++ b/src/factory.js @@ -0,0 +1,14 @@ +import Channel from './channel' +import {BufferBlocking, BufferSliding, BufferDropping} from './buffer' + +export function blockingChannel (size) { + return new Channel(new BufferBlocking(size)) +} + +export function slidingChannel (size) { + return new Channel(new BufferSliding(size)) +} + +export function droppingChannel (size) { + return new Channel(new BufferDropping(size)) +} diff --git a/src/timeout.js b/src/timeout.js new file mode 100644 index 0000000..db9f098 --- /dev/null +++ b/src/timeout.js @@ -0,0 +1,12 @@ +import {blockingChannel} from './factory' + +export function timeout (ms) { + const ch = blockingChannel() + setTimeout(() => { + try { + ch(true) + ch.close() + } catch (err) {} + }, ms) + return ch +} From 60195230f2c9ea5a5fe0209d77c25c6d43eb0614 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Sun, 23 Aug 2015 16:05:44 -0600 Subject: [PATCH 04/22] Rename entry file --- src/{chan.js => index.js} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{chan.js => index.js} (100%) diff --git a/src/chan.js b/src/index.js similarity index 100% rename from src/chan.js rename to src/index.js From eee2023f8229a85a413fec349c870ae011519a62 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Tue, 22 Mar 2016 11:13:13 -0600 Subject: [PATCH 05/22] implement new select function --- package.json | 7 +++++-- src/buffer.js | 2 +- src/channel.js | 31 ++++++++++++++++++++----------- src/deferred.js | 15 +++------------ src/index.js | 2 ++ src/select.js | 30 ++++++++++++++++++++++++++++++ src/timeout.js | 4 ++-- 7 files changed, 63 insertions(+), 28 deletions(-) create mode 100644 src/select.js diff --git a/package.json b/package.json index c255406..0558f8d 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "A go style channel implementation that works nicely with co", "main": "index.js", "scripts": { - "test": "mocha" + "test": "babel-node --stage=0 node_modules/.bin/blue-tape src/**/*.test.js" }, "repository": { "type": "git", @@ -37,12 +37,15 @@ }, "homepage": "https://github.com/brentburgoyne/chan", "devDependencies": { + "babel": "^5.8.21", + "blue-tape": "^0.1.10", "co": "^3.0.6", "expect.js": "^0.3.1", "mocha": "^1.20.1", "should": "^4.0.4", "sinon": "^1.10.3", "split": "^0.3.0", - "superagent": "^0.18.0" + "superagent": "^0.18.0", + "tap-dot": "^1.0.0" } } diff --git a/src/buffer.js b/src/buffer.js index 7b57904..a404d68 100644 --- a/src/buffer.js +++ b/src/buffer.js @@ -34,7 +34,7 @@ export class BufferSliding extends BufferBase { push (getValue) { this.values.push(getValue()) if (this.values.length > this.size) { - this.values.unshift() + this.values.shift() } return true } diff --git a/src/channel.js b/src/channel.js index dbc952a..1d19f94 100644 --- a/src/channel.js +++ b/src/channel.js @@ -1,10 +1,8 @@ -import {DeferredTake, DeferredPut} from './deferred' +import {default as Deferred, DeferredPut} from './deferred' const CLOSED_ERROR_MSG = 'Cannot add to closed channel' export default class Channel { - static lastDeferredTake = null - pendingPuts = [] pendingTakes = [] isClosed = false @@ -16,14 +14,14 @@ export default class Channel { } then (onFulfilled, onRejected) { - return this.take.then(onFulfilled, onRejected) + return this.take().then(onFulfilled, onRejected) } take () { - const deferred = new DeferredTake() + const deferred = new Deferred() if (this.done()) { this.resolveEmpty(deferred) - } else if (this.buffer.length > 0 || this.pendingPuts.length > 0) { + } else if (this.hasValues()) { this.resolve(deferred, this.nextValue()) } else { this.pendingTakes.push(deferred) @@ -31,16 +29,28 @@ export default class Channel { return deferred.promise } - removeTake (deferred) { + cancelableTake () { + const promise = this.take() + return [ + promise, + () => this.removePendingTake(promise.deferred) + ] + } + + removePendingTake (deferred) { const idx = this.pendingTakes.indexOf(deferred) if (idx > -1) { this.pendingTakes.splice(idx, 1) } } + hasValues () { + return this.buffer.length > 0 || this.pendingPuts.length > 0 + } + nextValue () { if (this.pendingPuts.length > 0) { - this.buffer.push(this.pendingPuts.shift().add()) + this.buffer.push(this.pendingPuts.shift().put()) } return this.buffer.shift() } @@ -48,7 +58,7 @@ export default class Channel { put (value) { var deferred = new DeferredPut(value) if (this.isClosed) { - deferred.reject(new Error(CLOSED_ERRROR_MSG)) + deferred.reject(new Error(CLOSED_ERROR_MSG)) } else if (this.pendingTakes.length > 0) { this.resolve(this.pendingTakes.shift(), deferred.put()) } else if (!this.buffer.push(deferred.put.bind(deferred))) { @@ -58,8 +68,7 @@ export default class Channel { } resolve (deferred, value) { - Channel.lastDeferredTake = deferred - deferred.take(value) + deferred.resolve(value) this.done() } diff --git a/src/deferred.js b/src/deferred.js index 6aa1c25..703e1e9 100644 --- a/src/deferred.js +++ b/src/deferred.js @@ -1,23 +1,14 @@ -export class DeferredBase { +export default class Deferred { constructor () { this.promise = new Promise((resolve, reject) => { this.resolve = resolve this.reject = reject }) + this.promise.deferred = this } } -export class DeferredTake extends DeferredBase { - take (value) { - if (value instanceof Error) { - this.reject(value) - } else { - this.resolve(value) - } - } -} - -export class DeferredPut extends DeferredBase { +export class DeferredPut extends Deferred { constructor (value) { super() this.value = value diff --git a/src/index.js b/src/index.js index eb77b3f..8d9a044 100644 --- a/src/index.js +++ b/src/index.js @@ -1,11 +1,13 @@ import {blockingChannel, slidingChannel, droppingChannel} from './factory' import Channel from './channel' import timeout from './timeout' +import alts from './alts' const chan = blockingChannel chan.sliding = slidingChannel chan.dropping = droppingChannel chan.Channel = Channel chan.timeout = timeout +chan.alts = alts export default chan diff --git a/src/select.js b/src/select.js new file mode 100644 index 0000000..7ab59cb --- /dev/null +++ b/src/select.js @@ -0,0 +1,30 @@ +import Deferred from './deferred' + +function pairs () + +export function select (channels...) { + const deferred = new Deferred() + const nonEmpty = channels.filter(channel => channel.hasValues()) + const cancels = [] + let remaining = channels.length + + const take = channel => { + const [promise, cancel] = channel.cancelableTake() + cancels.push(cancel) + promise.then(value => { + if (value === channel.empty && --remaining > 0) { + return + } + cancels.forEach(fn => fn()) + deferred.resolve([channel, value]) + }) + } + + if (nonEmpty.length > 1) { + take(nonEmpty[Math.random() * nonEmpty.length | 0]) + } else { + channels.forEach(take) + } + + return deferred.promise +} diff --git a/src/timeout.js b/src/timeout.js index db9f098..f9d732c 100644 --- a/src/timeout.js +++ b/src/timeout.js @@ -1,10 +1,10 @@ import {blockingChannel} from './factory' -export function timeout (ms) { +export default function timeout (ms) { const ch = blockingChannel() setTimeout(() => { try { - ch(true) + ch.put(true) ch.close() } catch (err) {} }, ms) From 33131c30857f9abd4a91c668cdef585d4358afdb Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Fri, 8 Jul 2016 12:45:23 -0600 Subject: [PATCH 06/22] updated dependencies and linting --- .babelrc | 6 ++++++ .eslintrc | 7 +++++++ .jshintrc | 24 ------------------------ package.json | 25 ++++++++++++++----------- src/buffer.js | 7 +++++-- src/channel.js | 40 +++++++++++++++++++++++++++------------- src/deferred.js | 9 ++++----- src/index.js | 16 ++++++++-------- 8 files changed, 71 insertions(+), 63 deletions(-) create mode 100644 .babelrc create mode 100644 .eslintrc delete mode 100644 .jshintrc diff --git a/.babelrc b/.babelrc new file mode 100644 index 0000000..d383c3e --- /dev/null +++ b/.babelrc @@ -0,0 +1,6 @@ +{ + "presets": [ + "es2015-node", + "stage-3" + ] +} diff --git a/.eslintrc b/.eslintrc new file mode 100644 index 0000000..74fb26d --- /dev/null +++ b/.eslintrc @@ -0,0 +1,7 @@ +{ + "extends": "standard", + "parser": "babel-eslint", + "env": { + "mocha": true + } +} diff --git a/.jshintrc b/.jshintrc deleted file mode 100644 index 205790a..0000000 --- a/.jshintrc +++ /dev/null @@ -1,24 +0,0 @@ -{ - "curly": true, - "eqeqeq": true, - "forin": true, - "freeze": true, - "indent": 2, - "latedef": "nofunc", - "noarg": true, - "noempty": true, - "quotmark": "single", - "undef": true, - "strict": false, - "trailing": true, - "maxparams": 5, - "maxdepth": 5, - "maxstatements": 15, - "maxcomplexity": 7, - "maxlen": 80, - "asi": true, - "boss": true, - "eqnull": true, - "browser": true, - "node": true -} diff --git a/package.json b/package.json index 0558f8d..52d3898 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,8 @@ "description": "A go style channel implementation that works nicely with co", "main": "index.js", "scripts": { - "test": "babel-node --stage=0 node_modules/.bin/blue-tape src/**/*.test.js" + "test": "eslint src && mocha --compilers js:babel-register 'src/test/*.test.js'", + "test-debug": "mocha debug --compilers js:babel-register 'src/test/*.test.js'" }, "repository": { "type": "git", @@ -37,15 +38,17 @@ }, "homepage": "https://github.com/brentburgoyne/chan", "devDependencies": { - "babel": "^5.8.21", - "blue-tape": "^0.1.10", - "co": "^3.0.6", - "expect.js": "^0.3.1", - "mocha": "^1.20.1", - "should": "^4.0.4", - "sinon": "^1.10.3", - "split": "^0.3.0", - "superagent": "^0.18.0", - "tap-dot": "^1.0.0" + "babel-eslint": "^6.1.1", + "babel-preset-es2015": "^6.9.0", + "babel-preset-es2015-node": "^6.1.0", + "babel-preset-stage-3": "^6.11.0", + "babel-register": "^6.9.0", + "eslint": "^2.13.1", + "eslint-config-standard": "^5.3.1", + "eslint-plugin-promise": "^1.3.2", + "eslint-plugin-standard": "^1.3.2", + "mocha": "^2.5.3", + "mochon": "^1.0.0", + "sinon": "^1.17.4" } } diff --git a/src/buffer.js b/src/buffer.js index a404d68..ce510d1 100644 --- a/src/buffer.js +++ b/src/buffer.js @@ -1,13 +1,16 @@ export class BufferBase { - values = [] - constructor (size) { + this.values = [] this.size = parseInt(size, 10) } shift () { return this.values.shift() } + + hasValues () { + return this.values.length > 0 + } } export class BufferBlocking extends BufferBase { diff --git a/src/channel.js b/src/channel.js index 1d19f94..5b29028 100644 --- a/src/channel.js +++ b/src/channel.js @@ -1,16 +1,16 @@ import {default as Deferred, DeferredPut} from './deferred' const CLOSED_ERROR_MSG = 'Cannot add to closed channel' +const CANCELED_TAKE_ERROR_MSG = 'Pending take from channel was canceled' export default class Channel { - pendingPuts = [] - pendingTakes = [] - isClosed = false - isDone = false - empty = {} - constructor (buffer) { this.buffer = buffer + this.pendingPuts = [] + this.pendingTakes = [] + this.isClosed = false + this.isDone = false + this.empty = {} } then (onFulfilled, onRejected) { @@ -37,17 +37,24 @@ export default class Channel { ] } + /** + * @api private + */ removePendingTake (deferred) { const idx = this.pendingTakes.indexOf(deferred) if (idx > -1) { this.pendingTakes.splice(idx, 1) + deferred.reject(new Error(CANCELED_TAKE_ERROR_MSG)) } } hasValues () { - return this.buffer.length > 0 || this.pendingPuts.length > 0 + return this.buffer.hasValues() || this.pendingPuts.length > 0 } + /** + * @api private + */ nextValue () { if (this.pendingPuts.length > 0) { this.buffer.push(this.pendingPuts.shift().put()) @@ -61,32 +68,39 @@ export default class Channel { deferred.reject(new Error(CLOSED_ERROR_MSG)) } else if (this.pendingTakes.length > 0) { this.resolve(this.pendingTakes.shift(), deferred.put()) - } else if (!this.buffer.push(deferred.put.bind(deferred))) { + } else if (!this.buffer.push(deferred.put)) { this.pendingPuts.push(deferred) } return deferred.promise } + /** + * @api private + */ resolve (deferred, value) { deferred.resolve(value) this.done() } + /** + * @api private + */ resolveEmpty (deferred) { this.resolve(deferred, this.empty) } close () { this.isClosed = true - let receiver - while (receiver = this.pendingPuts.shift()) { - receiver.error(new Error(CLOSED_ERROR_MSG)) - } + const err = new Error(CLOSED_ERROR_MSG) + this.pendingPuts.forEach((deferred) => deferred.reject(err)) return this.done() } + /** + * @api private + */ done () { - if (!this.isDone && this.isClosed && this.buffer.length === 0) { + if (!this.isDone && this.isClosed && !this.buffer.hasValues()) { this.isDone = true this.pendingTakes.forEach(this.resolveEmpty, this) } diff --git a/src/deferred.js b/src/deferred.js index 703e1e9..7b54fa0 100644 --- a/src/deferred.js +++ b/src/deferred.js @@ -12,10 +12,9 @@ export class DeferredPut extends Deferred { constructor (value) { super() this.value = value - } - - put () { - this.resolve() - return this.value + this.put = () => { + this.resolve() + return this.value + } } } diff --git a/src/index.js b/src/index.js index 8d9a044..b39331a 100644 --- a/src/index.js +++ b/src/index.js @@ -2,12 +2,12 @@ import {blockingChannel, slidingChannel, droppingChannel} from './factory' import Channel from './channel' import timeout from './timeout' import alts from './alts' +import select from './select' -const chan = blockingChannel -chan.sliding = slidingChannel -chan.dropping = droppingChannel -chan.Channel = Channel -chan.timeout = timeout -chan.alts = alts - -export default chan +exports = module.exports = blockingChannel +exports.sliding = slidingChannel +exports.dropping = droppingChannel +exports.Channel = Channel +exports.timeout = timeout +exports.alts = alts +exports.select = select From c78fb145cc02ae5badae4291d6c1b4f48a4041f2 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Fri, 8 Jul 2016 12:46:36 -0600 Subject: [PATCH 07/22] remove old code and tests --- lib/async.js | 44 ------------- lib/channel.js | 163 ----------------------------------------------- lib/interval.js | 31 --------- lib/make.js | 52 --------------- lib/receiver.js | 63 ------------------ lib/select.js | 62 ------------------ lib/timeout.js | 29 --------- test/async.js | 71 --------------------- test/buffered.js | 116 --------------------------------- test/chan.js | 112 -------------------------------- test/close.js | 68 -------------------- test/interval.js | 40 ------------ test/select.js | 143 ----------------------------------------- test/timeout.js | 27 -------- 14 files changed, 1021 deletions(-) delete mode 100644 lib/async.js delete mode 100644 lib/channel.js delete mode 100644 lib/interval.js delete mode 100644 lib/make.js delete mode 100644 lib/receiver.js delete mode 100644 lib/select.js delete mode 100644 lib/timeout.js delete mode 100644 test/async.js delete mode 100644 test/buffered.js delete mode 100644 test/chan.js delete mode 100644 test/close.js delete mode 100644 test/interval.js delete mode 100644 test/select.js delete mode 100644 test/timeout.js diff --git a/lib/async.js b/lib/async.js deleted file mode 100644 index b64e4e5..0000000 --- a/lib/async.js +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Module dependencies. - */ -var Receiver = require('./receiver') - -/** - * Expose `async`. - */ -module.exports = async - -/** - * Add value to channel via node-style async function. - * - * @param {Function} channel - * @param {Function|Object} fn async function or object with async method - * @param {String} method name only if fn is an object - * @param {mixed} args async function arguments without callback - * @return {Function} thunk - */ -function async(ch, fn/*, args...*/) { - var args = [].slice.call(arguments, 2) - var receiver = new Receiver() - var context = null - - if (typeof fn === 'object') { - context = fn - fn = fn[args.shift()] - } - - args.push(function (err, val) { - if (arguments.length > 2) { - val = [].slice.call(arguments, 1) - } - ch(err, val)(function (err) { - receiver[err ? 'error' : 'add'](err) - }) - }) - - fn.apply(context, args) - - return function (cb) { - receiver.callback(cb) - } -} diff --git a/lib/channel.js b/lib/channel.js deleted file mode 100644 index 4998690..0000000 --- a/lib/channel.js +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Module dependencies. - */ -var Receiver = require('./receiver') - -/** - * Expose `Channel`. - */ -module.exports = Channel - -/** - * Constants. - */ -var CLOSED_ERROR_MSG = 'Cannot add to closed channel' - -/** - * Initialize a `Channel`. - * - * @param {Function|Object} [empty=Object] - * @api private - */ -function Channel(bufferSize) { - this.pendingAdds = [] - this.pendingGets = [] - this.items = [] - this.bufferSize = parseInt(bufferSize, 10) || 0 - this.isClosed = false - this.isDone = false - this.empty = {} -} - -/** - * Static reference to the most recently called callback - */ -Channel.lastCalled = null - -/** - * Get an item with `cb`. - * - * @param {Function} cb - * @api private - */ -Channel.prototype.get = function (cb){ - if (this.done()) { - this.callEmpty(cb) - } else if (this.items.length > 0 || this.pendingAdds.length > 0) { - this.call(cb, this.nextItem()) - } else { - this.pendingGets.push(cb) - } -} - -/** - * Remove `cb` from the queue. - * - * @param {Function} cb - * @api private - */ -Channel.prototype.removeGet = function (cb) { - var idx = this.pendingGets.indexOf(cb) - if (idx > -1) { - this.pendingGets.splice(idx, 1) - } -} - -/** - * Get the next item and pull from pendingAdds to fill the buffer. - * - * @return {Mixed} - * @api private - */ -Channel.prototype.nextItem = function () { - if (this.pendingAdds.length > 0) { - this.items.push(this.pendingAdds.shift().add()) - } - return this.items.shift() -} - -/** - * Add `val` to the channel. - * - * @param {Mixed} val - * @return {Function} thunk - * @api private - */ -Channel.prototype.add = function (val){ - var receiver = new Receiver(val) - - if (this.isClosed) { - receiver.error(Error(CLOSED_ERROR_MSG)) - } else if (this.pendingGets.length > 0) { - this.call(this.pendingGets.shift(), receiver.add()) - } else if (this.items.length < this.bufferSize) { - this.items.push(receiver.add()) - } else { - this.pendingAdds.push(receiver) - } - - return function (cb) { - receiver.callback(cb) - } -} - -/** - * Invoke `cb` with `val` facilitate both - * `chan(value)` and the `chan(error, value)` - * use-cases. - * - * @param {Function} cb - * @param {Mixed} val - * @api private - */ -Channel.prototype.call = function (cb, val) { - Channel.lastCalled = this.func - if (val instanceof Error) { - cb(val) - } else { - cb(null, val) - } - this.done() -} - -/** - * Invoke `cb` callback with the empty value. - * - * @param {Function} cb - * @api private - */ -Channel.prototype.callEmpty = function (cb) { - this.call(cb, this.empty) -} - -/** - * Prevennt future values from being added to - * the channel. - * - * @return {Boolean} - * @api public - */ -Channel.prototype.close = function () { - this.isClosed = true - var receiver - while (receiver = this.pendingAdds.shift()) { - receiver.error(Error(CLOSED_ERROR_MSG)) - } - return this.done() -} - -/** - * Check to see if the channel is done and - * call pending callbacks if necessary. - * - * @return {Boolean} - * @api private - */ -Channel.prototype.done = function () { - if (!this.isDone && this.isClosed && this.items.length === 0) { - this.isDone = true - // call each pending callback with the empty value - this.pendingGets.forEach(function (cb) { this.callEmpty(cb) }, this) - } - return this.isDone -} diff --git a/lib/interval.js b/lib/interval.js deleted file mode 100644 index 828997a..0000000 --- a/lib/interval.js +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Module dependencies. - */ -var make = require('./make') - -/** - * Expose `intervalChan`. - */ -module.exports = intervalChan - -/** - * Make a interval channel that receives a count every number of milliseconds. - * - * @param {Number} ms - * @returns {Function} channel - * @api public - */ -function intervalChan(ms) { - var ch = make() - var count = 0; - - var int = setInterval(function () { - try { - ch(++count) - } catch (err) { - clearInterval(int) - } - }, ms) - - return ch -} diff --git a/lib/make.js b/lib/make.js deleted file mode 100644 index c82cb16..0000000 --- a/lib/make.js +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Module dependencies. - */ -var Channel = require('./channel') -var async = require('./async') - -/** - * Expose `make`. - */ -module.exports = make - -/** - * Make a channel. - * - * @param {Number} bufferSize optional default=0 - * @return {Function} - * @api public - */ -function make(bufferSize) { - var chan = new Channel(bufferSize) - - var func = function (a, b) { - // yielded - if (typeof a === 'function') { - return chan.get(a) - } - - // (err, res) - if (a === null && typeof b !== 'undefined') { - a = b - } - - // value - return chan.add(a) - } - - // expose public channel methods - func.close = chan.close.bind(chan) - func.done = chan.done.bind(chan) - - // bind async helper - func.async = async.bind(null, func) - - // expose empty value - func.empty = chan.empty - - // cross reference the channel object and function for internal use - func.__chan = chan - chan.func = func - - return func -} diff --git a/lib/receiver.js b/lib/receiver.js deleted file mode 100644 index 0ee6fb8..0000000 --- a/lib/receiver.js +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Expose `Receiver`. - */ -module.exports = Receiver - -/** - * Initialize a `Receiver`. - * - * @param {Mixed} val - * @api private - */ -function Receiver(val) { - this.val = val - this.isAdded = false - this.err = null - this.cb = null - this.isDone = false -} - -/** - * Call the callback if the pending add is complete. - * - * @api private - */ -Receiver.prototype.attemptNotify = function () { - if ((this.isAdded || this.err) && this.cb && !this.isDone) { - this.isDone = true - setImmediate(function () { this.cb(this.err) }.bind(this)) - } -} - -/** - * Reject the pending add with an error. - * - * @param {Error} err - * @api private - */ -Receiver.prototype.error = function (err) { - this.err = err - this.attemptNotify() -} - -/** - * Get the `val` and set the state of the value to added - * - * @return {Mixed} val - * @api private - */ -Receiver.prototype.add = function () { - this.isAdded = true - this.attemptNotify() - return this.val -} - -/** - * Register the callback. - * - * @api private - */ -Receiver.prototype.callback = function (cb) { - this.cb = cb - this.attemptNotify() -} diff --git a/lib/select.js b/lib/select.js deleted file mode 100644 index 2bd8f59..0000000 --- a/lib/select.js +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Module dependencies. - */ -var make = require('./make') -var Channel = require('./channel') - -/** - * Expose `select`. - */ -module.exports = select - -/** - * Return the first of the given channels with a value. - * - * @param {Function} channels... - * @return {Function} - * @api public - */ -function select(/*channels...*/) { - var selectCh = make(arguments.length) - var chans = [].slice.call(arguments, 0) - var remaining = chans.length - - // get all channels with values waiting - var full = chans.filter(function (ch) { - return ch.__chan.items.length + ch.__chan.pendingAdds.length > 0 - }) - - // define get callback - var get = function (err, value) { - var args = arguments - var ch = Channel.lastCalled - - // don't select an channel returning an empty value, unless it is last - if (value === ch.empty && --remaining > 0) { - return - } - - // remove get callback from all selected channels - chans.forEach(function (ch) { ch.__chan.removeGet(get) }) - - // add temporary selected yieldable function - ch.selected = function (cb) { - delete ch.selected - cb.apply(null, args) - } - - // added the selected channel to the select channel - selectCh(null, ch) - selectCh.close() - } - - if (full.length > 1) { - // multiple channels with waiting values, pick one at random - full[Math.floor(Math.random() * full.length)](get) - } else { - // add get callback to all channels - chans.forEach(function (ch) { ch(get) }) - } - - return selectCh -} diff --git a/lib/timeout.js b/lib/timeout.js deleted file mode 100644 index cf3d960..0000000 --- a/lib/timeout.js +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Module dependencies. - */ -var make = require('./make') - -/** - * Expose `timeoutChan`. - */ -module.exports = timeoutChan - -/** - * Make a timeout channel that receives `true` after a number of milliseconds. - * - * @param {Number} ms - * @returns {Function} channel - * @api public - */ -function timeoutChan(ms) { - var ch = make() - - setTimeout(function () { - try { - ch(true) - ch.close() - } catch(err) {} - }, ms) - - return ch -} diff --git a/test/async.js b/test/async.js deleted file mode 100644 index 0ccf741..0000000 --- a/test/async.js +++ /dev/null @@ -1,71 +0,0 @@ -/* jshint expr:true */ -/* global describe:true, beforeEach:true, afterEach:true, it:true */ - -var async = require('../lib/async') -var should = require('should') -var sinon = require('sinon') - -describe('Async helper', function () { - - var err = {} - var val = {} - var ch - var fn - - beforeEach(function () { - ch = sinon.stub().returns(function (cb) { cb() }) - fn = sinon.stub().yields(err, val) - }) - - it( - 'should return a function with an arity of 1', - function () { - var thunk = async(ch, fn) - thunk.should.be.a.Function - thunk.length.should.be.exactly(1) - } - ) - - it( - 'should call fn with args plus a callback', - function () { - async(ch, fn, 1, 2, 3, 'foo') - var argsWithoutCb = fn.firstCall.args.slice(0, -1) - argsWithoutCb.should.eql([1, 2, 3, 'foo']) - } - ) - - it( - 'should call a method of an object with the third argument as the name', - function () { - var ob = { foo: fn } - async(ch, ob, 'foo', 1, 2, 3) - var argsWithoutCb = fn.firstCall.args.slice(0, -1) - argsWithoutCb.should.eql([1, 2, 3]) - fn.firstCall.calledOn(ob).should.be.true - } - ) - - it( - 'should call channel with arguments of the async function callback', - function () { - async(ch, fn) - ch.firstCall.args.length.should.be.exactly(2) - ch.firstCall.args[0].should.be.exactly(err) - ch.firstCall.args[1].should.be.exactly(val) - } - ) - - it( - 'should call callback given to returned function', - function (done) { - var cb = sinon.spy() - async(ch, fn)(cb) - setImmediate(function () { - cb.callCount.should.be.exactly(1) - done() - }) - } - ) - -}) diff --git a/test/buffered.js b/test/buffered.js deleted file mode 100644 index 8f1e8de..0000000 --- a/test/buffered.js +++ /dev/null @@ -1,116 +0,0 @@ -/* jshint loopfunc: true */ -/* global describe:true, beforeEach:true, it:true */ - -var chan = require('..') -var expect = require('expect.js') - -describe('A unbuffered channel', function () { - - it( - 'should not call the added callback until the value is removed', - function (done) { - var ch = chan(0) // unbuffered - var cbCalled = false - ch('foo')(function () { - cbCalled = true - }) - setImmediate(function () { - expect(cbCalled).to.not.be.ok() - ch(function (err, val) { - setImmediate(function () { - expect(cbCalled).to.be.ok() - done() - }) - }) - }) - } - ) - -}) - -describe('A buffered channel', function () { - - it( - 'should pull values from the buffer when yielded', - function (done) { - var ch = chan(1) - var cbCalled = false - var testValue = 'foo' - ch(testValue) - ch(function (err, val) { - cbCalled = true - expect(val).to.be(testValue) - }) - setImmediate(function () { - expect(cbCalled).to.be.ok() - done() - }) - } - ) - - describe('with a non-full buffer', function () { - - it( - 'should call added callback as soon as it is given to the returned thunk', - function (done) { - var buffer = 3 - var ch = chan(buffer) - var called = 0 - var added = 0 - while (++added <= buffer + 10) { - ch(added)(function (err) { - called++ - }) - } - setImmediate(function () { - expect(called).to.be(buffer) - done() - }) - } - ) - - }) - - describe('with a full buffer', function () { - - it( - 'should not add another value untill a value has been removed', - function (done) { - var ch = chan(1) - var cbCalled = false - ch('foo') - ch('bar')(function () { - cbCalled = true - }) - setImmediate(function () { - expect(cbCalled).to.not.be.ok() - ch(function (err, val) { - setImmediate(function () { - expect(cbCalled).to.be.ok() - done() - }) - }) - }) - } - ) - - it( - 'should call cb with an error when the channel is closed before adding', - function (done) { - var ch = chan(0) - var cbCalled = false - ch('foo')(function (err) { - cbCalled = true - expect(err).to.be.an(Error) - }) - ch.close() - setImmediate(function () { - expect(cbCalled).to.be.ok() - done() - }) - } - ) - - }) - -}) diff --git a/test/chan.js b/test/chan.js deleted file mode 100644 index 010306e..0000000 --- a/test/chan.js +++ /dev/null @@ -1,112 +0,0 @@ -/* global describe:true, beforeEach:true, it:true */ - -var chan = require('..') -var expect = require('expect.js') -var fs = require('fs') - -describe('Channel make', function () { - - it( - 'should return a channel function', - function () { - var ch = chan() - expect(ch).to.be.a(Function) - } - ) - -}) - -describe('A channel', function () { - - var ch - - beforeEach(function () { - ch = chan() - }) - - it( - 'should receive a value of any non-function type as the first argument', - function () { - var typeCases = [ - 1, - 'foo', - [1, 2 , 3], - {foo: 'bar'}, - true, - false, - null, - void 0 - ] - typeCases.forEach(function (val) { - ch(val) - ch(function (err, result) { - expect(result).to.be(val) - }) - }) - } - ) - - it( - 'should receive a function value as a second argument if the first is null', - function () { - ch(null, function () {}) - ch(function (err, result) { - expect(result).to.be.a(Function) - }) - } - ) - - it( - 'should queue values until they are yielded/removed', - function () { - var values = [1, 2, 3, 4, 5] - values.forEach(function (value) { - ch(value) - }) - values.forEach(function (value) { - ch(function (err, result) { - expect(result).to.be(value) - }) - }) - } - ) - - it( - 'should queue callbacks until values are added', - function () { - var values = [1, 2, 3, 4, 5] - values.forEach(function (value) { - ch(function (err, result) { - expect(result).to.be(value) - }) - }) - values.forEach(function (value) { - ch(value) - }) - } - ) - - it( - 'should pass errors as the first argument to callbacks', - function () { - var e = new Error('Foo') - ch(e) - ch(function (err) { - expect(err).to.be(e) - }) - } - ) - - it( - 'should be useable directly as a callback for node style async functions', - function (done) { - ch(function (err, contents) { - expect(err).to.be(null) - expect(contents).to.be.a(Buffer) - done() - }) - fs.readFile(__filename, ch) - } - ) - -}) diff --git a/test/close.js b/test/close.js deleted file mode 100644 index 40dbcb3..0000000 --- a/test/close.js +++ /dev/null @@ -1,68 +0,0 @@ -/* global describe:true, beforeEach:true, it:true */ - -var chan = require('..') -var expect = require('expect.js') - -describe('A closed channel', function () { - - it( - 'should yield an error when attempting to add a value', - function () { - var ch = chan() - ch.close() - ch('foo')(function (err) { - expect(err).to.be.an(Error) - }) - } - ) - - describe('that is has items in the buffer', function () { - - it( - 'should return `false` when the `done()` method is called', - function () { - var ch = chan(1) - ch('foo') - ch.close() - expect(ch.done()).to.be(false) - } - ) - - }) - - describe('that is empty', function () { - - it( - 'should invoke peding callbacks with empty value', - function () { - var ch = chan() - ch(function (err, value) { - expect(value).to.be(ch.empty) - }) - ch.close() - } - ) - - it( - 'should return `true` when the `done()` method is called', - function () { - var ch = chan() - ch.close() - expect(ch.done()).to.be(true) - } - ) - - it( - 'should immediately invoke any callback added with the empty value', - function () { - var ch = chan() - ch.close() - ch(function (err, value) { - expect(value).to.be(ch.empty) - }) - } - ) - - }) - -}) diff --git a/test/interval.js b/test/interval.js deleted file mode 100644 index a836ede..0000000 --- a/test/interval.js +++ /dev/null @@ -1,40 +0,0 @@ -/* jshint expr:true */ -/* global describe:true, beforeEach:true, afterEach:true, it:true */ - -var interval = require('../lib/interval') -var should = require('should') -var sinon = require('sinon') - -describe('Interval channel make', function () { - - it('should return a function', function () { - var int = interval(500) - int.should.be.a.Function - }) - - it('should should call the callback after a number of ms', function () { - var clock = sinon.useFakeTimers() - var cb = sinon.spy() - var ms = 500 - var int = interval(ms) - int(cb) - clock.tick(ms - 1) - cb.called.should.be.false - clock.tick(1) - cb.called.should.be.true - }) - - it('should call the callback after number of ms', function () { - var clock = sinon.useFakeTimers() - var cb = sinon.spy() - var ms = 500 - var int = interval(ms) - int(cb) - clock.tick(ms - 1) - cb.called.should.be.false - clock.tick(1) - cb.called.should.be.true - }) - -}) - diff --git a/test/select.js b/test/select.js deleted file mode 100644 index bbcc251..0000000 --- a/test/select.js +++ /dev/null @@ -1,143 +0,0 @@ -/* jshint loopfunc:true */ -/* global describe:true, beforeEach:true, afterEach:true, it:true */ - -var chan = require('..') -var expect = require('expect.js') - -describe('Channel select', function () { - var random - beforeEach(function (done) { - // save Math.random - random = Math.random - done() - }) - - afterEach(function (done) { - // restore Math.random - Math.random = random - done() - }) - - it( - 'should be able to select on channels', - function (done) { - var ch1 = chan() - var ch2 = chan() - chan.select(ch1, ch2)(function (err, ch) { - expect(ch).to.equal(ch2) - ch2.selected(function (err, val) { - expect(val).to.equal(42) - done() - }) - }) - ch2(42) - } - ) - - it( - 'should be able to select on multiple channels', - function (done) { - var chs = [chan(), chan()] - var remaining = chs.length - chs.forEach(function (needle, i) { - chan.select.apply(null, chs)(function (err, ch) { - expect(ch).to.equal(needle) - ch.selected(function (err, val) { - expect(val).to.equal(i*10) - if (--remaining === 0) { - done() - } - }) - }) - }) - chs.forEach(function (ch, i) { - ch(i*10) - }) - } - ) - - it( - 'should be able to select with queued messages', - function (done) { - var chs = [chan(), chan()] - var remaining = chs.length - var i = -1 - while (++i < 10) { - (function (i) { - chan.select.apply(null, chs)(function (err, ch) { - expect(ch).to.equal(chs[0]) - ch.selected(function (err, val) { - expect(val).to.equal(i * 10) - if (--remaining === 0) { - done() - } - }) - }) - })(i) - } - var j = -1 - while (++j < 10) { - chs[0](j * 10) - } - } - ) - - it( - 'should be able to select with existing messages on the channels', - function (done) { - var ch1 = chan() - var ch2 = chan() - ch2(42) - chan.select(ch1, ch2)(function (err, ch) { - expect(ch).to.equal(ch2) - ch2.selected(function (err, val) { - expect(val).to.equal(42) - done() - }) - }) - } - ) - - it( - 'should randomly choose a channel to return with multiple full channels', - function (done) { - var ch1 = chan() - var ch2 = chan() - - // force the random selection to be the second channel - Math.random = function () { return 0.5 } - - // fill up both the channels - ch1(21) - ch2(42) - - // random selection should choose the second channel "randomly" - chan.select(ch1, ch2)(function (err, ch) { - expect(ch).to.equal(ch2) - ch2.selected(function (err, val) { - expect(val).to.equal(42) - done() - }) - }) - } - ) - - it ( - 'should wait for previously queued callbacks before selecting', - function (done) { - var ch1 = chan() - var ch2 = chan() - - // queue a callback for ch1 - ch1(function () {}) - - chan.select(ch1, ch2)(function (err, ch) { - expect(ch).to.be(ch2) - done() - }) - - ch1(74) - ch2(47) - } - ) -}) diff --git a/test/timeout.js b/test/timeout.js deleted file mode 100644 index 8723ccf..0000000 --- a/test/timeout.js +++ /dev/null @@ -1,27 +0,0 @@ -/* jshint expr:true */ -/* global describe:true, beforeEach:true, afterEach:true, it:true */ - -var timeout = require('../lib/timeout') -var should = require('should') -var sinon = require('sinon') - -describe('Timeout channel make', function () { - - it('should return a function', function () { - var to = timeout(500) - to.should.be.a.Function - }) - - it('should should call the callback after a number of ms', function () { - var clock = sinon.useFakeTimers() - var cb = sinon.spy() - var ms = 500 - var to = timeout(ms) - to(cb) - clock.tick(ms - 1) - cb.called.should.be.false - clock.tick(1) - cb.called.should.be.true - }) - -}) From 361362aabf2f62360d7055fe29ef4d02fd440e60 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Fri, 8 Jul 2016 12:48:31 -0600 Subject: [PATCH 08/22] add tests for channel class --- package.json | 4 +- test/channel.test.js | 191 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 test/channel.test.js diff --git a/package.json b/package.json index 52d3898..4483c3b 100644 --- a/package.json +++ b/package.json @@ -4,8 +4,8 @@ "description": "A go style channel implementation that works nicely with co", "main": "index.js", "scripts": { - "test": "eslint src && mocha --compilers js:babel-register 'src/test/*.test.js'", - "test-debug": "mocha debug --compilers js:babel-register 'src/test/*.test.js'" + "test": "eslint src test && mocha --compilers js:babel-register 'test/*.test.js'", + "test-debug": "mocha debug --compilers js:babel-register 'test/*.test.js'" }, "repository": { "type": "git", diff --git a/test/channel.test.js b/test/channel.test.js new file mode 100644 index 0000000..ebd67ff --- /dev/null +++ b/test/channel.test.js @@ -0,0 +1,191 @@ +import {ok, equal, deepEqual} from 'assert' +import mochon from 'mochon' +import Channel from '../src/channel' +import Deferred, * as allDeferred from '../src/deferred' +import {BufferBlocking} from '../src/buffer' + +describe('Channel', () => { + const sinon = mochon() + + let ch, buffer, deferred + + beforeEach(() => { + deferred = new Deferred() + buffer = new BufferBlocking(5) + ch = new Channel(buffer) + sinon.stub(allDeferred, 'default').returns(deferred) + }) + + describe('constructor', () => { + it('sets buffer prop from arg', () => { + equal(ch.buffer, buffer) + }) + + it('sets initial properties', () => { + deepEqual(ch.pendingPuts, []) + deepEqual(ch.pendingTakes, []) + equal(ch.isClosed, false) + equal(ch.isDone, false) + deepEqual(ch.empty, {}) + }) + }) + + describe('take', () => { + it('returns deferred promise', () => { + equal(ch.take(), deferred.promise) + }) + + it('resolves with the first value in the buffer', async () => { + const val = {} + ch.put(val) + ch.put({}) + equal(await ch.take(), val) + }) + + it('resolves when a value is added after the take', async () => { + const val = {} + setImmediate(() => ch.put(val)) + equal(await ch.take(), val) + }) + + it('resolve with empty when called on a closed channel', async () => { + ch.close() + equal(await ch.take(), ch.empty) + }) + }) + + describe('then', () => { + it('proxies call to then of promise returned by take', () => { + const ret = {} + const then = sinon.stub().returns(ret) + sinon.stub(ch, 'take').returns({then}) + const onFullfilled = {} + const onRejected = {} + equal(ch.then(onFullfilled, onRejected), ret) + sinon.assert.calledWithExactly(then, onFullfilled, onRejected) + }) + }) + + describe('cancelableTake', () => { + it('returns a promise and a cancel function', () => { + const [promise, cancel] = ch.cancelableTake() + ok(promise instanceof Promise) + equal(typeof cancel, 'function') + }) + + it('promise comes from call to take', () => { + const expected = {} + sinon.stub(ch, 'take').returns(expected) + const [promise] = ch.cancelableTake() + equal(promise, expected) + }) + + it('cancel function removes pending take', async () => { + Deferred.restore() + const [, cancel] = ch.cancelableTake() + const second = ch.take() + cancel() + const val = {} + ch.put(val) + equal(await second, val) + }) + + it('promise for pending take is rejected', async () => { + let rejectedErr + const [promise, cancel] = ch.cancelableTake() + cancel() + try { + await promise + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + }) + + describe('hasValues', () => { + it('returns true if the buffer has values', () => { + ch.buffer = {hasValues: () => true} + ch.pendingPuts = {length: 0} + ok(ch.hasValues()) + }) + + it('returns true if the channel has pending puts', () => { + ch.buffer = {hasValues: () => false} + ch.pendingPuts = {length: 1} + ok(ch.hasValues()) + }) + + it('returns false if buffer and pending puts are empty', () => { + ch.buffer = {hasValues: () => false} + ch.pendingPuts = {length: 0} + ok(!ch.hasValues()) + }) + }) + + describe('put', () => { + it('resolves the first pending take', async () => { + const take1 = ch.take() + ch.take() + const val = {} + await ch.put(val) + equal(await take1, val) + }) + + it('puts deferred put in the buffer', () => { + sinon.stub(ch.buffer, 'push').returns(true) + const val = {} + ch.put(val) + equal(ch.buffer.push.firstCall.args[0](), val) + }) + + it('puts deferred in pending puts if buffer is full', () => { + sinon.stub(ch.buffer, 'push').returns(false) + const deferredPut = {} + sinon.stub(allDeferred, 'DeferredPut').returns(deferredPut) + ch.put({}) + equal(ch.pendingPuts[0], deferredPut) + }) + + it('is rejected if called on a closed channel', async () => { + ch.close() + let rejectedErr + try { + await ch.put({}) + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + }) + + describe('close', () => { + it('sets isClosed to true', () => { + ch.close() + ok(ch.isClosed) + }) + + it('sets isDone to true', () => { + ch.close() + ok(ch.isDone) + }) + + it('rejects promises for any pending puts', async () => { + ch = new Channel(new BufferBlocking(0)) + const promise = ch.put({}) + ch.close() + let rejectedErr + try { + await promise + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + + it('resolves any pending takes with empty value', async () => { + setImmediate(() => ch.close()) + equal(await ch.take(), ch.empty) + }) + }) +}) From 16db99b1adb41c8a9fe2007760c746af70101303 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Fri, 8 Jul 2016 12:49:42 -0600 Subject: [PATCH 09/22] introduce new select dsl existing functionality moved to alts function --- src/alts.js | 28 ++++++++++++++++++++++++++++ src/select.js | 38 ++++++++++++++------------------------ 2 files changed, 42 insertions(+), 24 deletions(-) create mode 100644 src/alts.js diff --git a/src/alts.js b/src/alts.js new file mode 100644 index 0000000..701d483 --- /dev/null +++ b/src/alts.js @@ -0,0 +1,28 @@ +import Deferred from './deferred' + +export default function alts (...channels) { + const deferred = new Deferred() + const nonEmpty = channels.filter(c => c.hasValues()) + const cancels = [] + let remaining = channels.length + + const take = channel => { + const [promise, cancel] = channel.cancelableTake() + cancels.push(cancel) + promise.then(value => { + if (value === channel.empty && --remaining > 0) { + return + } + cancels.forEach(fn => fn()) + deferred.resolve([channel, value]) + }) + } + + if (nonEmpty.length > 1) { + take(nonEmpty[Math.random() * nonEmpty.length | 0]) + } else { + channels.forEach(take) + } + + return deferred.promise +} diff --git a/src/select.js b/src/select.js index 7ab59cb..b28d835 100644 --- a/src/select.js +++ b/src/select.js @@ -1,30 +1,20 @@ -import Deferred from './deferred' +import alts from './alts' -function pairs () +const DEFAULT = {hasValues: () => false} -export function select (channels...) { - const deferred = new Deferred() - const nonEmpty = channels.filter(channel => channel.hasValues()) - const cancels = [] - let remaining = channels.length - - const take = channel => { - const [promise, cancel] = channel.cancelableTake() - cancels.push(cancel) - promise.then(value => { - if (value === channel.empty && --remaining > 0) { - return - } - cancels.forEach(fn => fn()) - deferred.resolve([channel, value]) - }) +export default function select (...args) { + const handlers = new Map() + for (let i = 0, len = args.length; i < len; i += 2) { + if (args[i + 1]) { + handlers.set(args[i], args[i + 1]) + } else { + handlers.set(DEFAULT, args[i]) + } } - - if (nonEmpty.length > 1) { - take(nonEmpty[Math.random() * nonEmpty.length | 0]) + const channels = handlers.keys() + if (handlers.get(DEFAULT) && !channels.some(c => c.hasValues()).length) { + return Promise.resove(handlers.get(DEFAULT)()) } else { - channels.forEach(take) + return alts(channels).then(([ch, val]) => handlers.get(ch)(val)) } - - return deferred.promise } From c0a3b6f57a50ad789a7db9c02455e2f5f911188a Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Fri, 8 Jul 2016 12:56:33 -0600 Subject: [PATCH 10/22] update node version in travis config --- .travis.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 1932865..3f308e8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,5 @@ language: node_js node_js: - - "0.11" + - "4" + - "5" + - "6" From 23ead8d5bc8f583f250c3d3a7d747fe3892a22e4 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Sat, 9 Jul 2016 23:33:18 -0600 Subject: [PATCH 11/22] add tests for buffers --- src/buffer.js | 2 +- test/buffer.test.js | 91 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 test/buffer.test.js diff --git a/src/buffer.js b/src/buffer.js index ce510d1..1d6b7a6 100644 --- a/src/buffer.js +++ b/src/buffer.js @@ -1,7 +1,7 @@ export class BufferBase { constructor (size) { this.values = [] - this.size = parseInt(size, 10) + this.size = parseInt(size, 10) || 0 } shift () { diff --git a/test/buffer.test.js b/test/buffer.test.js new file mode 100644 index 0000000..4cec8aa --- /dev/null +++ b/test/buffer.test.js @@ -0,0 +1,91 @@ +import {ok, equal, notEqual, deepEqual} from 'assert' +import * as buffer from '../src/buffer' + +describe('BufferBase', () => { + describe('constructor', () => { + it('sets size from argument', () => { + const size = 47 + const b = new buffer.BufferBase(size) + equal(b.size, size) + }) + + it('sets size to 0 if argument is not a number', () => { + const size = 'cat' + const b = new buffer.BufferBase(size) + equal(b.size, 0) + }) + }) + + describe('shift', () => { + it('removes the first value from the buffer', () => { + const b = new buffer.BufferBase(47) + b.values = [1, 2, 3] + equal(b.shift(), 1) + }) + }) + + describe('hasValues', () => { + it('returns true if the buffer has values', () => { + const b = new buffer.BufferBase(47) + b.values = [1] + ok(b.hasValues()) + }) + + it('returns false if the buffer has no values', () => { + const b = new buffer.BufferBase(47) + b.values = [] + ok(!b.hasValues()) + }) + }) + + describe('BufferBlocking push', () => { + it('puts getValue() on buffer and returns true if there is room', () => { + const b = new buffer.BufferBlocking(1) + const val = {} + equal(b.push(() => val), true) + equal(b.values[0], val) + }) + + it('returns false if there is not room', () => { + const b = new buffer.BufferBlocking(1) + const val = {} + b.push(() => {}) + ok(!b.push(() => val)) + notEqual(b.values[1], val) + }) + }) + + describe('BufferSliding push', () => { + it('puts getValue() on buffer and returns true if there is room', () => { + const b = new buffer.BufferSliding(1) + const val = {} + equal(b.push(() => val), true) + equal(b.values[0], val) + }) + + it('puts getValue() on buffer and removes first value if no room', () => { + const b = new buffer.BufferSliding(2) + b.push(() => 1) + b.push(() => 2) + equal(b.push(() => 3), true) + deepEqual(b.values, [2, 3]) + }) + }) + + describe('BufferDropping push', () => { + it('puts getValue() on buffer and returns true if there is room', () => { + const b = new buffer.BufferDropping(1) + const val = {} + equal(b.push(() => val), true) + equal(b.values[0], val) + }) + + it('does not put value on buffer and returns true if no room', () => { + const b = new buffer.BufferDropping(2) + b.push(() => 1) + b.push(() => 2) + equal(b.push(() => 3), true) + deepEqual(b.values, [1, 2]) + }) + }) +}) From 1dd1ea103d19f669f78b5401f6115a99979cced3 Mon Sep 17 00:00:00 2001 From: haoxin Date: Wed, 16 Nov 2016 18:01:03 +0800 Subject: [PATCH 12/22] scripts: add build and prepublish; fix entry gitignore: add lib fix entry --- .gitignore | 1 + index.js | 27 --------------------------- package.json | 4 +++- 3 files changed, 4 insertions(+), 28 deletions(-) delete mode 100644 index.js diff --git a/.gitignore b/.gitignore index 660471e..0fc3ecc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ .DS_Store node_modules examples/node_modules +lib diff --git a/index.js b/index.js deleted file mode 100644 index 9a26a95..0000000 --- a/index.js +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Module dependencies. - */ -var make = require('./lib/make') -var select = require('./lib/select') -var timeout = require('./lib/timeout') -var interval = require('./lib/interval') - -/** - * Expose `make`. - */ -module.exports = make - -/** - * Expose `select`. - */ -module.exports.select = select - -/** - * Expose `interval`. - */ -module.exports.interval = interval - -/** - * Expose `timeout`. - */ -module.exports.timeout = timeout diff --git a/package.json b/package.json index 4483c3b..94537f0 100644 --- a/package.json +++ b/package.json @@ -2,8 +2,10 @@ "name": "chan", "version": "0.6.1", "description": "A go style channel implementation that works nicely with co", - "main": "index.js", + "main": "lib/index.js", "scripts": { + "build": "babel src --out-dir lib", + "prepublish": "npm run build", "test": "eslint src test && mocha --compilers js:babel-register 'test/*.test.js'", "test-debug": "mocha debug --compilers js:babel-register 'test/*.test.js'" }, From fc009a7d427eac774f8559b97d701ec6764f9faf Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Wed, 14 Dec 2016 06:39:06 -0700 Subject: [PATCH 13/22] convert src directory to typescript including adding type annotations for everything as well as some refactoring/api changes to make it fit better with static types. why typescript? i really like typescript and wanted to al least publish a typescript definition file with the package. it will be much easier to keep in sync if i write the src in typescript as well. it works out really nice for a lot of things. for example: directional channels (similar to golang) can now be acheived and checked at compile time by using specialized directional interface types `InChannel` and `OutChannel`. chan will continue to be just as usable from javascript. updates to the build and tests as well as removal of the old babel dependencies will come in future commits. this is a big enough chunck already. --- .vscode/settings.json | 5 ++ src/alts.js | 28 -------- src/buffer.js | 44 ------------- src/buffer.ts | 54 ++++++++++++++++ src/channel.js | 109 ------------------------------- src/channel.ts | 145 ++++++++++++++++++++++++++++++++++++++++++ src/deferred.js | 20 ------ src/deferred.ts | 29 +++++++++ src/error.ts | 44 +++++++++++++ src/factory.js | 14 ---- src/factory.ts | 35 ++++++++++ src/index.js | 13 ---- src/index.ts | 12 ++++ src/queue.ts | 34 ++++++++++ src/select.js | 20 ------ src/select.ts | 103 ++++++++++++++++++++++++++++++ src/timeout.js | 12 ---- src/timeout.ts | 17 +++++ 18 files changed, 478 insertions(+), 260 deletions(-) create mode 100644 .vscode/settings.json delete mode 100644 src/alts.js delete mode 100644 src/buffer.js create mode 100644 src/buffer.ts delete mode 100644 src/channel.js create mode 100644 src/channel.ts delete mode 100644 src/deferred.js create mode 100644 src/deferred.ts create mode 100644 src/error.ts delete mode 100644 src/factory.js create mode 100644 src/factory.ts delete mode 100644 src/index.js create mode 100644 src/index.ts create mode 100644 src/queue.ts delete mode 100644 src/select.js create mode 100644 src/select.ts delete mode 100644 src/timeout.js create mode 100644 src/timeout.ts diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7d6d886 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +// Place your settings in this file to overwrite default and user settings. +{ + "typescript.tsdk": "./node_modules/typescript/lib", + "editor.tabSize": 2 +} \ No newline at end of file diff --git a/src/alts.js b/src/alts.js deleted file mode 100644 index 701d483..0000000 --- a/src/alts.js +++ /dev/null @@ -1,28 +0,0 @@ -import Deferred from './deferred' - -export default function alts (...channels) { - const deferred = new Deferred() - const nonEmpty = channels.filter(c => c.hasValues()) - const cancels = [] - let remaining = channels.length - - const take = channel => { - const [promise, cancel] = channel.cancelableTake() - cancels.push(cancel) - promise.then(value => { - if (value === channel.empty && --remaining > 0) { - return - } - cancels.forEach(fn => fn()) - deferred.resolve([channel, value]) - }) - } - - if (nonEmpty.length > 1) { - take(nonEmpty[Math.random() * nonEmpty.length | 0]) - } else { - channels.forEach(take) - } - - return deferred.promise -} diff --git a/src/buffer.js b/src/buffer.js deleted file mode 100644 index 1d6b7a6..0000000 --- a/src/buffer.js +++ /dev/null @@ -1,44 +0,0 @@ -export class BufferBase { - constructor (size) { - this.values = [] - this.size = parseInt(size, 10) || 0 - } - - shift () { - return this.values.shift() - } - - hasValues () { - return this.values.length > 0 - } -} - -export class BufferBlocking extends BufferBase { - push (getValue) { - if (this.values.length < this.size) { - this.values.push(getValue()) - return true - } - return false - } -} - -export class BufferDropping extends BufferBase { - push (getValue) { - const value = getValue() - if (this.values.length < this.size) { - this.values.push(value) - } - return true - } -} - -export class BufferSliding extends BufferBase { - push (getValue) { - this.values.push(getValue()) - if (this.values.length > this.size) { - this.values.shift() - } - return true - } -} diff --git a/src/buffer.ts b/src/buffer.ts new file mode 100644 index 0000000..882efd4 --- /dev/null +++ b/src/buffer.ts @@ -0,0 +1,54 @@ +import {Queue} from './queue' + +export interface IBuffer { + shift (): T + hasValues (): boolean + push (getValue: () => T): boolean +} + +abstract class BufferBase implements IBuffer { + // TODO: Use ring buffer instead of queue + protected values = new Queue() + + constructor (protected size: number) {} + + public shift (): T { + return this.values.shift() + } + + public hasValues (): boolean { + return this.values.empty() + } + + abstract push (getValue: () => T): boolean +} + +export class BufferBlocking extends BufferBase { + public push (getValue: () => T): boolean { + if (this.values.size() < this.size) { + this.values.push(getValue()) + return true + } + return false + } +} + +export class BufferDropping extends BufferBase { + public push (getValue: () => T): boolean { + const value = getValue() + if (this.values.size() < this.size) { + this.values.push(value) + } + return true + } +} + +export class BufferSliding extends BufferBase { + public push (getValue: () => T): boolean { + this.values.push(getValue()) + if (this.values.size() > this.size) { + this.values.shift() + } + return true + } +} diff --git a/src/channel.js b/src/channel.js deleted file mode 100644 index 5b29028..0000000 --- a/src/channel.js +++ /dev/null @@ -1,109 +0,0 @@ -import {default as Deferred, DeferredPut} from './deferred' - -const CLOSED_ERROR_MSG = 'Cannot add to closed channel' -const CANCELED_TAKE_ERROR_MSG = 'Pending take from channel was canceled' - -export default class Channel { - constructor (buffer) { - this.buffer = buffer - this.pendingPuts = [] - this.pendingTakes = [] - this.isClosed = false - this.isDone = false - this.empty = {} - } - - then (onFulfilled, onRejected) { - return this.take().then(onFulfilled, onRejected) - } - - take () { - const deferred = new Deferred() - if (this.done()) { - this.resolveEmpty(deferred) - } else if (this.hasValues()) { - this.resolve(deferred, this.nextValue()) - } else { - this.pendingTakes.push(deferred) - } - return deferred.promise - } - - cancelableTake () { - const promise = this.take() - return [ - promise, - () => this.removePendingTake(promise.deferred) - ] - } - - /** - * @api private - */ - removePendingTake (deferred) { - const idx = this.pendingTakes.indexOf(deferred) - if (idx > -1) { - this.pendingTakes.splice(idx, 1) - deferred.reject(new Error(CANCELED_TAKE_ERROR_MSG)) - } - } - - hasValues () { - return this.buffer.hasValues() || this.pendingPuts.length > 0 - } - - /** - * @api private - */ - nextValue () { - if (this.pendingPuts.length > 0) { - this.buffer.push(this.pendingPuts.shift().put()) - } - return this.buffer.shift() - } - - put (value) { - var deferred = new DeferredPut(value) - if (this.isClosed) { - deferred.reject(new Error(CLOSED_ERROR_MSG)) - } else if (this.pendingTakes.length > 0) { - this.resolve(this.pendingTakes.shift(), deferred.put()) - } else if (!this.buffer.push(deferred.put)) { - this.pendingPuts.push(deferred) - } - return deferred.promise - } - - /** - * @api private - */ - resolve (deferred, value) { - deferred.resolve(value) - this.done() - } - - /** - * @api private - */ - resolveEmpty (deferred) { - this.resolve(deferred, this.empty) - } - - close () { - this.isClosed = true - const err = new Error(CLOSED_ERROR_MSG) - this.pendingPuts.forEach((deferred) => deferred.reject(err)) - return this.done() - } - - /** - * @api private - */ - done () { - if (!this.isDone && this.isClosed && !this.buffer.hasValues()) { - this.isDone = true - this.pendingTakes.forEach(this.resolveEmpty, this) - } - return this.isDone - } -} diff --git a/src/channel.ts b/src/channel.ts new file mode 100644 index 0000000..e2fbfe1 --- /dev/null +++ b/src/channel.ts @@ -0,0 +1,145 @@ +import {IBuffer} from './buffer' +import {Deferred, DeferredPut, PromiseWithDeferred} from './deferred' +import {CanceledTakeError, ClosedTakeError, ClosedPutError} from './error' +import {Queue} from './queue' + +export interface OutChannel extends PromiseLike { + take (): PromiseWithDeferred + cancelableTake (): [PromiseWithDeferred, () => void] + hasValues (): boolean +} + +export interface InChannel { + put (T): Promise + close (): void +} + +/** + * A buffered asynchronous queue of values which can be used to coordinate + * between multiple async functions. + */ +export class Channel implements InChannel, OutChannel { + private pendingPuts = new Queue>() + private pendingTakes = new Queue>() + private isClosed = false + private isDone = false + + /** + * Creates a new Channel using any buffer that satisfies the required + * interface. + */ + constructor (private buffer: IBuffer) {} + + /** + * A shortcut for calling `then` on a take, allowing a channel to be awated + * directly. If called multiple times each call with trigger a new take + * from the channel. + */ + public then (onFulfilled, onRejected) { + return this.take().then(onFulfilled, onRejected) + } + + /** + * Return a promise that will be resolved with the next value put on the + * channel. If there are existing values in the channel's buffer, the promise + * will be resolved with the first value. + * + * If called on a channel that is closed and has an empty buffer the promise + * will be rejected with a `ClosedTakeError`. + */ + public take (): PromiseWithDeferred { + const deferred = new Deferred() + if (this.done()) { + deferred.reject(new ClosedTakeError()) + } else if (this.hasValues()) { + this.resolve(deferred, this.nextValue()) + } else { + this.pendingTakes.push(deferred) + } + return deferred.promise + } + + /** + * Initial a new take from the channel returning a promise/cancel function + * pair. If the cancel function is called before the promise resolves, the + * take will be canceled and the next value put on the channel with be + * handled by the next take instead. + */ + public cancelableTake (): [PromiseWithDeferred, () => void] { + const promise = this.take() + return [ + promise, + () => this.removePendingTake(promise.deferred) + ] + } + + /** + * Return a boolean indicating if the channel's buffer has values available + * to be taken. + */ + public hasValues (): boolean { + return this.buffer.hasValues() || this.pendingPuts.notEmpty() + } + + /** + * Put a new value on the channel returning a promise that will be resolved + * once the value has be added to the buffer. + * + * If the channel is closed before the value is in the buffer, the returned + * promise will be rejected with a `ClosedPutError`. + */ + public put (value: T): Promise { + var deferred = new DeferredPut(value) + if (this.isClosed) { + deferred.reject(new ClosedPutError()) + } else if (this.pendingTakes.notEmpty()) { + this.resolve(this.pendingTakes.shift(), deferred.put()) + } else if (!this.buffer.push(deferred.put)) { + this.pendingPuts.push(deferred) + } + return deferred.promise + } + + /** + * Mark a channel as closed. All pending puts on the channel will be rejected + * immediately. Pending takes will be rejected once all values in the buffer + * have been taken. + */ + public close (): void { + this.isClosed = true + const err = new ClosedPutError() + while (this.pendingPuts.notEmpty()) { + this.pendingTakes.shift().reject(err) + } + this.done() + } + + private removePendingTake (deferred: Deferred): void { + if (this.pendingTakes.remove(deferred)) { + deferred.reject(new CanceledTakeError()) + } + } + + private nextValue (): T { + if (this.pendingPuts.notEmpty()) { + this.buffer.push(this.pendingPuts.shift().put) + } + return this.buffer.shift() + } + + private resolve (deferred: Deferred, value: T): void { + deferred.resolve(value) + this.done() + } + + private done (): boolean { + if (!this.isDone && this.isClosed && !this.buffer.hasValues()) { + this.isDone = true + const err = new ClosedTakeError() + while (this.pendingTakes.notEmpty()) { + this.pendingTakes.shift().reject(err) + } + } + return this.isDone + } +} diff --git a/src/deferred.js b/src/deferred.js deleted file mode 100644 index 7b54fa0..0000000 --- a/src/deferred.js +++ /dev/null @@ -1,20 +0,0 @@ -export default class Deferred { - constructor () { - this.promise = new Promise((resolve, reject) => { - this.resolve = resolve - this.reject = reject - }) - this.promise.deferred = this - } -} - -export class DeferredPut extends Deferred { - constructor (value) { - super() - this.value = value - this.put = () => { - this.resolve() - return this.value - } - } -} diff --git a/src/deferred.ts b/src/deferred.ts new file mode 100644 index 0000000..8b4aced --- /dev/null +++ b/src/deferred.ts @@ -0,0 +1,29 @@ +export class PromiseWithDeferred extends Promise { + public deferred: Deferred +} + +export class Deferred { + public promise: PromiseWithDeferred + public resolve: (value: T) => void + public reject: (err: Error) => void + + constructor () { + this.promise = new PromiseWithDeferred((resolve, reject) => { + this.resolve = resolve + this.reject = reject + }) + this.promise.deferred = this + } +} + +export class DeferredPut extends Deferred { + public put: () => T + + constructor (private value: T) { + super() + this.put = () => { + this.resolve(null) + return this.value + } + } +} diff --git a/src/error.ts b/src/error.ts new file mode 100644 index 0000000..9ae1423 --- /dev/null +++ b/src/error.ts @@ -0,0 +1,44 @@ +/** + * A custom error type used when rejecting the promise for a canceled take. + */ +export class CanceledTakeError extends Error { + constructor () { + super() + const err = Object.create(CanceledTakeError.prototype) + err.message = 'Pending take from channel was canceled.' + err.name = 'CanceledTakeError' + err.stack = this.stack + return err + } +} + +/** + * A custom error type used when rejecting the promise for a take when the + * channel is closed and there are not any values left in the buffer to be + * taken. + */ +export class ClosedTakeError extends Error { + constructor () { + super() + const err = Object.create(ClosedTakeError.prototype) + err.message = 'Cannot take a value from an empty closed channel.' + err.name = 'ClosedTakeError' + err.stack = this.stack + return err + } +} + +/** + * A custom error type used when rejecting the promise for a put when the + * channel is closed before the value can be put in the buffer. + */ +export class ClosedPutError extends Error { + constructor () { + super() + const err = Object.create(ClosedPutError.prototype) + err.message = 'Cannot put a value on a closed channel.' + err.name = 'ClosedPutError' + err.stack = this.stack + return err + } +} diff --git a/src/factory.js b/src/factory.js deleted file mode 100644 index 6b02710..0000000 --- a/src/factory.js +++ /dev/null @@ -1,14 +0,0 @@ -import Channel from './channel' -import {BufferBlocking, BufferSliding, BufferDropping} from './buffer' - -export function blockingChannel (size) { - return new Channel(new BufferBlocking(size)) -} - -export function slidingChannel (size) { - return new Channel(new BufferSliding(size)) -} - -export function droppingChannel (size) { - return new Channel(new BufferDropping(size)) -} diff --git a/src/factory.ts b/src/factory.ts new file mode 100644 index 0000000..bdd38c0 --- /dev/null +++ b/src/factory.ts @@ -0,0 +1,35 @@ +import {BufferBlocking, BufferSliding, BufferDropping} from './buffer' +import {Channel} from './channel' + +/** + * Create a channel with a blocking buffer of a given size. If the buffer is + * full, calls to `put` will block until there is room in the buffer. Once + * space if available the value will be added to the end of the buffer and + * the returned promise will resolve allowing execution of the calling async + * function to continue. + * + * If called without a buffer size, it will default to `0` and will function as + * an unbuffered channel. Calls to `put` will block until another async + * function calls `take` on the channel. + */ +export function blockingChannel (size: number = 0): Channel { + return new Channel(new BufferBlocking(size)) +} + +/** + * Creata a channel with a sliding buffer of a given size. If the buffer is + * full, calls to `put` will cause the first value in the buffer to drop and + * the new value will be added to the end of the buffer. + */ +export function slidingChannel (size: number): Channel { + return new Channel(new BufferSliding(size)) +} + +/** + * Create a channel with a dropping buffer of a given size. If the buffer is + * full, calls to `put` will be ignored and the value will not be added to the + * channel. + */ +export function droppingChannel (size: number): Channel { + return new Channel(new BufferDropping(size)) +} diff --git a/src/index.js b/src/index.js deleted file mode 100644 index b39331a..0000000 --- a/src/index.js +++ /dev/null @@ -1,13 +0,0 @@ -import {blockingChannel, slidingChannel, droppingChannel} from './factory' -import Channel from './channel' -import timeout from './timeout' -import alts from './alts' -import select from './select' - -exports = module.exports = blockingChannel -exports.sliding = slidingChannel -exports.dropping = droppingChannel -exports.Channel = Channel -exports.timeout = timeout -exports.alts = alts -exports.select = select diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..8557f8d --- /dev/null +++ b/src/index.ts @@ -0,0 +1,12 @@ +import {Channel} from './channel' +import {blockingChannel, slidingChannel, droppingChannel} from './factory' +import {select} from './select' +import {timeout} from './timeout' + +export = Object.assign(blockingChannel, { + sliding: slidingChannel, + dropping: droppingChannel, + Channel, + timeout, + select +}) diff --git a/src/queue.ts b/src/queue.ts new file mode 100644 index 0000000..ccb58b4 --- /dev/null +++ b/src/queue.ts @@ -0,0 +1,34 @@ +// TODO: implement proper queue with O(1) push and shift +// Array based queue has O(n) shifts +export class Queue { + private items: Array = [] + + public push (value: T): void { + this.items.push(value) + } + + public shift (): T { + return this.items.shift(); + } + + public size (): number { + return this.items.length + } + + public empty (): boolean { + return this.size() === 0 + } + + public notEmpty (): boolean { + return this.size() > 0 + } + + public remove (value: T): boolean { + const idx = this.items.indexOf(value) + if (idx > -1) { + this.items.splice(idx, 1) + return true + } + return false + } +} \ No newline at end of file diff --git a/src/select.js b/src/select.js deleted file mode 100644 index b28d835..0000000 --- a/src/select.js +++ /dev/null @@ -1,20 +0,0 @@ -import alts from './alts' - -const DEFAULT = {hasValues: () => false} - -export default function select (...args) { - const handlers = new Map() - for (let i = 0, len = args.length; i < len; i += 2) { - if (args[i + 1]) { - handlers.set(args[i], args[i + 1]) - } else { - handlers.set(DEFAULT, args[i]) - } - } - const channels = handlers.keys() - if (handlers.get(DEFAULT) && !channels.some(c => c.hasValues()).length) { - return Promise.resove(handlers.get(DEFAULT)()) - } else { - return alts(channels).then(([ch, val]) => handlers.get(ch)(val)) - } -} diff --git a/src/select.ts b/src/select.ts new file mode 100644 index 0000000..2a2c2ff --- /dev/null +++ b/src/select.ts @@ -0,0 +1,103 @@ +import {OutChannel} from './channel' +import {Deferred} from './deferred' +import {ClosedTakeError} from './error' +import {timeout} from './timeout' + +/** + * Create a new thenable select with support for chaning `case`, `timeout` and + * `default` methods. + */ +export function select (): Selector { + return new Selector() +} + +/** + * A thenable builder that allows adding handlers for multiple channels as well + * as a timeout or default handler. The thenable resolves once on of the + * handlers has been called. If the handler is asynchronous (returns a promise) + * the selector will resolve once the handler is resolved. + * + * If a default is specified, it will be called immediatly if non of the + * channels have values ready to take. If multiple channels have a pending + * value, one channel will be chosen at random. + */ +export class Selector implements PromiseLike { + private handlers = new Map() + private defaultFn: (() => Promise | void) | undefined + private cancels: Array<() => void> = [] + private deferred = new Deferred() + private remaining = 0 + private executed = false + + /** + * Add a channel to the select along with the function to be called with its + * value if the channel is selected. If the function is an async function or + * any function that returns a promise. The select will not resolve until + * the returned promise resolves. + */ + public case (ch: OutChannel, fn: (T) => Promise | void): this { + this.handlers.set(ch, (value: any) => fn(value as T)) + return this + } + + /** + * A shortcut for adding a case to the select with a timeout channel. If + * another channel in the select does not have a value to take within the + * given number of milliseconds, the timeout handler will be called instead. + */ + public timeout (ms: number, fn: () => Promise | undefined): this { + this.case(timeout(ms), fn) + return this + } + + /** + * Register a handler function to be called if none of the channels have a + * value to take at the time of the select. If the select has a default it + * will always resolve immediatly. + */ + public default (fn: () => Promise | void): this { + this.defaultFn = fn + return this + } + + /** + * Executes the select by taking from one or more channels, or by calling the + * default handler. If a take is initiated from more than one channel, all + * other takes will be canceled after the first one resolves. + */ + public then (onFulfilled, onRejected) { + if (!this.executed) { + this.execute() + } + return this.deferred.promise.then(onFulfilled, onRejected) + } + + private execute ():void { + const channels = Array.from(this.handlers.keys()) + const nonEmpty = channels.filter(c => c.hasValues()) + if (this.defaultFn && nonEmpty.length === 0) { + const done = Promise.resolve(this.defaultFn()) + done.then(this.deferred.resolve, this.deferred.reject) + } else if (nonEmpty.length > 0) { + this.take(nonEmpty[Math.random() * nonEmpty.length | 0]) + } else { + this.remaining = channels.length + channels.forEach(c => this.take(c)) + } + this.executed = true + } + + private take (channel: OutChannel): void { + const [promise, cancel] = channel.cancelableTake() + this.cancels.push(cancel) + promise.then((value) => { + this.cancels.forEach(fn => fn()) + const done = Promise.resolve(this.handlers.get(promise)(value)) + done.then(this.deferred.resolve, this.deferred.reject) + }).catch((err) => { + if (err instanceof ClosedTakeError === false || --this.remaining === 0) { + this.deferred.reject(err) + } + }) + } +} \ No newline at end of file diff --git a/src/timeout.js b/src/timeout.js deleted file mode 100644 index f9d732c..0000000 --- a/src/timeout.js +++ /dev/null @@ -1,12 +0,0 @@ -import {blockingChannel} from './factory' - -export default function timeout (ms) { - const ch = blockingChannel() - setTimeout(() => { - try { - ch.put(true) - ch.close() - } catch (err) {} - }, ms) - return ch -} diff --git a/src/timeout.ts b/src/timeout.ts new file mode 100644 index 0000000..af21a18 --- /dev/null +++ b/src/timeout.ts @@ -0,0 +1,17 @@ +import {blockingChannel} from './factory' +import {Channel} from './channel' + +/** + * Create a new channel that will receive a single value after a given number + * of milliseconds. The channel will be closed and cannot be reused. + */ +export function timeout (ms: number): Channel { + const ch = blockingChannel() + setTimeout(() => { + try { + ch.put(null) + ch.close() + } catch (err) {} + }, ms) + return ch +} From bf8a2ba40d5b5b68d073ca03f429601cf2e0f128 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Sat, 11 Feb 2017 23:29:16 -0700 Subject: [PATCH 14/22] setup typescript build / config --- .babelrc | 6 ------ .eslintrc | 7 ------- .gitignore | 6 ++---- .vscode/launch.json | 28 ++++++++++++++++++++++++++++ .vscode/settings.json | 5 +++-- .vscode/tasks.json | 27 +++++++++++++++++++++++++++ package.json | 36 +++++++++++++++++------------------- tsconfig.json | 15 +++++++++++++++ tslint.json | 8 ++++++++ 9 files changed, 100 insertions(+), 38 deletions(-) delete mode 100644 .babelrc delete mode 100644 .eslintrc create mode 100644 .vscode/launch.json create mode 100644 .vscode/tasks.json create mode 100644 tsconfig.json create mode 100644 tslint.json diff --git a/.babelrc b/.babelrc deleted file mode 100644 index d383c3e..0000000 --- a/.babelrc +++ /dev/null @@ -1,6 +0,0 @@ -{ - "presets": [ - "es2015-node", - "stage-3" - ] -} diff --git a/.eslintrc b/.eslintrc deleted file mode 100644 index 74fb26d..0000000 --- a/.eslintrc +++ /dev/null @@ -1,7 +0,0 @@ -{ - "extends": "standard", - "parser": "babel-eslint", - "env": { - "mocha": true - } -} diff --git a/.gitignore b/.gitignore index 0fc3ecc..d55a721 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,3 @@ -*.swp -.DS_Store node_modules -examples/node_modules -lib +dist +types \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..3c9e73a --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,28 @@ +{ + // Use IntelliSense to learn about possible Node.js debug attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "Tests", + "program": "${workspaceRoot}/node_modules/.bin/_mocha", + "args": [ + "--require", + "source-map-support/register", + "--no-timeouts", + "--colors", + "${workspaceRoot}/dist/**/*.test.js" + ], + "preLaunchTask": "build", + "cwd": "${workspaceRoot}", + "sourceMaps": true, + "outFiles": [ + "${workspaceRoot}/dist/**/*.js" + ], + "internalConsoleOptions": "openOnSessionStart" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 7d6d886..11c5516 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,6 @@ // Place your settings in this file to overwrite default and user settings. { "typescript.tsdk": "./node_modules/typescript/lib", - "editor.tabSize": 2 -} \ No newline at end of file + "editor.tabSize": 2, + "vsicons.presets.angular": false + } \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..c5c00ea --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,27 @@ +{ + // See https://go.microsoft.com/fwlink/?LinkId=733558 + // for the documentation about the tasks.json format + "version": "0.1.0", + "command": "npm", + "isShellCommand": true, + "showOutput": "always", + "suppressTaskName": true, + "tasks": [ + { + "taskName": "install", + "args": ["install"] + }, + { + "taskName": "update", + "args": ["update"] + }, + { + "taskName": "test", + "args": ["run", "test"] + }, + { + "taskName": "build", + "args": ["run", "build"] + } + ] +} \ No newline at end of file diff --git a/package.json b/package.json index 94537f0..582c59f 100644 --- a/package.json +++ b/package.json @@ -1,13 +1,13 @@ { "name": "chan", - "version": "0.6.1", - "description": "A go style channel implementation that works nicely with co", - "main": "lib/index.js", + "version": "1.0.0", + "description": "Go-like channels for TypeScript and JavaScript.", + "main": "out/src/index.js", "scripts": { - "build": "babel src --out-dir lib", + "build": "tsc", "prepublish": "npm run build", - "test": "eslint src test && mocha --compilers js:babel-register 'test/*.test.js'", - "test-debug": "mocha debug --compilers js:babel-register 'test/*.test.js'" + "test": "npm run lint && npm run build && mocha 'dist/**/*.test.js'", + "lint": "tslint 'src/**/*.ts'" }, "repository": { "type": "git", @@ -15,10 +15,12 @@ }, "keywords": [ "async", - "go", + "await", "channel", - "co", - "generator" + "csp", + "buffer", + "golang", + "go" ], "author": "Brent Burgoyne", "contributors": [ @@ -40,17 +42,13 @@ }, "homepage": "https://github.com/brentburgoyne/chan", "devDependencies": { - "babel-eslint": "^6.1.1", - "babel-preset-es2015": "^6.9.0", - "babel-preset-es2015-node": "^6.1.0", - "babel-preset-stage-3": "^6.11.0", - "babel-register": "^6.9.0", - "eslint": "^2.13.1", - "eslint-config-standard": "^5.3.1", - "eslint-plugin-promise": "^1.3.2", - "eslint-plugin-standard": "^1.3.2", + "@types/node": "^6.0.52", "mocha": "^2.5.3", "mochon": "^1.0.0", - "sinon": "^1.17.4" + "sinon": "^1.17.4", + "source-map-support": "^0.4.7", + "tslint": "^4.1.1", + "tslint-config-standard": "^2.0.0", + "typescript": "^2.1.4" } } diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..207b277 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,15 @@ +{ + "compileOnSave": false, + "compilerOptions": { + "strictNullChecks": true, + "module": "commonjs", + "outDir": "dist", + "sourceMap": true, + "declaration": true, + "declarationDir": "types", + "target": "es6" + }, + "include": [ + "src/**/*.ts" + ] +} \ No newline at end of file diff --git a/tslint.json b/tslint.json new file mode 100644 index 0000000..e9b8434 --- /dev/null +++ b/tslint.json @@ -0,0 +1,8 @@ +{ + "extends": [ + "tslint-config-standard" + ], + "rules": { + "no-conditional-assignment": false + } +} \ No newline at end of file From 7fe424fe7a53c5732e7c833c8fab6bfa07889d1c Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Sat, 11 Feb 2017 23:31:11 -0700 Subject: [PATCH 15/22] fix lint and compiles issues --- src/buffer.ts | 10 +++++----- src/channel.ts | 29 +++++++++++++++++------------ src/deferred.ts | 4 ++-- src/index.ts | 5 +++-- src/queue.ts | 12 ++++++++---- src/select.ts | 4 ++-- src/timeout.ts | 16 ++++++++++------ 7 files changed, 47 insertions(+), 33 deletions(-) diff --git a/src/buffer.ts b/src/buffer.ts index 882efd4..6780856 100644 --- a/src/buffer.ts +++ b/src/buffer.ts @@ -1,23 +1,23 @@ import {Queue} from './queue' export interface IBuffer { - shift (): T + shift (): T | undefined hasValues (): boolean push (getValue: () => T): boolean } -abstract class BufferBase implements IBuffer { +export abstract class BufferBase implements IBuffer { // TODO: Use ring buffer instead of queue protected values = new Queue() - + constructor (protected size: number) {} - public shift (): T { + public shift (): T | undefined { return this.values.shift() } public hasValues (): boolean { - return this.values.empty() + return this.values.notEmpty() } abstract push (getValue: () => T): boolean diff --git a/src/channel.ts b/src/channel.ts index e2fbfe1..33bb64a 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -4,7 +4,7 @@ import {CanceledTakeError, ClosedTakeError, ClosedPutError} from './error' import {Queue} from './queue' export interface OutChannel extends PromiseLike { - take (): PromiseWithDeferred + take (): PromiseWithDeferred cancelableTake (): [PromiseWithDeferred, () => void] hasValues (): boolean } @@ -89,7 +89,7 @@ export class Channel implements InChannel, OutChannel { * promise will be rejected with a `ClosedPutError`. */ public put (value: T): Promise { - var deferred = new DeferredPut(value) + const deferred = new DeferredPut(value) if (this.isClosed) { deferred.reject(new ClosedPutError()) } else if (this.pendingTakes.notEmpty()) { @@ -107,9 +107,10 @@ export class Channel implements InChannel, OutChannel { */ public close (): void { this.isClosed = true - const err = new ClosedPutError() - while (this.pendingPuts.notEmpty()) { - this.pendingTakes.shift().reject(err) + const err = new ClosedPutError() + let pendingPut + while (pendingPut = this.pendingPuts.shift()) { + pendingPut.reject(err) } this.done() } @@ -120,15 +121,18 @@ export class Channel implements InChannel, OutChannel { } } - private nextValue (): T { - if (this.pendingPuts.notEmpty()) { - this.buffer.push(this.pendingPuts.shift().put) + private nextValue (): T | undefined { + const pendingPut = this.pendingPuts.shift() + if (pendingPut) { + this.buffer.push(pendingPut.put) } return this.buffer.shift() } - private resolve (deferred: Deferred, value: T): void { - deferred.resolve(value) + private resolve (deferred: Deferred | undefined, value: T | undefined): void { + if (deferred && value) { + deferred.resolve(value) + } this.done() } @@ -136,8 +140,9 @@ export class Channel implements InChannel, OutChannel { if (!this.isDone && this.isClosed && !this.buffer.hasValues()) { this.isDone = true const err = new ClosedTakeError() - while (this.pendingTakes.notEmpty()) { - this.pendingTakes.shift().reject(err) + let pendingTake: Deferred | undefined + while (pendingTake = this.pendingTakes.shift()) { + pendingTake.reject(err) } } return this.isDone diff --git a/src/deferred.ts b/src/deferred.ts index 8b4aced..f3708ba 100644 --- a/src/deferred.ts +++ b/src/deferred.ts @@ -6,7 +6,7 @@ export class Deferred { public promise: PromiseWithDeferred public resolve: (value: T) => void public reject: (err: Error) => void - + constructor () { this.promise = new PromiseWithDeferred((resolve, reject) => { this.resolve = resolve @@ -22,7 +22,7 @@ export class DeferredPut extends Deferred { constructor (private value: T) { super() this.put = () => { - this.resolve(null) + this.resolve(undefined) return this.value } } diff --git a/src/index.ts b/src/index.ts index 8557f8d..66cff27 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ import {Channel} from './channel' import {blockingChannel, slidingChannel, droppingChannel} from './factory' -import {select} from './select' +import {select, Selector} from './select' import {timeout} from './timeout' export = Object.assign(blockingChannel, { @@ -8,5 +8,6 @@ export = Object.assign(blockingChannel, { dropping: droppingChannel, Channel, timeout, - select + select, + Selector }) diff --git a/src/queue.ts b/src/queue.ts index ccb58b4..eef98aa 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -7,8 +7,12 @@ export class Queue { this.items.push(value) } - public shift (): T { - return this.items.shift(); + public shift (): T | undefined { + return this.items.shift() + } + + public peek (): T | undefined { + return this.items[0] } public size (): number { @@ -22,7 +26,7 @@ export class Queue { public notEmpty (): boolean { return this.size() > 0 } - + public remove (value: T): boolean { const idx = this.items.indexOf(value) if (idx > -1) { @@ -31,4 +35,4 @@ export class Queue { } return false } -} \ No newline at end of file +} diff --git a/src/select.ts b/src/select.ts index 2a2c2ff..7b5c495 100644 --- a/src/select.ts +++ b/src/select.ts @@ -72,7 +72,7 @@ export class Selector implements PromiseLike { return this.deferred.promise.then(onFulfilled, onRejected) } - private execute ():void { + private execute (): void { const channels = Array.from(this.handlers.keys()) const nonEmpty = channels.filter(c => c.hasValues()) if (this.defaultFn && nonEmpty.length === 0) { @@ -100,4 +100,4 @@ export class Selector implements PromiseLike { } }) } -} \ No newline at end of file +} diff --git a/src/timeout.ts b/src/timeout.ts index af21a18..9ca4fe0 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -7,11 +7,15 @@ import {Channel} from './channel' */ export function timeout (ms: number): Channel { const ch = blockingChannel() - setTimeout(() => { - try { - ch.put(null) - ch.close() - } catch (err) {} - }, ms) + setTimeout( + () => { + try { + ch.put(null) + ch.close() + // tslint:disable-next-line:no-empty + } catch (err) {} + }, + ms + ) return ch } From 0a15b132e4e8d737d1a6a249f5e5b5ef45d3d689 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Sat, 11 Feb 2017 23:31:39 -0700 Subject: [PATCH 16/22] add tests for channel --- src/channel.test.ts | 201 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 201 insertions(+) create mode 100644 src/channel.test.ts diff --git a/src/channel.test.ts b/src/channel.test.ts new file mode 100644 index 0000000..2328aa5 --- /dev/null +++ b/src/channel.test.ts @@ -0,0 +1,201 @@ +import {equal, ok} from 'assert' +import * as sinon from 'sinon' +import {describe, it, beforeEach, afterEach} from 'mocha' +import {Channel} from './channel' +import {BufferBlocking} from './buffer' +import * as Deferred from './deferred' + +describe('Channel', () => { + let buffer + let ch + let deferred + let deferredStub + + beforeEach(() => { + deferred = new Deferred.Deferred() + buffer = new BufferBlocking(5) + deferredStub = sinon.stub(Deferred, 'Deferred').returns(deferred) + ch = new Channel(buffer) + }) + + afterEach(() => { + deferredStub.restore() + }) + + describe('take', () => { + it('returns deferred promise', () => { + equal(ch.take(), deferred.promise) + }) + + it('resolves with the first value in the buffer', async () => { + const val = {} + ch.put(val) + ch.put({}) + equal(await ch.take(), val) + }) + + it('resolves when a value is added after the take', async () => { + const val = {} + setImmediate(() => ch.put(val)) + equal(await ch.take(), val) + }) + + it('rejects when called on a closed channel', async () => { + ch.close() + let err + try { + await ch.take() + } catch (e) { + err = e + } + ok(err) + }) + }) + + describe('then', () => { + it('proxies call to then of promise returned by take', () => { + const ret = {} + const then = sinon.stub().returns(ret) + sinon.stub(ch, 'take').returns({then}) + const onFullfilled = {} + const onRejected = {} + equal(ch.then(onFullfilled, onRejected), ret) + sinon.assert.calledWithExactly(then, onFullfilled, onRejected) + }) + }) + + describe('cancelableTake', () => { + it('returns a promise and a cancel function', () => { + const [promise, cancel] = ch.cancelableTake() + ok(promise instanceof Promise) + equal(typeof cancel, 'function') + }) + + it('promise comes from call to take', () => { + const expected = {} + sinon.stub(ch, 'take').returns(expected) + const [promise] = ch.cancelableTake() + equal(promise, expected) + }) + + it('cancel function removes pending take', async () => { + deferredStub.restore() + const [first, cancel] = ch.cancelableTake() + first.catch(() => null) + const second = ch.take() + cancel() + const val = {} + ch.put(val) + equal(await second, val) + }) + + it('promise for pending take is rejected', async () => { + let rejectedErr + const [promise, cancel] = ch.cancelableTake() + cancel() + try { + await promise + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + }) + + describe('hasValues', () => { + it('returns true if the buffer has values', () => { + ch.buffer = {hasValues: () => true} + ch.pendingPuts = {notEmpty: () => false} + ok(ch.hasValues()) + }) + + it('returns true if the channel has pending puts', () => { + ch.buffer = {hasValues: () => false} + ch.pendingPuts = {notEmpty: () => true} + ok(ch.hasValues()) + }) + + it('returns false if buffer and pending puts are empty', () => { + ch.buffer = {hasValues: () => false} + ch.pendingPuts = {notEmpty: () => false} + ok(!ch.hasValues()) + }) + }) + + describe('put', () => { + it('resolves the first pending take', async () => { + const take1 = ch.take() + ch.take() + const val = {} + await ch.put(val) + equal(await take1, val) + }) + + it('puts deferred put in the buffer', () => { + sinon.stub(ch.buffer, 'push').returns(true) + const val = {} + ch.put(val) + equal(ch.buffer.push.firstCall.args[0](), val) + }) + + it('puts deferred in pending puts if buffer is full', () => { + sinon.stub(ch.buffer, 'push').returns(false) + const deferredPut = {} + const stub = sinon.stub(Deferred, 'DeferredPut').returns(deferredPut) + ch.put({}) + equal(ch.pendingPuts.peek(), deferredPut) + stub.restore() + }) + + it('is rejected if called on a closed channel', async () => { + ch.close() + let rejectedErr + try { + await ch.put({}) + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + }) + + describe('close', () => { + beforeEach(() => { + deferredStub.restore() + }) + + it('sets isClosed to true', () => { + ch.close() + ok(ch.isClosed) + }) + + it('sets isDone to true', () => { + ch.close() + ok(ch.isDone) + }) + + it('rejects promises for any pending puts', async () => { + ch = new Channel(new BufferBlocking(0)) + const promise = ch.put({}) + ch.close() + let rejectedErr + try { + await promise + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + + it('rejects any pending takes', async () => { + setImmediate(() => ch.close()) + let takeErr + try { + await ch.take() + } catch (err) { + takeErr = err + } + ok(takeErr) + }) + }) +}) From 2a0874a5921a4c062e40bdb2d7fda666562d7f7c Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Sun, 12 Feb 2017 06:40:39 -0700 Subject: [PATCH 17/22] upgrade mocha and configure debug script --- package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 582c59f..ebc4247 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,7 @@ "build": "tsc", "prepublish": "npm run build", "test": "npm run lint && npm run build && mocha 'dist/**/*.test.js'", + "debug": "npm run build && mocha debug --no-timeouts 'dist/**/*.test.js'", "lint": "tslint 'src/**/*.ts'" }, "repository": { @@ -43,7 +44,7 @@ "homepage": "https://github.com/brentburgoyne/chan", "devDependencies": { "@types/node": "^6.0.52", - "mocha": "^2.5.3", + "mocha": "^3.1.0", "mochon": "^1.0.0", "sinon": "^1.17.4", "source-map-support": "^0.4.7", From d341e54c02d87c5d217a9f7a3489cc697b0c1394 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Sun, 12 Feb 2017 07:30:00 -0700 Subject: [PATCH 18/22] converted buffer tests to typescript --- src/buffer.test.ts | 97 +++++++++++++++++++++++++++++++++++++++++++++ test/buffer.test.js | 91 ------------------------------------------ 2 files changed, 97 insertions(+), 91 deletions(-) create mode 100644 src/buffer.test.ts delete mode 100644 test/buffer.test.js diff --git a/src/buffer.test.ts b/src/buffer.test.ts new file mode 100644 index 0000000..ee16c43 --- /dev/null +++ b/src/buffer.test.ts @@ -0,0 +1,97 @@ +import {ok, equal, notEqual} from 'assert' +import {describe, it, beforeEach, afterEach} from 'mocha' +import * as buffer from './buffer' + +describe('Buffer', () => { + function common (Buffer) { + describe(`${Buffer.name} common`, () => { + describe('constructor', () => { + it('sets size from argument', () => { + const size = 47 + const b = new Buffer(size) + equal(b.size, size) + }) + }) + + describe('shift', () => { + it('removes the first value from the buffer', () => { + const b = new Buffer(47) + b.push(() => 1) + b.push(() => 2) + equal(b.shift(), 1) + }) + }) + + describe('hasValues', () => { + it('returns true if the buffer has values', () => { + const b = new Buffer(47) + b.push(() => 1) + ok(b.hasValues()) + }) + + it('returns false if the buffer has no values', () => { + const b = new Buffer(47) + ok(!b.hasValues()) + }) + }) + }) + } + + common(buffer.BufferBlocking) + common(buffer.BufferSliding) + common(buffer.BufferDropping) + + describe('BufferBlocking push', () => { + it('puts getValue() on buffer and returns true if there is room', () => { + const b = new buffer.BufferBlocking(1) + const val = {} + equal(b.push(() => val), true) + equal(b.shift(), val) + }) + + it('returns false if there is not room', () => { + const b = new buffer.BufferBlocking(1) + const val = {} + b.push(() => ({})) + ok(!b.push(() => val)) + b.shift() + notEqual(b.shift(), val) + }) + }) + + describe('BufferSliding push', () => { + it('puts getValue() on buffer and returns true if there is room', () => { + const b = new buffer.BufferSliding(1) + const val = {} + equal(b.push(() => val), true) + equal(b.shift(), val) + }) + + it('puts getValue() on buffer and removes first value if no room', () => { + const b = new buffer.BufferSliding(2) + b.push(() => 1) + b.push(() => 2) + equal(b.push(() => 3), true) + equal(b.shift(), 2) + equal(b.shift(), 3) + }) + }) + + describe('BufferDropping push', () => { + it('puts getValue() on buffer and returns true if there is room', () => { + const b = new buffer.BufferDropping(1) + const val = {} + equal(b.push(() => val), true) + equal(b.shift(), val) + }) + + it('does not put value on buffer and returns true if no room', () => { + const b = new buffer.BufferDropping(2) + b.push(() => 1) + b.push(() => 2) + equal(b.push(() => 3), true) + equal(b.shift(), 1) + equal(b.shift(), 2) + }) + }) +}) diff --git a/test/buffer.test.js b/test/buffer.test.js deleted file mode 100644 index 4cec8aa..0000000 --- a/test/buffer.test.js +++ /dev/null @@ -1,91 +0,0 @@ -import {ok, equal, notEqual, deepEqual} from 'assert' -import * as buffer from '../src/buffer' - -describe('BufferBase', () => { - describe('constructor', () => { - it('sets size from argument', () => { - const size = 47 - const b = new buffer.BufferBase(size) - equal(b.size, size) - }) - - it('sets size to 0 if argument is not a number', () => { - const size = 'cat' - const b = new buffer.BufferBase(size) - equal(b.size, 0) - }) - }) - - describe('shift', () => { - it('removes the first value from the buffer', () => { - const b = new buffer.BufferBase(47) - b.values = [1, 2, 3] - equal(b.shift(), 1) - }) - }) - - describe('hasValues', () => { - it('returns true if the buffer has values', () => { - const b = new buffer.BufferBase(47) - b.values = [1] - ok(b.hasValues()) - }) - - it('returns false if the buffer has no values', () => { - const b = new buffer.BufferBase(47) - b.values = [] - ok(!b.hasValues()) - }) - }) - - describe('BufferBlocking push', () => { - it('puts getValue() on buffer and returns true if there is room', () => { - const b = new buffer.BufferBlocking(1) - const val = {} - equal(b.push(() => val), true) - equal(b.values[0], val) - }) - - it('returns false if there is not room', () => { - const b = new buffer.BufferBlocking(1) - const val = {} - b.push(() => {}) - ok(!b.push(() => val)) - notEqual(b.values[1], val) - }) - }) - - describe('BufferSliding push', () => { - it('puts getValue() on buffer and returns true if there is room', () => { - const b = new buffer.BufferSliding(1) - const val = {} - equal(b.push(() => val), true) - equal(b.values[0], val) - }) - - it('puts getValue() on buffer and removes first value if no room', () => { - const b = new buffer.BufferSliding(2) - b.push(() => 1) - b.push(() => 2) - equal(b.push(() => 3), true) - deepEqual(b.values, [2, 3]) - }) - }) - - describe('BufferDropping push', () => { - it('puts getValue() on buffer and returns true if there is room', () => { - const b = new buffer.BufferDropping(1) - const val = {} - equal(b.push(() => val), true) - equal(b.values[0], val) - }) - - it('does not put value on buffer and returns true if no room', () => { - const b = new buffer.BufferDropping(2) - b.push(() => 1) - b.push(() => 2) - equal(b.push(() => 3), true) - deepEqual(b.values, [1, 2]) - }) - }) -}) From 5a23d577d2d98854f7007274dad775d1cd568050 Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Tue, 14 Feb 2017 00:01:37 -0700 Subject: [PATCH 19/22] add tests for deferred --- package.json | 4 ++-- src/deferred.test.ts | 56 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 src/deferred.test.ts diff --git a/package.json b/package.json index ebc4247..0d8797e 100644 --- a/package.json +++ b/package.json @@ -6,8 +6,8 @@ "scripts": { "build": "tsc", "prepublish": "npm run build", - "test": "npm run lint && npm run build && mocha 'dist/**/*.test.js'", - "debug": "npm run build && mocha debug --no-timeouts 'dist/**/*.test.js'", + "test": "npm run lint && npm run build && mocha --require source-map-support/register 'dist/**/*.test.js'", + "debug": "npm run build && mocha debug --require source-map-support/register --no-timeouts 'dist/**/*.test.js'", "lint": "tslint 'src/**/*.ts'" }, "repository": { diff --git a/src/deferred.test.ts b/src/deferred.test.ts new file mode 100644 index 0000000..c03fa65 --- /dev/null +++ b/src/deferred.test.ts @@ -0,0 +1,56 @@ +import {equal, ok} from 'assert' +import * as sinon from 'sinon' +import {describe, it, beforeEach, afterEach} from 'mocha' +import {Deferred, DeferredPut} from './deferred' + +describe('Deferred', () => { + + describe('constructor', () => { + let deferred + + beforeEach(() => { + deferred = new Deferred() + }) + + it('creates a new promise', () => { + ok(deferred.promise instanceof Promise) + }) + + it('resolve fulfills the promise with value', async () => { + const expected = 'value' + deferred.resolve(expected) + equal(await deferred.promise, expected) + }) + + it('reject rejects the promise with error', async () => { + const expected = {} + deferred.reject(expected) + let err + try { + await deferred.promise + } catch (e) { + err = e + } + equal(err, expected) + }) + }) + + describe('DeferredPut', () => { + let value + let deferred + + beforeEach(() => { + value = 'something' + deferred = new DeferredPut(value) + }) + + it('resolves the promise with undefined', async () => { + deferred.put() + equal(await deferred.promise, undefined) + }) + + it('returns the value', async () => { + equal(deferred.put(), value) + }) + }) +}) From 5ed2d303aa5743ed6ffad313ebe9a2ff75ad21fb Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Tue, 14 Feb 2017 22:06:05 -0700 Subject: [PATCH 20/22] added tests for error types --- src/error.test.ts | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 src/error.test.ts diff --git a/src/error.test.ts b/src/error.test.ts new file mode 100644 index 0000000..07d2295 --- /dev/null +++ b/src/error.test.ts @@ -0,0 +1,36 @@ +import {equal, ok} from 'assert' +import * as sinon from 'sinon' +import {describe, it, beforeEach, afterEach} from 'mocha' +import {CanceledTakeError, ClosedTakeError, ClosedPutError} from './error' + +describe.only('Errors', () => { + function errorTests (ErrorClass, name) { + describe(name, () => { + let error + + beforeEach(() => { + error = new ErrorClass() + }) + + it('is an instance of Error', () => { + ok(error instanceof Error) + }) + + it('is an instance of its own class', () => { + ok(error instanceof ErrorClass) + }) + + it('has a stack trace', () => { + ok(/beforeEach/.test(error.stack)) + }) + + it('has the correct name', () => { + equal(error.name, name) + }) + }) + } + + errorTests(CanceledTakeError, 'CanceledTakeError') + errorTests(ClosedTakeError, 'ClosedTakeError') + errorTests(ClosedPutError, 'ClosedPutError') +}) From 251e2160791e0be04548317effbffa338c14bdeb Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Tue, 14 Feb 2017 22:32:07 -0700 Subject: [PATCH 21/22] add tests for factory functions --- src/error.test.ts | 2 +- src/factory.test.ts | 61 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 src/factory.test.ts diff --git a/src/error.test.ts b/src/error.test.ts index 07d2295..9bcff01 100644 --- a/src/error.test.ts +++ b/src/error.test.ts @@ -3,7 +3,7 @@ import * as sinon from 'sinon' import {describe, it, beforeEach, afterEach} from 'mocha' import {CanceledTakeError, ClosedTakeError, ClosedPutError} from './error' -describe.only('Errors', () => { +describe('Errors', () => { function errorTests (ErrorClass, name) { describe(name, () => { let error diff --git a/src/factory.test.ts b/src/factory.test.ts new file mode 100644 index 0000000..fddab72 --- /dev/null +++ b/src/factory.test.ts @@ -0,0 +1,61 @@ +import {equal, ok} from 'assert' +import * as sinon from 'sinon' +import {describe, it, beforeEach, afterEach} from 'mocha' +import {blockingChannel, slidingChannel, droppingChannel} from './factory' +import * as bufferModule from './buffer' +import * as channelModule from './channel' + +describe('Factory functions', () => { + let channel + let channelStub + let buffer + let bufferStub + + beforeEach(() => { + channel = {} + buffer = {} + channelStub = sinon.stub(channelModule, 'Channel').returns(channel) + }) + + afterEach(() => { + channelStub.restore() + }) + + function testFactory (factory, bufferType, defaultSize) { + describe(factory.name, () => { + beforeEach(() => { + bufferStub = sinon.stub(bufferModule, bufferType).returns(buffer) + }) + + afterEach(() => { + bufferStub.restore() + }) + + it('constructs a new buffer with size', () => { + const size = 5 + factory(size) + sinon.assert.calledWithExactly(bufferStub, size) + }) + + if (defaultSize != null) { + it(`has a default buffer size of ${defaultSize}`, () => { + factory() + sinon.assert.calledWithExactly(bufferStub, defaultSize) + }) + } + + it('constructs a new channel with buffer', () => { + factory() + sinon.assert.calledWithExactly(channelStub, buffer) + }) + + it('returns the new channel', () => { + equal(factory(), channel) + }) + }) + } + + testFactory(blockingChannel, 'BufferBlocking', 0) + testFactory(slidingChannel, 'BufferSliding', null) + testFactory(blockingChannel, 'BufferBlocking', null) +}) From d33b344659a42a1c3de89ec83d0f4898c76f2f9e Mon Sep 17 00:00:00 2001 From: Brent Burgoyne Date: Wed, 15 Feb 2017 06:06:09 -0700 Subject: [PATCH 22/22] add tests for queue --- src/queue.test.ts | 104 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 src/queue.test.ts diff --git a/src/queue.test.ts b/src/queue.test.ts new file mode 100644 index 0000000..4f5dfb5 --- /dev/null +++ b/src/queue.test.ts @@ -0,0 +1,104 @@ +import {equal, ok} from 'assert' +import * as sinon from 'sinon' +import {describe, it, beforeEach, afterEach} from 'mocha' +import {Queue} from './queue' + +describe('Queue', () => { + let queue: Queue + + beforeEach(() => { + queue = new Queue() + }) + + describe('push/shift', () => { + it('is first in first out', () => { + queue.push(1) + queue.push(2) + queue.push(3) + equal(queue.shift(), 1) + equal(queue.shift(), 2) + equal(queue.shift(), 3) + }) + }) + + describe('peek', () => { + it('returns the first value', () => { + queue.push(1) + equal(queue.peek(), 1) + }) + + it('does not remove the first value from the queue', () => { + queue.push(1) + queue.peek() + equal(queue.shift(), 1) + }) + }) + + describe('size', () => { + it('returns the number of values push on the queue', () => { + queue.push(1) + queue.push(2) + equal(queue.size(), 2) + }) + + it('returns zero for empty queues', () => { + equal(queue.size(), 0) + }) + + it('returns the size reduced by the number of values shifted', () => { + queue.push(1) + queue.push(2) + queue.shift() + equal(queue.size(), 1) + }) + }) + + describe('empty', () => { + it('returns true when no values are in the queue', () => { + equal(queue.empty(), true) + queue.push(1) + queue.shift() + equal(queue.empty(), true) + }) + + it('returns false when values are in the queue', () => { + queue.push(1) + equal(queue.empty(), false) + }) + }) + + describe('notEmpty', () => { + it('returns false when no values are in the queue', () => { + equal(queue.notEmpty(), false) + queue.push(1) + queue.shift() + equal(queue.notEmpty(), false) + }) + + it('returns true when values are in the queue', () => { + queue.push(1) + equal(queue.notEmpty(), true) + }) + }) + + describe('remove', () => { + it('removes value from the queue', () => { + queue.push(1) + queue.push(2) + queue.push(3) + queue.remove(2) + equal(queue.shift(), 1) + equal(queue.shift(), 3) + }) + + it('returns true if value was removed', () => { + queue.push(1) + equal(queue.remove(1), true) + }) + + it('returns false if value was not found', () => { + queue.push(1) + equal(queue.remove(2), false) + }) + }) +})