diff --git a/README.md b/README.md index 047eb756..fec1f469 100644 --- a/README.md +++ b/README.md @@ -181,6 +181,40 @@ Use `expose()` to make a function or an object containing methods callable from In case of exposing an object, `spawn()` will asynchronously return an object exposing all the object's functions. If you `expose()` a function, `spawn` will also return a callable function, not an object. +## Shared Workers + +[Shared Workers](https://developer.mozilla.org/en-US/docs/Web/API/SharedWorker) can be accessed from multiple browsing contexts (windows, iframes, other workers) making them useful for sharing tasks such as synchronization and avoiding redundant work. + +In threads.js, the functionality is exposed as follows: + +```js +// master.js +import { spawn, SharedWorker, Thread } from "threads" + +const auth = await spawn(new SharedWorker("./workers/auth")) +const hashed = await auth.hashPassword("Super secret password", "1234") + +console.log("Hashed password:", hashed) + +await Thread.terminate(auth) +``` + +```js +// workers/auth.js +import sha256 from "js-sha256" +import { expose } from "threads/worker" + +expose({ + hashPassword(password, salt) { + return sha256(password + salt) + }, +}) +``` + +As you might notice, compared to the original example, only the imports (`Worker` -> `SharedWorker` and `expose` path) have changed. + +Note that as the functionality makes sense only in the browser, it's available only there. Based on [caniuse](https://caniuse.com/sharedworkers), the functionality is widely supported [Safari being a notable exception](https://bugs.webkit.org/show_bug.cgi?id=149850). + ## Usage

diff --git a/package.json b/package.json index c4ef1233..9f887d86 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "build:es": "tsc -p tsconfig-esm.json", "postbuild": "npm run bundle", "bundle": "rollup -c -f umd --file=bundle/worker.js --name=threads --silent -- dist-esm/worker/bundle-entry.js", - "test": "npm run test:library && npm run test:tooling && npm run test:puppeteer:basic && npm run test:puppeteer:webpack", + "test": "npm run test:library && npm run test:tooling && npm run test:puppeteer:basic && test:puppeteer:shared && npm run test:puppeteer:webpack", "test:library": "cross-env TS_NODE_FILES=true ava ./test/**/*.test.ts", "test:tooling": "cross-env TS_NODE_FILES=true ava ./test-tooling/**/*.test.ts", "test:puppeteer:basic": "puppet-run --plugin=mocha --bundle=./test/workers/:workers/ --serve=./bundle/worker.js:/worker.js ./test/*.chromium*.ts", diff --git a/src/master/implementation.browser.ts b/src/master/implementation.browser.ts index 87846b97..c8c58134 100644 --- a/src/master/implementation.browser.ts +++ b/src/master/implementation.browser.ts @@ -63,6 +63,7 @@ function selectWorkerImplementation(): ImplementationExport { return { blob: BlobWorker, + shared: SharedWorker, default: WebWorker } } diff --git a/src/master/index.ts b/src/master/index.ts index ed1b2da1..82034da2 100644 --- a/src/master/index.ts +++ b/src/master/index.ts @@ -17,3 +17,6 @@ export const BlobWorker = getWorkerImplementation().blob /** Worker implementation. Either web worker or a node.js Worker class. */ export const Worker = getWorkerImplementation().default + +/** Shared Worker implementation. Available only in the web. */ +export const SharedWorker = getWorkerImplementation().shared diff --git a/src/master/invocation-proxy.ts b/src/master/invocation-proxy.ts index 231eb1d2..002529cd 100644 --- a/src/master/invocation-proxy.ts +++ b/src/master/invocation-proxy.ts @@ -14,7 +14,7 @@ import { ModuleMethods, ModuleProxy, ProxyableFunction, - Worker as WorkerType + Worker as TWorker, } from "../types/master" import { MasterJobCancelMessage, @@ -26,6 +26,8 @@ import { WorkerMessageType } from "../types/messages" +type WorkerType = SharedWorker | TWorker + const debugMessages = DebugLogger("threads:master:messages") let nextJobUID = 1 @@ -75,13 +77,19 @@ function createObservableForJob(worker: WorkerType, jobUID: number): worker.addEventListener("message", messageHandler) + if (worker instanceof SharedWorker) { + worker.port.start() + } + return () => { if (asyncType === "observable" || !asyncType) { const cancelMessage: MasterJobCancelMessage = { type: MasterMessageType.cancel, uid: jobUID } - worker.postMessage(cancelMessage) + const port = worker instanceof SharedWorker ? worker.port : worker; + + port.postMessage(cancelMessage, []); } worker.removeEventListener("message", messageHandler) } @@ -125,11 +133,12 @@ export function createProxyFunction(worker: Work method, args } + const port = worker instanceof SharedWorker ? worker.port : worker; debugMessages("Sending command to run function to worker:", runMessage) try { - worker.postMessage(runMessage, transferables) + port.postMessage(runMessage, transferables) } catch (error) { return ObservablePromise.from(Promise.reject(error)) } diff --git a/src/master/spawn.ts b/src/master/spawn.ts index 2843bca0..3ba526ce 100644 --- a/src/master/spawn.ts +++ b/src/master/spawn.ts @@ -8,7 +8,7 @@ import { ModuleThread, PrivateThreadProps, StripAsync, - Worker as WorkerType, + Worker as TWorker, WorkerEvent, WorkerEventType, WorkerInternalErrorEvent, @@ -19,6 +19,8 @@ import { WorkerInitMessage, WorkerUncaughtErrorMessage } from "../types/messages import { WorkerFunction, WorkerModule } from "../types/worker" import { createProxyFunction, createProxyModule } from "./invocation-proxy" +type WorkerType = SharedWorker | TWorker + type ArbitraryWorkerInterface = WorkerFunction & WorkerModule & { somekeythatisneverusedinproductioncode123: "magicmarker123" } type ArbitraryThreadType = FunctionThread & ModuleThread @@ -31,6 +33,7 @@ type ExposedToThreadType> = ? ModuleThread : never +console.log('hello from spawn') const debugMessages = DebugLogger("threads:master:messages") const debugSpawn = DebugLogger("threads:master:spawn") @@ -106,7 +109,7 @@ function createEventObservable(worker: WorkerType, workerTermination: Promise, terminate: () => Promise } { +function createTerminator(worker: TWorker): {termination: Promise, terminate: () => Promise } { const [termination, resolver] = createPromiseWithResolver() const terminate = async () => { debugThreadUtils("Terminating worker") @@ -117,12 +120,29 @@ function createTerminator(worker: WorkerType): { termination: Promise, ter return { terminate, termination } } -function setPrivateThreadProps(raw: T, worker: WorkerType, workerEvents: Observable, terminate: () => Promise): T & PrivateThreadProps { +function createSharedWorkerTerminator(worker: SharedWorker): { + termination: Promise + terminate: () => Promise +} { + const [termination, resolver] = createPromiseWithResolver() + const terminate = async () => { + debugThreadUtils("Terminating shared worker") + await worker.port.close() + resolver() + } + return { terminate, termination } +} + +function setPrivateThreadProps( + raw: T, + worker: WorkerType, + workerEvents: Observable, + terminate: () => Promise +): T & PrivateThreadProps { const workerErrors = workerEvents .filter(event => event.type === WorkerEventType.internalError) .map(errorEvent => (errorEvent as WorkerInternalErrorEvent).error) - // tslint:disable-next-line prefer-object-spread return Object.assign(raw, { [$errors]: workerErrors, [$events]: workerEvents, @@ -136,7 +156,7 @@ function setPrivateThreadProps(raw: T, worker: WorkerType, workerEvents: Obse * abstraction layer to provide the transparent API and verifies that * the worker has initialized successfully. * - * @param worker Instance of `Worker`. Either a web worker, `worker_threads` worker or `tiny-worker` worker. + * @param worker Instance of `Worker` or `SharedWorker`. Either a web worker, `worker_threads` worker or `tiny-worker` worker. * @param [options] * @param [options.timeout] Init message timeout. Default: 10000 or set by environment variable. */ @@ -146,11 +166,37 @@ export async function spawn = ): Promise> { debugSpawn("Initializing new thread") +console.log('0000'); + const timeout = options && options.timeout ? options.timeout : initMessageTimeout + + console.log('000') + const initMessage = await withTimeout(receiveInitMessage(worker), timeout, `Timeout: Did not receive an init message from worker after ${timeout}ms. Make sure the worker calls expose().`) + + console.log('00') + const exposed = initMessage.exposed + let termination, terminate + +console.log('a') + + if (worker instanceof SharedWorker) { + const o = createSharedWorkerTerminator(worker) + +console.log('b') + + termination = o.termination + terminate = o.terminate + } else { + const o = createTerminator(worker) + + termination = o.termination + terminate = o.terminate + } + +console.log('c', exposed.type) - const { termination, terminate } = createTerminator(worker) const events = createEventObservable(worker, termination) if (exposed.type === "function") { diff --git a/src/types/master.ts b/src/types/master.ts index 604e7fa9..ab081427 100644 --- a/src/types/master.ts +++ b/src/types/master.ts @@ -51,7 +51,7 @@ export interface PrivateThreadProps { [$errors]: Observable [$events]: Observable [$terminate]: () => Promise - [$worker]: Worker + [$worker]: SharedWorker | Worker } export type FunctionThread = ProxyableFunction & PrivateThreadProps @@ -113,6 +113,7 @@ export declare class BlobWorker extends WorkerImplementation { export interface ImplementationExport { blob: typeof BlobWorker + shared?: typeof SharedWorker default: typeof WorkerImplementation } diff --git a/src/worker/implementation.browser.ts b/src/worker/implementation.browser.ts index 1988ad9c..14804c1a 100644 --- a/src/worker/implementation.browser.ts +++ b/src/worker/implementation.browser.ts @@ -7,27 +7,54 @@ interface WorkerGlobalScope { addEventListener(eventName: string, listener: (event: Event) => void): void postMessage(message: any, transferables?: any[]): void removeEventListener(eventName: string, listener: (event: Event) => void): void + port?: WorkerGlobalScope & { start: () => void } } declare const self: WorkerGlobalScope const isWorkerRuntime: AbstractedWorkerAPI["isWorkerRuntime"] = function isWorkerRuntime() { const isWindowContext = typeof self !== "undefined" && typeof Window !== "undefined" && self instanceof Window - return typeof self !== "undefined" && self.postMessage && !isWindowContext ? true : false + const port = self.port || self; + + return typeof self !== "undefined" && port.postMessage && !isWindowContext ? true : false } const postMessageToMaster: AbstractedWorkerAPI["postMessageToMaster"] = function postMessageToMaster(data, transferList?) { - self.postMessage(data, transferList) + const port = self.port || self; + + port.postMessage(data, transferList || []) } const subscribeToMasterMessages: AbstractedWorkerAPI["subscribeToMasterMessages"] = function subscribeToMasterMessages(onMessage) { + const port = self.port || self; const messageHandler = (messageEvent: MessageEvent) => { onMessage(messageEvent.data) } + + // TODO: Handle onconnect here somehow! + if (self.port) { + // @ts-ignore TODO: Testing for now + const connectHandler = (e) => { + const port = e.ports[0]; + + port.addEventListener('message', (messageEvent: MessageEvent) => { + port.onMessage(messageEvent.data) + }); + + port.start(); + } + + self.port.addEventListener('connect', connectHandler) + + // TODO: Does this need unsubscription too? + return () => {}; + } + const unsubscribe = () => { - self.removeEventListener("message", messageHandler as EventListener) + port.removeEventListener("message", messageHandler as EventListener) } - self.addEventListener("message", messageHandler as EventListener) + port.addEventListener("message", messageHandler as EventListener) + return unsubscribe } diff --git a/src/worker/index.ts b/src/worker/index.ts index 79bbf0be..c40b099d 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -150,6 +150,8 @@ async function runFunction(jobUID: number, fn: WorkerFunction, args: any[]) { } } +console.log('hello from worker index') + /** * Expose a function or a module (an object whose values are functions) * to the main thread. Must be called exactly once in every worker thread @@ -158,6 +160,8 @@ async function runFunction(jobUID: number, fn: WorkerFunction, args: any[]) { * @param exposed Function or object whose values are functions */ export function expose(exposed: WorkerFunction | WorkerModule) { + console.log('at expose', Implementation.isWorkerRuntime(), exposeCalled) + if (!Implementation.isWorkerRuntime()) { throw Error("expose() called in the master thread.") } @@ -166,6 +170,8 @@ export function expose(exposed: WorkerFunction | WorkerModule) { } exposeCalled = true + console.log('at expose, continuing', exposed) + if (typeof exposed === "function") { Implementation.subscribeToMasterMessages(messageData => { if (isMasterJobRunMessage(messageData) && !messageData.method) { diff --git a/test/spawn.chromium.mocha.ts b/test/spawn.chromium.mocha.ts index 6a5cb350..e985fb96 100644 --- a/test/spawn.chromium.mocha.ts +++ b/test/spawn.chromium.mocha.ts @@ -4,13 +4,14 @@ */ import { expect } from "chai" -import { spawn, BlobWorker, Thread } from "../" +import { spawn, BlobWorker, SharedWorker, Thread } from "../" // We need this as a work-around to make our threads Worker global, since // the bundler would otherwise not recognize `new Worker()` as a web worker import "../src/master/register" describe("threads in browser", function() { +/* it("can spawn and terminate a thread", async function() { const helloWorld = await spawn<() => string>(new Worker("./workers/hello-world.js")) expect(await helloWorld()).to.equal("Hello World") @@ -43,4 +44,28 @@ describe("threads in browser", function() { expect(await increment()).to.equal(3) await Thread.terminate(increment) }) +*/ + + it("can spawn and terminate a thread with a shared worker", async function () { + const sharedWorker = new SharedWorker("./workers/hello-world.js"); + + console.log('hello before spawn'); + + const helloWorld = await spawn<() => string>(sharedWorker); + + console.log("hello world fn", helloWorld); + + expect(await helloWorld()).to.equal("Hello World"); + await Thread.terminate(helloWorld); + }) + + it("can call a function thread with a shared worker more than once", async function () { + const sharedWorker = new SharedWorker("./workers/increment.js"); + + const increment = await spawn<() => number>(sharedWorker); + expect(await increment()).to.equal(1); + expect(await increment()).to.equal(2); + expect(await increment()).to.equal(3); + await Thread.terminate(increment); + }) }) diff --git a/test/workers/hello-world.ts b/test/workers/hello-world.ts index f573eb44..69768b58 100644 --- a/test/workers/hello-world.ts +++ b/test/workers/hello-world.ts @@ -1,5 +1,13 @@ import { expose } from "../../src/worker" +onconnect = () => { + expose(function helloWorld() { + return "Hello World" + }) +} + +console.log('hello from worker') + expose(function helloWorld() { return "Hello World" })