Skip to content

Resolve transferable object types on call #352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
"observable-fns": "^0.5.1"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^16.0.0",
"@rollup/plugin-node-resolve": "^10.0.0",
"@types/chai": "^4.2.14",
"@types/debug": "^4.1.5",
"@types/execa": "^2.0.0",
Expand All @@ -88,8 +90,6 @@
"raw-loader": "^4.0.2",
"rimraf": "^3.0.2",
"rollup": "^2.32.1",
"@rollup/plugin-commonjs": "^16.0.0",
"@rollup/plugin-node-resolve": "^10.0.0",
"threads-plugin": "^1.3.3",
"tiny-worker": "^2.2.0",
"ts-loader": "^8.0.7",
Expand Down
2 changes: 1 addition & 1 deletion src/master/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {
next(event) {
if (event.type === PoolEventType.taskQueueDrained) {
subscription.unsubscribe()
resolve()
resolve(void 0)
}
},
error: reject // make a pool-wide error reject the completed() result promise
Expand Down
14 changes: 12 additions & 2 deletions src/types/master.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import { Observable } from "observable-fns"
import { ObservablePromise } from "../observable-promise"
import { $errors, $events, $terminate, $worker } from "../symbols"
import { TransferDescriptor } from "../transferable"

interface ObservableLikeSubscription {
unsubscribe(): any
Expand All @@ -26,12 +27,21 @@ export type StripAsync<Type> =
? ObservableBaseType
: Type

export type StripTransfer<Type> =
Type extends TransferDescriptor<infer BaseType>
? BaseType
: Type

export type ModuleMethods = { [methodName: string]: (...args: any) => any }

export type ProxyableArgs<Args extends any[]> = Args extends [arg0: infer Arg0, ...rest: infer RestArgs]
? [Arg0 extends Transferable ? Arg0 | TransferDescriptor<Arg0> : Arg0, ...RestArgs]
: Args

export type ProxyableFunction<Args extends any[], ReturnType> =
Args extends []
? () => ObservablePromise<StripAsync<ReturnType>>
: (...args: Args) => ObservablePromise<StripAsync<ReturnType>>
? () => ObservablePromise<StripTransfer<StripAsync<ReturnType>>>
: (...args: ProxyableArgs<Args>) => ObservablePromise<StripTransfer<StripAsync<ReturnType>>>

export type ModuleProxy<Methods extends ModuleMethods> = {
[method in keyof Methods]: ProxyableFunction<Parameters<Methods[method]>, ReturnType<Methods[method]>>
Expand Down
7 changes: 5 additions & 2 deletions test/transferables.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import test from "ava"
import { spawn, Thread, Transfer, TransferDescriptor, Worker } from "../src/index"
import { XorBuffer } from "./workers/arraybuffer-xor"

type SpyInit<Args extends any[], OriginalReturn, NewReturn> =
(originalFn: (...args: Args) => OriginalReturn) =>
Expand Down Expand Up @@ -43,8 +44,10 @@ test("can pass transferable objects on thread call", async t => {
return postMessage(...args)
})

const xorBuffer = await spawn<(buffer: ArrayBuffer | TransferDescriptor<ArrayBuffer>, value: number) => ArrayBuffer>(worker)
await xorBuffer(Transfer(testData), 15)
const xorBuffer = await spawn<XorBuffer>(worker)
const returnedBuffer = await xorBuffer(Transfer(testData), 15)

t.is(returnedBuffer.byteLength, 64)

t.is(postMessageCalls.length, 1)
t.is(postMessageCalls[0].length, 2)
Expand Down
7 changes: 5 additions & 2 deletions test/workers/arraybuffer-xor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { expose, Transfer } from "../../src/worker"

expose(function xor(buffer: ArrayBuffer, value: number) {
function xor(buffer: ArrayBuffer, value: number) {
const view = new Uint8Array(buffer)
view.forEach((byte, offset) => view.set([byte ^ value], offset))
return Transfer(buffer)
})
}

expose(xor)
export type XorBuffer = typeof xor