Skip to content

Commit

Permalink
Fixes #31
Browse files Browse the repository at this point in the history
  • Loading branch information
JAForbes committed Jan 1, 2023
1 parent 74161ae commit 0bc8c5f
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 36 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,15 @@ S.id(a); // kjhjkgasd

This id is not globally unique, or cryptographically random, its just a debugging thing.

You can also overwrite the generated id with your own to make debugging easier:

```js
S.id(a, 'a')
S.id() // 'a'
```

> 🤔 In future we may just expose the id map instead of having this getter setter
### `S.sample`

Read the value of a signal within a computation without treating it as a dependency of that computation.
Expand Down
126 changes: 90 additions & 36 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type SyncComputationInternal<T> = {
value?: T;
next?: T;
compute: () => void;
id: string;
};

type GeneratorComputationInternal<T> = {
Expand All @@ -14,13 +15,16 @@ type GeneratorComputationInternal<T> = {
value?: T;
next?: T;
compute: () => void;
id: string;
};

type ComputationInternal<T> =
| SyncComputationInternal<T>
| GeneratorComputationInternal<T>;

type DataInternal<T> = { type: "Stream"; tag: "Data"; value?: T; next?: T };
type DataInternal<T> = {
type: "Stream"; tag: "Data"; value?: T; next?: T, id: string
};

type StreamInternal<T> = DataInternal<T> | ComputationInternal<T>;

Expand Down Expand Up @@ -70,7 +74,21 @@ const rootOfStream = new WeakMap<ComputationInternal<unknown>, Root>();
/**
* The computations that will need to rerun next tick
*/
export let toRun = new Set<ComputationInternal<unknown>>();
export let runningNextTick = new Set<ComputationInternal<unknown>>();

/**
* The computations that are scheduled to run in the current propagation
*/
export let runningThisTick = new Set<ComputationInternal<unknown>>();

/**
* If a computation is nested within another computation
* and the parent is scheduled to run in this tick along with the child
* we only need to run the parent, as the child will automatically
* run when the parent compute runs, so we need to skip
* those child nodes to prevent double updates, that's what this set is for
*/
export let allChildren = new Set<ComputationInternal<unknown>>();

/**
* The direct dependencies of a stream, used in computeDependents
Expand Down Expand Up @@ -224,7 +242,7 @@ function computeDependents(stream: StreamInternal<unknown>) {
}
}

toRun.add(x);
runningNextTick.add(x);
}
}

Expand All @@ -245,6 +263,9 @@ export function data<T>(
tag: "Data",
next: value,
value,
get id(){
return ids.get( accessor )!
}
};

let accessor = (...args: T[] | [(x?: T) => T]) => {
Expand Down Expand Up @@ -325,6 +346,9 @@ export function computation<T>(
streamsToResolve.add(stream);
}
},
get id(){
return ids.get(accessor)!
}
};

// whatever active was when this was defined
Expand Down Expand Up @@ -365,15 +389,21 @@ export function computation<T>(
stream.compute();
}

return record(stream, () => {
const accessor = () => {
if ( state === 'propagating' && runningThisTick.has(stream) ) {
// compute now
tickOne(stream)
}
if (active[0]) {
xet(dependents, stream, () => new Set()).add(active[0]);

return stream.next!;
} else {
return stream.value!;
}
});
}

return record(stream, accessor);
}

// only defining these types as I couldn't
Expand Down Expand Up @@ -448,6 +478,9 @@ export function generator<T>(
}
});
},
get id(){
return ids.get(accessor)!
}
};

async function iterate<T>(it: StreamIterator<T>) {
Expand Down Expand Up @@ -489,14 +522,19 @@ export function generator<T>(

stream.compute();

return record(stream, () => {
const accessor = () => {
if ( state === 'propagating' && runningThisTick.has(stream) ) {
// compute now
tickOne(stream)
}
if (active[0]) {
xet(dependents, stream, () => new Set()).add(active[0]);

return stream.next;
}
return stream.value;
});
}
return record(stream, accessor);
}

export function freeze(f: VoidFunction): void {
Expand Down Expand Up @@ -548,7 +586,7 @@ export function root<T>(f: (dispose: VoidFunction) => T) {
cleanup();
}
dependents.delete(x);
toRun.delete(x);
runningNextTick.delete(x);
cleanups.delete(x);
parents.delete(x);
}
Expand All @@ -559,25 +597,57 @@ export function root<T>(f: (dispose: VoidFunction) => T) {
return out;
}

function tickOne(
stream: ComputationInternal<unknown>
){
if ( !runningThisTick.has(stream) ) {
// evalauted already by another compute
return;
}
for (let cleanupFn of xet(cleanups, stream, () => new Set())) {
cleanupFn();
cleanups.get(stream)!.delete(cleanupFn);
}

// if is a child, we only need to clean up
if (allChildren.has(stream)) {
return;
} else {
let oldActive = active;
active = [...(parents.get(stream) ?? [])];

let oldActiveRoot = activeRoot;
activeRoot = rootOfStream.get(stream) ?? null;

stream.compute();

active = oldActive;
activeRoot = oldActiveRoot;
stats.computations.evaluated++;
}
runningThisTick.delete(stream)
}

export function tick() {
if (state === "propagating" || state === "frozen") return;

stats.ticks++;

state = "propagating";

let oldToRun = new Set(toRun);
runningThisTick = new Set(runningNextTick);

runningNextTick.clear();

toRun.clear();
allChildren.clear()

let allChildren = new Set<ComputationInternal<unknown>>();
for (let x of oldToRun) {
for (let x of runningThisTick) {
for (let child of children.get(x) ?? []) {
// record the child exists
allChildren.add(child);
// add to the tick so we can run
// the clean up fns (at most once)
oldToRun.add(child);
runningThisTick.add(child);
parents.delete(child);
streamsToResolve.delete(child);
// should we delete the dependents too?
Expand All @@ -588,28 +658,8 @@ export function tick() {
children.delete(x);
}

for (let f of oldToRun) {
for (let cleanupFn of xet(cleanups, f, () => new Set())) {
cleanupFn();
cleanups.get(f)!.delete(cleanupFn);
}

// if is a child, we only need to clean up
if (allChildren.has(f)) {
continue;
} else {
let oldActive = active;
active = [...(parents.get(f) ?? [])];

let oldActiveRoot = activeRoot;
activeRoot = rootOfStream.get(f) ?? null;

f.compute();

active = oldActive;
activeRoot = oldActiveRoot;
stats.computations.evaluated++;
}
for (let stream of [...runningThisTick]) {
tickOne(stream)
}

for (let s of streamsToResolve) {
Expand All @@ -635,6 +685,7 @@ export function tick() {
nextTicks.length = 0;
for (let next of xs) {
if (runawayTicks > MAX_TICKS) {
state = 'idle'
throw new RunawayTicks();
}
next();
Expand All @@ -659,6 +710,9 @@ export function sample<T>(signal: Signal<T>) {
return value;
}

export function id(s: Signal<any>) {
export function id(s: Signal<any>, newId?: string) {
if (newId != null ) {
ids.set(s, newId)
}
return ids.get(s);
}
34 changes: 34 additions & 0 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1080,5 +1080,39 @@ test('Max ticks', t => {
}, S.RunawayTicks, 'Runaway ticks caught')

t.equals(S.stats.ticks, S.MAX_TICKS+1, 'Caught at max ticks')
t.end()
})

test('https://github.com/JAForbes/S/issues/31', t => {

const a = S.data(1)

const { b,c,d } = S.root(() => {

const b = S.computation(() => {
return a() * 1
})

const c = S.computation(() => {
return b()! * 2
})

const d = S.computation(() => {
return a() + c()!
})

return { b, c, d }
})

Object.entries({ a,b, c,d }).map( ([k,v]) => S.id(v,k) )

t.equals(c()!, 2, 'c propagated')
t.equals(d()!, 3, 'd propagated')

a(2)

t.equals(c()!, 4, 'c propagated')
t.equals(d()!, 6, 'd propagated')

t.end()
})

0 comments on commit 0bc8c5f

Please sign in to comment.