Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "A go style channel implementation that works nicely with co",
"main": "index.js",
"scripts": {
"test": "mocha"
"test": "babel-node --stage=0 node_modules/.bin/blue-tape src/**/*.test.js"
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was on a kick where I really liked tape when I wrote this, but I am good with sticking with mocha

},
"repository": {
"type": "git",
Expand Down Expand Up @@ -37,12 +37,15 @@
},
"homepage": "https://github.com/brentburgoyne/chan",
"devDependencies": {
"babel": "^5.8.21",
"blue-tape": "^0.1.10",
"co": "^3.0.6",
"expect.js": "^0.3.1",
"mocha": "^1.20.1",
"should": "^4.0.4",
"sinon": "^1.10.3",
"split": "^0.3.0",
"superagent": "^0.18.0"
"superagent": "^0.18.0",
"tap-dot": "^1.0.0"
}
}
41 changes: 41 additions & 0 deletions src/buffer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
export class BufferBase {
values = []

constructor (size) {
this.size = parseInt(size, 10)
}

shift () {
return this.values.shift()
}
}

export class BufferBlocking extends BufferBase {
push (getValue) {
if (this.values.length < this.size) {
this.values.push(getValue())
return true
}
return false
}
}

export class BufferDropping extends BufferBase {
push (getValue) {
const value = getValue()
if (this.values.length < this.size) {
this.values.push(value)
}
return true
}
}

export class BufferSliding extends BufferBase {
push (getValue) {
this.values.push(getValue())
if (this.values.length > this.size) {
this.values.shift()
}
return true
}
}
95 changes: 95 additions & 0 deletions src/channel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import {default as Deferred, DeferredPut} from './deferred'

const CLOSED_ERROR_MSG = 'Cannot add to closed channel'

export default class Channel {
pendingPuts = []
pendingTakes = []
isClosed = false
isDone = false
empty = {}

constructor (buffer) {
this.buffer = buffer
}

then (onFulfilled, onRejected) {
return this.take().then(onFulfilled, onRejected)
}

take () {
const deferred = new Deferred()
if (this.done()) {
this.resolveEmpty(deferred)
} else if (this.hasValues()) {
this.resolve(deferred, this.nextValue())
} else {
this.pendingTakes.push(deferred)
}
return deferred.promise
}

cancelableTake () {
const promise = this.take()
return [
promise,
() => this.removePendingTake(promise.deferred)
]
}

removePendingTake (deferred) {
const idx = this.pendingTakes.indexOf(deferred)
if (idx > -1) {
this.pendingTakes.splice(idx, 1)
}
}

hasValues () {
return this.buffer.length > 0 || this.pendingPuts.length > 0
}

nextValue () {
if (this.pendingPuts.length > 0) {
this.buffer.push(this.pendingPuts.shift().put())
}
return this.buffer.shift()
}

put (value) {
var deferred = new DeferredPut(value)
if (this.isClosed) {
deferred.reject(new Error(CLOSED_ERROR_MSG))
} else if (this.pendingTakes.length > 0) {
this.resolve(this.pendingTakes.shift(), deferred.put())
} else if (!this.buffer.push(deferred.put.bind(deferred))) {
this.pendingPuts.push(deferred)
}
return deferred.promise
}

resolve (deferred, value) {
deferred.resolve(value)
this.done()
}

resolveEmpty (deferred) {
this.resolve(deferred, this.empty)
}

close () {
this.isClosed = true
let receiver
while (receiver = this.pendingPuts.shift()) {
receiver.error(new Error(CLOSED_ERROR_MSG))
}
return this.done()
}

done () {
if (!this.isDone && this.isClosed && this.buffer.length === 0) {
this.isDone = true
this.pendingTakes.forEach(this.resolveEmpty, this)
}
return this.isDone
}
}
21 changes: 21 additions & 0 deletions src/deferred.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
export default class Deferred {
constructor () {
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve
this.reject = reject
})
this.promise.deferred = this
}
}

export class DeferredPut extends Deferred {
constructor (value) {
super()
this.value = value
}

put () {
this.resolve()
return this.value
}
}
14 changes: 14 additions & 0 deletions src/factory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import Channel from './channel'
import {BufferBlocking, BufferSliding, BufferDropping} from './buffer'

export function blockingChannel (size) {
return new Channel(new BufferBlocking(size))
}

export function slidingChannel (size) {
return new Channel(new BufferSliding(size))
}

export function droppingChannel (size) {
return new Channel(new BufferDropping(size))
}
13 changes: 13 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import {blockingChannel, slidingChannel, droppingChannel} from './factory'
import Channel from './channel'
import timeout from './timeout'
import alts from './alts'

const chan = blockingChannel
chan.sliding = slidingChannel
chan.dropping = droppingChannel
chan.Channel = Channel
chan.timeout = timeout
chan.alts = alts
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be select, considered renaming, but I think it is better to keep go terminology since there is aleady a js library similar to core/async.


export default chan
30 changes: 30 additions & 0 deletions src/select.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import Deferred from './deferred'

function pairs ()
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, shouldn't have commited this line :(, thats what I get for just commiting stuff in a half complete state.


export function select (channels...) {
const deferred = new Deferred()
const nonEmpty = channels.filter(channel => channel.hasValues())
const cancels = []
let remaining = channels.length

const take = channel => {
const [promise, cancel] = channel.cancelableTake()
cancels.push(cancel)
promise.then(value => {
if (value === channel.empty && --remaining > 0) {
return
}
cancels.forEach(fn => fn())
deferred.resolve([channel, value])
})
}

if (nonEmpty.length > 1) {
take(nonEmpty[Math.random() * nonEmpty.length | 0])
} else {
channels.forEach(take)
}

return deferred.promise
}
12 changes: 12 additions & 0 deletions src/timeout.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {blockingChannel} from './factory'

export default function timeout (ms) {
const ch = blockingChannel()
setTimeout(() => {
try {
ch.put(true)
ch.close()
} catch (err) {}
}, ms)
return ch
}