Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to @most/core v1+ #50

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,29 @@

Subjects for [`@most/core`](https://github.com/mostjs/core)

It provides a subjected `Stream` into which you can inject events, an error or the end imperatively. You can also imperatively assign another `Stream` as source, so that it's events, error or end are "handed over" to the subjected `Stream`.

### Example

```javascript
const
{ attach, create, event } = require('most-subject'),
{ periodic, runEffects, scan, take, tap } = require('@most/core'),
{ currentTime, newDefaultScheduler } = require('@most/scheduler'),
scheduler = newDefaultScheduler(),
[attachSink, subjStream] = create();

// write subjStream to console
runEffects(tap(console.log, subjStream), scheduler);

event(currentTime(scheduler), 99, attachSink); // console: 99

// 1 - 2 - 3 - 4 - 5 - …
const origin = scan(x => x + 1, 1, periodic(50));

attach(attachSink, take(4, origin)); // console: 1 2 3 4
```

## Get it
```sh
yarn add most-subject
Expand All @@ -14,7 +37,7 @@ npm install --save most-subject
#### create\<A\>(): Subject\<A, A\>
#### create\<A, B\>(f: (stream: Stream\<A\>) =\> Stream\<B\>): Subject\<A, B\>

Returns an tuple containing a `AttachSink` and a `Stream`. `AttachSink` can be
Returns a tuple containing a `AttachSink` and a `Stream`. `AttachSink` can be
used to imperatively control the events flowing through the `Stream` or
declaratively using `attach`. Optionally, a function can be applied to the Stream,
and the return value of that function will be returned as the second tuple value.
Expand Down
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@
"author": "Tylor Steinberger <[email protected]>",
"license": "MIT",
"devDependencies": {
"@typed/test": "3.6.0",
"@types/node": "8.0.44",
"@typed/test": "^9.5.0",
"@types/node": "^20.12.2",
"conventional-changelog-cli": "1.3.4",
"husky": "0.14.3",
"lint-staged": "4.2.3",
"prettier": "1.7.4",
"typescript": "2.5.2",
"validate-commit-message": "3.0.1"
"typescript": "^3.9.10",
"validate-commit-message": "^3.2.0"
},
"dependencies": {
"@most/core": "0.14.0",
"@most/prelude": "1.6.4",
"@most/types": "0.11.1"
"@most/core": "^1.6.1",
"@most/disposable": "^1.3.0",
"@most/prelude": "^1.8.0",
"@most/types": "^1.1.0"
},
"lint-staged": {
"*.ts": [
Expand Down
60 changes: 42 additions & 18 deletions src/ProxyStream.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
import { Disposable, Scheduler, Sink, Stream, Time } from '@most/types'
import { MulticastSource, never } from '@most/core'
import { disposeNone } from '@most/disposable'

export class ProxyStream<A> extends MulticastSource<A> {
export class ProxyStream<A> extends MulticastSource<A>
implements Stream<A>, Disposable, Sink<A> {
public attached: boolean = false
public running: boolean = false
public scheduler: Scheduler
private sinkCount: number = 0
private _source?: Stream<A>
private _disposable?: Disposable

constructor() {
super(never())
this._disposable = disposeNone()
}

public run(sink: Sink<A>, scheduler: Scheduler): Disposable {
this.scheduler = scheduler
this.add(sink)

const shouldRun = this.attached && !this.running
const shouldRunSource = this.attached && !this.running

if (shouldRun) {
if (shouldRunSource) {
this.running = true
this.disposable = this.source.run(this, scheduler)

return this.disposable
this._disposable = this._source.run(this as Sink<A>, scheduler)
}

return new ProxyDisposable(this, sink)
Expand All @@ -30,13 +34,16 @@ export class ProxyStream<A> extends MulticastSource<A> {
if (this.attached) throw new Error('Can only attach 1 stream')

this.attached = true
this.source = stream
this._source = stream

const hasMoreSinks = this.sinks.length > 0
const shouldRunSource = this.sinkCount > 0

if (hasMoreSinks) this.disposable = stream.run(this, this.scheduler)
if (shouldRunSource) {
this.running = true
this._disposable = this._source.run(this as Sink<A>, this.scheduler)
}

return stream
return this._source
}

public error(time: Time, error: Error): void {
Expand All @@ -55,28 +62,45 @@ export class ProxyStream<A> extends MulticastSource<A> {
this.attached = false
this.running = false
}

add(sink: Sink<A>): number {
this.sinkCount = super.add(sink)
return this.sinkCount
}

remove(sink: Sink<A>): number {
this.sinkCount = super.remove(sink)
return this.sinkCount
}

dispose(): void {
super.dispose()
const disposable = this._disposable
this._disposable = disposeNone()
return disposable.dispose()
}
}

class ProxyDisposable<A> implements Disposable {
private source: ProxyStream<A>
private proxyStream: ProxyStream<A>
private sink: Sink<A>
private disposed: boolean

constructor(source: ProxyStream<A>, sink: Sink<A>) {
this.source = source
this.proxyStream = source
this.sink = sink
this.disposed = false
}

public dispose() {
public dispose(): void {
if (this.disposed) return

const { source, sink } = this
const { proxyStream, sink } = this

this.disposed = true
const remainingSinks = source.remove(sink)
const hasNoMoreSinks = remainingSinks === 0

return hasNoMoreSinks && source.dispose()
const remainingSinks = proxyStream.remove(sink)
if (remainingSinks === 0) {
proxyStream.dispose()
}
}
}
87 changes: 65 additions & 22 deletions src/attach.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ import { Scheduler, Stream } from '@most/types'
import { Test, describe, given, it } from '@typed/test'
import {
at,
continueWith,
delay,
map,
mergeArray,
runEffects,
startWith,
take,
tap,
throwError,
} from '@most/core'

import { attach } from './attach'
Expand All @@ -17,49 +16,93 @@ import { newDefaultScheduler } from '@most/scheduler'

export const test: Test = describe(`attach`, [
given(`Sink<A> and Stream<A>`, [
it(`creates a circular dependency`, ({ equal }) => {
it(`attaches a Stream via Sink as source to the already subscribed-to 'create' Stream`, ({
equal,
}) => {
const expected = [0, 1, 2]

const scheduler = newDefaultScheduler()
const [sink, sut] = create<number>()
const stream = mergeArray<number>(expected.map(x => at(x, x)))
const stream = mergeArray<Stream<number>[]>(expected.map(x => at(x, x)))

const promise = collectEvents(scheduler, sut)

// stream 0 1 2 -> sut 0 1 2
attach(sink, stream)

return promise.then(equal(expected))
}),

it(`does not have a memory leak`, ({ notOk }, done) => {
const scheduler = newDefaultScheduler()
const [sink, stream] = create<number>()
it('the already subscribed-to source ends with the error of the attached origin', ({
equal,
notOk,
}) => {
const scheduler = newDefaultScheduler(),
[sinkStream, stream] = create<number>(),
sampleError = new Error('sample error'),
origin = continueWith(() => throwError(sampleError), at(10, 5)),
outcome = runEffects(tap(equal(5), stream), scheduler).then(
() => notOk(true),
equal(sampleError)
)

attach(sinkStream, origin)

return outcome
}),

function makeAssertions(currentValue: number) {
if (currentValue === 8) Promise.resolve(void 0).then(done)
it(`attaches a Stream via Sink as source to the 'create' Stream which is subscribed to afterwards`, ({
equal,
}) => {
const expected = [0, 1, 2]

notOk(currentValue > 8)
}
const scheduler = newDefaultScheduler()
const [sink, sut] = create<number>()
const stream = mergeArray<Stream<number>[]>(expected.map(x => at(x, x)))

const origin = map(x => x * 2, startWith(1, tap(makeAssertions, stream)))
// stream 0 1 2 -> sut 0 1 2
attach(sink, stream)

runEffects(take(3, attach(sink, delay(10, origin))), scheduler)
return collectEvents(scheduler, sut).then(equal(expected))
}),

it(`allows reattaching after completion`, ({ ok }) => {
const scheduler = newDefaultScheduler()
const [sink, stream] = create<number>()
it('ends with the error of the attached origin', ({ equal, notOk }) => {
const scheduler = newDefaultScheduler(),
[sinkStream, stream] = create<number>(),
sampleError = new Error('sample error'),
origin = continueWith(() => throwError(sampleError), at(10, 5))

const drain = <A>(stream: Stream<A>) => runEffects(stream, scheduler)
attach(sinkStream, origin)

return runEffects(tap(equal(5), stream), scheduler).then(
() => notOk(true),
equal(sampleError)
)
}),

const origin = mergeArray<number>([0, 1, 2].map(x => at(x, x)))
it(`allows reattaching after completion`, ({ equal }) => {
const scheduler = newDefaultScheduler(),
// Stream a -> Promise a[]
drain: <A>(s: Stream<A>) => Promise<A[]> = collectEvents.bind(
undefined,
scheduler
),
[sink, stream] = create<number>(),
// 10 - 11 - 12|
origin = mergeArray<Stream<number>[]>([10, 11, 12].map(x => at(x, x)))

attach(sink, origin)

return drain(stream)
.then(() => attach(sink, origin))
return drain(delay(100, stream))
.then(xs => new Promise<number[]>(res => setTimeout(res, 100, xs)))
.then(events => {
equal([10, 11, 12])(events)
attach(sink, origin)
return stream
})
.then(s => new Promise<Stream<number>>(res => setTimeout(res, 100, s)))
.then(drain)
.then(() => ok(true))
.then(equal([10, 11, 12]))
}),
]),
])
Expand Down
3 changes: 2 additions & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"noImplicitAny": true,
"sourceMap": true,
"noUnusedParameters": true,
"noUnusedLocals": true
"noUnusedLocals": true,
"types": []
},
"include": [
"src/**/*.ts"
Expand Down
Loading