Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat - Shared Workers #402

Draft
wants to merge 50 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
a00b452
chore: Add a stub for shared worker code
bebraw Oct 5, 2021
9dea04c
chore: Refine shared worker implementation
bebraw Oct 5, 2021
98555a8
chore: Add initial docs
bebraw Oct 5, 2021
f224972
test: Add initial tests
bebraw Oct 5, 2021
2f5d1c0
chore: Sketch out spawn for shared workers
bebraw Oct 6, 2021
3e74bd8
chore: Refine type check
bebraw Oct 6, 2021
e807f5b
fix: Use instanceof for shared worker checks
bebraw Oct 6, 2021
9a0c7ab
fix: Start shared worker when events are added
bebraw Oct 6, 2021
66d9e2d
chore: Add a todo
bebraw Oct 6, 2021
dc8f1df
fix: Make shared optional
bebraw Oct 11, 2021
c742173
fix: Add a check against possibly missing port
bebraw Oct 11, 2021
bebb1af
fix: Include shared worker to ts
bebraw Oct 11, 2021
5a9b2a9
fix: Add a missing browser field
bebraw Oct 12, 2021
aed0dda
fix: Drop @types/sharedworker
bebraw Oct 12, 2021
ad356b1
fix: Fix test paths
bebraw Oct 12, 2021
6a70a09
chore: Add a todo
bebraw Oct 12, 2021
cf06ee9
fix: Implement terminate for shared workers
bebraw Oct 12, 2021
ebecb9e
chore: Improve spacing
bebraw Oct 12, 2021
e373b20
chore: Clean up formatting
bebraw Oct 18, 2021
b661764
chore: Remove unnecessary formatting from readme
bebraw Oct 18, 2021
1c22039
chore: Drop ;'s
bebraw Oct 18, 2021
9828320
chore: Drop ;'s
bebraw Oct 18, 2021
f594466
chore: Remove unnecessary formatting from readme
bebraw Oct 18, 2021
bdee01a
chore: Drop ,'s
bebraw Oct 18, 2021
8073c52
chore: Revert formatting
bebraw Oct 18, 2021
2d004ab
chore: Revert formatting
bebraw Oct 18, 2021
5c1f8ac
chore: Revert formatting
bebraw Oct 18, 2021
e232ee6
chore: Revert formatting
bebraw Oct 18, 2021
1594e2c
chore: Drop semicolons
bebraw Oct 20, 2021
127c55d
chore: Simplify code
bebraw Oct 20, 2021
2b7d881
refactor: Simplify implementation
bebraw Oct 20, 2021
04554a8
chore: Drop formatting
bebraw Oct 20, 2021
a906d77
chore: Drop formatting
bebraw Oct 20, 2021
42d15bf
chore: Drop formatting
bebraw Oct 20, 2021
9138fd5
chore: Drop formatting
bebraw Oct 20, 2021
fdf90a3
chore: Drop formatting
bebraw Oct 20, 2021
a1b837b
chore: Drop formatting
bebraw Oct 20, 2021
4606977
chore: Drop formatting
bebraw Oct 20, 2021
7f1a625
chore: Drop formatting
bebraw Oct 20, 2021
8663a6d
chore: Drop formatting
bebraw Oct 20, 2021
cf5cf41
chore: Mark a todo
bebraw Oct 20, 2021
2ea1d55
chore: Drop a reference
bebraw Oct 20, 2021
4c9d072
chore: Drop formatting
bebraw Oct 20, 2021
5aef0bd
chore: Drop formatting
bebraw Oct 20, 2021
3c00490
chore: Drop formatting
bebraw Oct 20, 2021
8e008d5
refactor: Merge shared worker tests with spawn tests
bebraw Oct 22, 2021
de025a1
fix: Solve the proxy type issue
bebraw Oct 22, 2021
b3dfead
fix: Fix shared worker checks within a worker
bebraw Oct 25, 2021
aceed4b
chore: Add a debug print
bebraw Oct 25, 2021
cffac58
chore: Sketch out onconnect
bebraw Oct 25, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,40 @@ Use `expose()` to make a function or an object containing methods callable from

In case of exposing an object, `spawn()` will asynchronously return an object exposing all the object's functions. If you `expose()` a function, `spawn` will also return a callable function, not an object.

## Shared Workers

[Shared Workers](https://developer.mozilla.org/en-US/docs/Web/API/SharedWorker) can be accessed from multiple browsing contexts (windows, iframes, other workers) making them useful for sharing tasks such as synchronization and avoiding redundant work.

In threads.js, the functionality is exposed as follows:

```js
// master.js
import { spawn, SharedWorker, Thread } from "threads"

const auth = await spawn(new SharedWorker("./workers/auth"))
const hashed = await auth.hashPassword("Super secret password", "1234")

console.log("Hashed password:", hashed)

await Thread.terminate(auth)
```

```js
// workers/auth.js
import sha256 from "js-sha256"
import { expose } from "threads/worker"

expose({
hashPassword(password, salt) {
return sha256(password + salt)
},
})
```

As you might notice, compared to the original example, only the imports (`Worker` -> `SharedWorker` and `expose` path) have changed.

Note that as the functionality makes sense only in the browser, it's available only there. Based on [caniuse](https://caniuse.com/sharedworkers), the functionality is widely supported [Safari being a notable exception](https://bugs.webkit.org/show_bug.cgi?id=149850).

## Usage

<p>
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"build:es": "tsc -p tsconfig-esm.json",
"postbuild": "npm run bundle",
"bundle": "rollup -c -f umd --file=bundle/worker.js --name=threads --silent -- dist-esm/worker/bundle-entry.js",
"test": "npm run test:library && npm run test:tooling && npm run test:puppeteer:basic && npm run test:puppeteer:webpack",
"test": "npm run test:library && npm run test:tooling && npm run test:puppeteer:basic && test:puppeteer:shared && npm run test:puppeteer:webpack",
"test:library": "cross-env TS_NODE_FILES=true ava ./test/**/*.test.ts",
"test:tooling": "cross-env TS_NODE_FILES=true ava ./test-tooling/**/*.test.ts",
"test:puppeteer:basic": "puppet-run --plugin=mocha --bundle=./test/workers/:workers/ --serve=./bundle/worker.js:/worker.js ./test/*.chromium*.ts",
Expand Down
1 change: 1 addition & 0 deletions src/master/implementation.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ function selectWorkerImplementation(): ImplementationExport {

return {
blob: BlobWorker,
shared: SharedWorker,
default: WebWorker
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/master/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ export const BlobWorker = getWorkerImplementation().blob

/** Worker implementation. Either web worker or a node.js Worker class. */
export const Worker = getWorkerImplementation().default

/** Shared Worker implementation. Available only in the web. */
export const SharedWorker = getWorkerImplementation().shared
15 changes: 12 additions & 3 deletions src/master/invocation-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
ModuleMethods,
ModuleProxy,
ProxyableFunction,
Worker as WorkerType
Worker as TWorker,
} from "../types/master"
import {
MasterJobCancelMessage,
Expand All @@ -26,6 +26,8 @@ import {
WorkerMessageType
} from "../types/messages"

type WorkerType = SharedWorker | TWorker

const debugMessages = DebugLogger("threads:master:messages")

let nextJobUID = 1
Expand Down Expand Up @@ -75,13 +77,19 @@ function createObservableForJob<ResultType>(worker: WorkerType, jobUID: number):

worker.addEventListener("message", messageHandler)

if (worker instanceof SharedWorker) {
worker.port.start()
}

return () => {
if (asyncType === "observable" || !asyncType) {
const cancelMessage: MasterJobCancelMessage = {
type: MasterMessageType.cancel,
uid: jobUID
}
worker.postMessage(cancelMessage)
const port = worker instanceof SharedWorker ? worker.port : worker;

port.postMessage(cancelMessage, []);
}
worker.removeEventListener("message", messageHandler)
}
Expand Down Expand Up @@ -125,11 +133,12 @@ export function createProxyFunction<Args extends any[], ReturnType>(worker: Work
method,
args
}
const port = worker instanceof SharedWorker ? worker.port : worker;

debugMessages("Sending command to run function to worker:", runMessage)

try {
worker.postMessage(runMessage, transferables)
port.postMessage(runMessage, transferables)
} catch (error) {
return ObservablePromise.from(Promise.reject(error))
}
Expand Down
58 changes: 52 additions & 6 deletions src/master/spawn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
ModuleThread,
PrivateThreadProps,
StripAsync,
Worker as WorkerType,
Worker as TWorker,
WorkerEvent,
WorkerEventType,
WorkerInternalErrorEvent,
Expand All @@ -19,6 +19,8 @@ import { WorkerInitMessage, WorkerUncaughtErrorMessage } from "../types/messages
import { WorkerFunction, WorkerModule } from "../types/worker"
import { createProxyFunction, createProxyModule } from "./invocation-proxy"

type WorkerType = SharedWorker | TWorker

type ArbitraryWorkerInterface = WorkerFunction & WorkerModule<string> & { somekeythatisneverusedinproductioncode123: "magicmarker123" }
type ArbitraryThreadType = FunctionThread<any, any> & ModuleThread<any>

Expand All @@ -31,6 +33,7 @@ type ExposedToThreadType<Exposed extends WorkerFunction | WorkerModule<any>> =
? ModuleThread<Exposed>
: never

console.log('hello from spawn')

const debugMessages = DebugLogger("threads:master:messages")
const debugSpawn = DebugLogger("threads:master:spawn")
Expand Down Expand Up @@ -106,7 +109,7 @@ function createEventObservable(worker: WorkerType, workerTermination: Promise<an
})
}

function createTerminator(worker: WorkerType): { termination: Promise<void>, terminate: () => Promise<void> } {
function createTerminator(worker: TWorker): {termination: Promise<void>, terminate: () => Promise<void> } {
const [termination, resolver] = createPromiseWithResolver<void>()
const terminate = async () => {
debugThreadUtils("Terminating worker")
Expand All @@ -117,12 +120,29 @@ function createTerminator(worker: WorkerType): { termination: Promise<void>, ter
return { terminate, termination }
}

function setPrivateThreadProps<T>(raw: T, worker: WorkerType, workerEvents: Observable<WorkerEvent>, terminate: () => Promise<void>): T & PrivateThreadProps {
function createSharedWorkerTerminator(worker: SharedWorker): {
termination: Promise<void>
terminate: () => Promise<void>
} {
const [termination, resolver] = createPromiseWithResolver<void>()
const terminate = async () => {
debugThreadUtils("Terminating shared worker")
await worker.port.close()
resolver()
}
return { terminate, termination }
}

function setPrivateThreadProps<T>(
raw: T,
worker: WorkerType,
workerEvents: Observable<WorkerEvent>,
terminate: () => Promise<void>
): T & PrivateThreadProps {
const workerErrors = workerEvents
.filter(event => event.type === WorkerEventType.internalError)
.map(errorEvent => (errorEvent as WorkerInternalErrorEvent).error)

// tslint:disable-next-line prefer-object-spread
return Object.assign(raw, {
[$errors]: workerErrors,
[$events]: workerEvents,
Expand All @@ -136,7 +156,7 @@ function setPrivateThreadProps<T>(raw: T, worker: WorkerType, workerEvents: Obse
* abstraction layer to provide the transparent API and verifies that
* the worker has initialized successfully.
*
* @param worker Instance of `Worker`. Either a web worker, `worker_threads` worker or `tiny-worker` worker.
* @param worker Instance of `Worker` or `SharedWorker`. Either a web worker, `worker_threads` worker or `tiny-worker` worker.
* @param [options]
* @param [options.timeout] Init message timeout. Default: 10000 or set by environment variable.
*/
Expand All @@ -146,11 +166,37 @@ export async function spawn<Exposed extends WorkerFunction | WorkerModule<any> =
): Promise<ExposedToThreadType<Exposed>> {
debugSpawn("Initializing new thread")

console.log('0000');

const timeout = options && options.timeout ? options.timeout : initMessageTimeout

console.log('000')

const initMessage = await withTimeout(receiveInitMessage(worker), timeout, `Timeout: Did not receive an init message from worker after ${timeout}ms. Make sure the worker calls expose().`)

console.log('00')

const exposed = initMessage.exposed
let termination, terminate

console.log('a')

if (worker instanceof SharedWorker) {
const o = createSharedWorkerTerminator(worker)

console.log('b')

termination = o.termination
terminate = o.terminate
} else {
const o = createTerminator(worker)

termination = o.termination
terminate = o.terminate
}

console.log('c', exposed.type)

const { termination, terminate } = createTerminator(worker)
const events = createEventObservable(worker, termination)

if (exposed.type === "function") {
Expand Down
3 changes: 2 additions & 1 deletion src/types/master.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export interface PrivateThreadProps {
[$errors]: Observable<Error>
[$events]: Observable<WorkerEvent>
[$terminate]: () => Promise<void>
[$worker]: Worker
[$worker]: SharedWorker | Worker
}

export type FunctionThread<Args extends any[] = any[], ReturnType = any> = ProxyableFunction<Args, ReturnType> & PrivateThreadProps
Expand Down Expand Up @@ -113,6 +113,7 @@ export declare class BlobWorker extends WorkerImplementation {

export interface ImplementationExport {
blob: typeof BlobWorker
shared?: typeof SharedWorker
default: typeof WorkerImplementation
}

Expand Down
35 changes: 31 additions & 4 deletions src/worker/implementation.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,54 @@ interface WorkerGlobalScope {
addEventListener(eventName: string, listener: (event: Event) => void): void
postMessage(message: any, transferables?: any[]): void
removeEventListener(eventName: string, listener: (event: Event) => void): void
port?: WorkerGlobalScope & { start: () => void }
}

declare const self: WorkerGlobalScope

const isWorkerRuntime: AbstractedWorkerAPI["isWorkerRuntime"] = function isWorkerRuntime() {
const isWindowContext = typeof self !== "undefined" && typeof Window !== "undefined" && self instanceof Window
return typeof self !== "undefined" && self.postMessage && !isWindowContext ? true : false
const port = self.port || self;

return typeof self !== "undefined" && port.postMessage && !isWindowContext ? true : false
}

const postMessageToMaster: AbstractedWorkerAPI["postMessageToMaster"] = function postMessageToMaster(data, transferList?) {
self.postMessage(data, transferList)
const port = self.port || self;

port.postMessage(data, transferList || [])
}

const subscribeToMasterMessages: AbstractedWorkerAPI["subscribeToMasterMessages"] = function subscribeToMasterMessages(onMessage) {
const port = self.port || self;
const messageHandler = (messageEvent: MessageEvent) => {
onMessage(messageEvent.data)
}

// TODO: Handle onconnect here somehow!
if (self.port) {
// @ts-ignore TODO: Testing for now
const connectHandler = (e) => {
const port = e.ports[0];

port.addEventListener('message', (messageEvent: MessageEvent) => {
port.onMessage(messageEvent.data)
});

port.start();
}

self.port.addEventListener('connect', connectHandler)

// TODO: Does this need unsubscription too?
return () => {};
}

const unsubscribe = () => {
self.removeEventListener("message", messageHandler as EventListener)
port.removeEventListener("message", messageHandler as EventListener)
}
self.addEventListener("message", messageHandler as EventListener)
port.addEventListener("message", messageHandler as EventListener)

return unsubscribe
}

Expand Down
6 changes: 6 additions & 0 deletions src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ async function runFunction(jobUID: number, fn: WorkerFunction, args: any[]) {
}
}

console.log('hello from worker index')

/**
* Expose a function or a module (an object whose values are functions)
* to the main thread. Must be called exactly once in every worker thread
Expand All @@ -158,6 +160,8 @@ async function runFunction(jobUID: number, fn: WorkerFunction, args: any[]) {
* @param exposed Function or object whose values are functions
*/
export function expose(exposed: WorkerFunction | WorkerModule<any>) {
console.log('at expose', Implementation.isWorkerRuntime(), exposeCalled)

if (!Implementation.isWorkerRuntime()) {
throw Error("expose() called in the master thread.")
}
Expand All @@ -166,6 +170,8 @@ export function expose(exposed: WorkerFunction | WorkerModule<any>) {
}
exposeCalled = true

console.log('at expose, continuing', exposed)

if (typeof exposed === "function") {
Implementation.subscribeToMasterMessages(messageData => {
if (isMasterJobRunMessage(messageData) && !messageData.method) {
Expand Down
27 changes: 26 additions & 1 deletion test/spawn.chromium.mocha.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
*/

import { expect } from "chai"
import { spawn, BlobWorker, Thread } from "../"
import { spawn, BlobWorker, SharedWorker, Thread } from "../"

// We need this as a work-around to make our threads Worker global, since
// the bundler would otherwise not recognize `new Worker()` as a web worker
import "../src/master/register"

describe("threads in browser", function() {
/*
it("can spawn and terminate a thread", async function() {
const helloWorld = await spawn<() => string>(new Worker("./workers/hello-world.js"))
expect(await helloWorld()).to.equal("Hello World")
Expand Down Expand Up @@ -43,4 +44,28 @@ describe("threads in browser", function() {
expect(await increment()).to.equal(3)
await Thread.terminate(increment)
})
*/

it("can spawn and terminate a thread with a shared worker", async function () {
const sharedWorker = new SharedWorker("./workers/hello-world.js");

console.log('hello before spawn');

const helloWorld = await spawn<() => string>(sharedWorker);

console.log("hello world fn", helloWorld);

expect(await helloWorld()).to.equal("Hello World");
await Thread.terminate(helloWorld);
})

it("can call a function thread with a shared worker more than once", async function () {
const sharedWorker = new SharedWorker("./workers/increment.js");

const increment = await spawn<() => number>(sharedWorker);
expect(await increment()).to.equal(1);
expect(await increment()).to.equal(2);
expect(await increment()).to.equal(3);
await Thread.terminate(increment);
})
})
8 changes: 8 additions & 0 deletions test/workers/hello-world.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { expose } from "../../src/worker"

onconnect = () => {
expose(function helloWorld() {
return "Hello World"
})
}

console.log('hello from worker')

expose(function helloWorld() {
return "Hello World"
})