From c4da17500b74c5e26330f8a983b431c9f3f3d4ab Mon Sep 17 00:00:00 2001 From: Milly Date: Sun, 19 Oct 2025 19:14:37 +0900 Subject: [PATCH] :+1: Add accumulate() function for automatic RPC batching Add accumulate() function that automatically batches multiple RPC calls that occur at the same timing during microtask processing. This enables parallel RPC execution with automatic batching and proper error handling. --- batch/accumulate.ts | 264 ++++++++ batch/accumulate_test.ts | 1245 ++++++++++++++++++++++++++++++++++++++ batch/error.ts | 43 ++ batch/mod.ts | 22 +- deno.jsonc | 1 + 5 files changed, 1574 insertions(+), 1 deletion(-) create mode 100644 batch/accumulate.ts create mode 100644 batch/accumulate_test.ts create mode 100644 batch/error.ts diff --git a/batch/accumulate.ts b/batch/accumulate.ts new file mode 100644 index 0000000..f3a1e16 --- /dev/null +++ b/batch/accumulate.ts @@ -0,0 +1,264 @@ +import { nextTick } from "node:process"; +import type { Call, Context, Denops, Dispatcher, Meta } from "@denops/core"; +import { BatchError } from "@denops/core"; +import { AccumulateCancelledError } from "./error.ts"; + +const errorProp = Symbol("AccumulateErrorResult"); + +type ErrorResult = { + [errorProp]: + | { type: "error"; message: string; cause: unknown } + | { type: "cancel"; cause: unknown } + | { type: "unknown"; message: string; cause: unknown }; +}; + +class AccumulateHelper implements Denops { + readonly #denops: Denops; + readonly #calls: Call[] = []; + readonly #results: unknown[] = []; + readonly #disposer = Promise.withResolvers(); + #closed = false; + #resolvedWaiter = Promise.withResolvers(); + + constructor(denops: Denops) { + this.#denops = denops; + } + + static close(helper: AccumulateHelper): void { + helper.#closed = true; + helper.#disposer.promise.catch(() => {/* prevent unhandled rejection */}); + helper.#disposer.reject(); + } + + get name(): string { + return this.#denops.name; + } + + get meta(): Meta { + return this.#denops.meta; + } + + get interrupted(): AbortSignal | undefined { + return this.#denops.interrupted; + } + + get context(): Record { + return this.#denops.context; + } + + get dispatcher(): Dispatcher { + return this.#denops.dispatcher; + } + + set dispatcher(dispatcher: Dispatcher) { + this.#denops.dispatcher = dispatcher; + } + + async redraw(force?: boolean): Promise { + return await this.#denops.redraw(force); + } + + async call(fn: string, ...args: unknown[]): Promise { + this.#ensureAvailable(); + const call: Call = [fn, ...args]; + const [result] = await this.#waitResolved([call]); + + if (isErrorResult(result)) { + const error = result[errorProp]; + if (error.type === "error") { + throw new Error(error.message, { cause: error.cause }); + } else if (error.type === "cancel") { + const repr = `['${fn}', ...]`; + throw new AccumulateCancelledError( + `Call was cancelled due to another error in parallel execution: ${repr}`, + { calls: [call], cause: error.cause }, + ); + } else { + throw new Error(error.message, { cause: error.cause }); + } + } + + return result; + } + + async batch(...calls: Call[]): Promise { + this.#ensureAvailable(); + if (calls.length === 0) { + return []; + } + const results = await this.#waitResolved(calls); + + const errorIndex = results.findIndex(isErrorResult); + if (errorIndex >= 0) { + const { [errorProp]: error } = results[errorIndex] as ErrorResult; + if (error.type === "error") { + throw new BatchError(error.message, results.slice(0, errorIndex)); + } else if (error.type === "cancel") { + const [[fn]] = calls; + const repr = `[['${fn}', ...], ... total ${calls.length} calls]`; + throw new AccumulateCancelledError( + `Batch calls were cancelled due to another error in parallel execution: ${repr}`, + { calls, cause: error.cause }, + ); + } else { + throw new Error(error.message, { cause: error.cause }); + } + } + + return results; + } + + async cmd(cmd: string, ctx: Context = {}): Promise { + await this.call("denops#api#cmd", cmd, ctx); + } + + async eval(expr: string, ctx: Context = {}): Promise { + return await this.call("denops#api#eval", expr, ctx); + } + + async dispatch( + name: string, + fn: string, + ...args: unknown[] + ): Promise { + return await this.#denops.dispatch(name, fn, ...args); + } + + #ensureAvailable(): void { + if (this.#closed) { + throw new TypeError( + "AccumulateHelper instance is not available outside of 'accumulate' block", + ); + } + } + + async #waitResolved(calls: Call[]): Promise { + const start = this.#calls.length; + this.#calls.push(...calls); + const end = this.#calls.length; + nextTick(() => { + if (end === this.#calls.length) { + this.#resolvePendingCalls(); + } + }); + try { + await Promise.race([ + this.#disposer.promise, + this.#resolvedWaiter.promise, + ]); + } catch { + // Rethrow the error if the disposer is rejected. + this.#ensureAvailable(); + } + return this.#results.slice(start, end); + } + + async #resolvePendingCalls(): Promise { + const resultIndex = this.#results.length; + const calls = this.#calls.slice(resultIndex); + this.#results.length = this.#calls.length; + const { resolve } = this.#resolvedWaiter; + this.#resolvedWaiter = Promise.withResolvers(); + if (!this.#closed) { + const results = await this.#resolveCalls(calls); + this.#results.splice(resultIndex, results.length, ...results); + } + resolve(); + } + + async #resolveCalls(calls: Call[]): Promise { + try { + return await this.#denops.batch(...calls); + } catch (error: unknown) { + if (isBatchError(error)) { + const { results, message } = error; + const errorResult = { + [errorProp]: { type: "error", message, cause: error }, + }; + const cancelledResults = calls.slice(results.length + 1) + .map(() => ({ + [errorProp]: { type: "cancel", cause: error }, + })); + return [...results, errorResult, ...cancelledResults]; + } else { + const message = error instanceof Error ? error.message : String(error); + const unknownErrors = calls.map(() => ({ + [errorProp]: { type: "unknown", message, cause: error }, + })); + return unknownErrors; + } + } + } +} + +function isBatchError(obj: unknown): obj is BatchError { + return obj instanceof Error && obj.name === "BatchError"; +} + +function isErrorResult(obj: unknown): obj is ErrorResult { + return obj != null && Object.hasOwn(obj, errorProp); +} + +/** + * Runs an `executor` function while automatically batching multiple RPCs. + * + * `accumulate()` allows you to write normal async functions while automatically + * batching multiple RPCs that occur at the same timing (during microtask + * processing) into a single RPC call. + * + * Note that RPC calls with side effects should be avoided, and if you do, the + * order in which you call them should be carefully considered. + * + * @example + * ```typescript + * import { assertType, IsExact } from "jsr:@std/testing/types"; + * import type { Entrypoint } from "jsr:@denops/std"; + * import * as fn from "jsr:@denops/std/function"; + * import { accumulate } from "jsr:@denops/std/batch"; + * + * export const main: Entrypoint = async (denops) => { + * const results = await accumulate(denops, async (denops) => { + * const lines = await fn.getline(denops, 1, "$"); + * return await Promise.all(lines.map(async (line, index) => { + * const keyword = await fn.matchstr(denops, line, "\\k\\+"); + * const len = await fn.len(denops, keyword); + * return { + * lnum: index + 1, + * keyword, + * len, + * }; + * })); + * }); + * + * assertType< + * IsExact< + * typeof results, + * { lnum: number; keyword: string; len: number; }[] + * > + * >(true); + * } + * ``` + * + * In the case of the example, the following 3 RPCs are called. + * + * 1. RPC call to `getline`. + * 2. Multiple `matchstr` calls in one RPC. + * 3. Multiple `len` calls in one RPC. + * + * @remarks + * The `denops` instance passed as the argument to the `executor` function is + * only valid within the `accumulate()` block. Attempting to use it outside the + * block will result in an error when calling `denops.call()`, `denops.batch()`, + * `denops.cmd()`, or `denops.eval()`. + */ +export async function accumulate( + denops: Denops, + executor: (helper: Denops) => T, +): Promise> { + const helper = new AccumulateHelper(denops); + try { + return await executor(helper); + } finally { + AccumulateHelper.close(helper); + } +} diff --git a/batch/accumulate_test.ts b/batch/accumulate_test.ts new file mode 100644 index 0000000..4a51df9 --- /dev/null +++ b/batch/accumulate_test.ts @@ -0,0 +1,1245 @@ +import { flushPromises, peekPromiseState } from "@core/asyncutil"; +import { delay } from "@std/async"; +import { + assertEquals, + assertInstanceOf, + assertRejects, + assertStrictEquals, + assertStringIncludes, +} from "@std/assert"; +import { assertType, type IsExact } from "@std/testing/types"; +import { + assertSpyCallArgs, + assertSpyCalls, + resolvesNext, + spy, + stub, +} from "@std/testing/mock"; +import { DisposableStack } from "@nick/dispose"; +import { BatchError, type Denops } from "@denops/core"; +import { batch, collect } from "./mod.ts"; +import { DenopsStub, test } from "@denops/test"; + +import { accumulate } from "./accumulate.ts"; +import { AccumulateCancelledError } from "./error.ts"; + +function preventUnhandledRejection(promise: Promise): void { + promise.catch(() => {/* noop */}); +} + +Deno.test("accumulate() resolves", async (t) => { + const mocked_denops = new DenopsStub(); + const stubBatch = (...values: unknown[]) => + stub( + mocked_denops, + "batch", + (...calls) => { + if (calls.length > values.length) { + return Promise.reject(new Error("Too few values")); + } + return Promise.resolve(values.splice(0, calls.length)); + }, + ); + + await t.step("undefined", async () => { + using denops_batch = stubBatch(undefined); + const actual = await accumulate(mocked_denops, (_helper) => { + return undefined; + }); + assertType>(true); + assertEquals(actual, undefined); + assertSpyCalls(denops_batch, 0); + }); + await t.step("null", async () => { + using denops_batch = stubBatch(null); + const actual = await accumulate(mocked_denops, (helper) => { + return helper.call("eval", "v:none") as Promise; + }); + assertType>(true); + assertEquals(actual, null); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [["eval", "v:none"]], + ]); + }); + await t.step("number", async () => { + using denops_batch = stubBatch(42); + const actual = await accumulate(mocked_denops, (helper) => { + return helper.call("strlen", "foo") as Promise; + }); + assertType>(true); + assertEquals(actual, 42); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [["strlen", "foo"]], + ]); + }); + await t.step("string", async () => { + using denops_batch = stubBatch("foo"); + const actual = await accumulate(mocked_denops, (helper) => { + return helper.call("matchstr", "foo", ".*") as Promise; + }); + assertType>(true); + assertEquals(actual, "foo"); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [["matchstr", "foo", ".*"]], + ]); + }); + await t.step("boolean", async () => { + using denops_batch = stubBatch(true); + const actual = await accumulate(mocked_denops, (helper) => { + return helper.call("eval", "v:true") as Promise; + }); + assertType>(true); + assertEquals(actual, true); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [["eval", "v:true"]], + ]); + }); + await t.step("bigint", async () => { + using denops_batch = stubBatch(42); + const actual = await accumulate(mocked_denops, async (helper) => { + return BigInt(await helper.call("strlen", "foo") as number); + }); + assertType>(true); + assertEquals(actual, 42n); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [["strlen", "foo"]], + ]); + }); + await t.step("Object", async () => { + using denops_batch = stubBatch(42, "a", true); + const actual = await accumulate(mocked_denops, async (helper) => { + const [a, b, c] = await Promise.all([ + helper.call("strlen", "foo") as Promise, + helper.call("matchstr", "bar", "a") as Promise, + helper.call("eval", "v:true") as Promise, + ]); + return { a, b, c }; + }); + assertType< + IsExact + >(true); + assertEquals(actual, { a: 42, b: "a", c: true }); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["matchstr", "bar", "a"], + ["eval", "v:true"], + ], + ]); + }); + await t.step("Tuple", async () => { + using denops_batch = stubBatch(42, "a", true); + const actual = await accumulate(mocked_denops, (helper) => { + return Promise.all([ + helper.call("strlen", "foo") as Promise, + helper.call("matchstr", "bar", "a") as Promise, + helper.call("eval", "v:true") as Promise, + ]); + }); + assertType>(true); + assertEquals(actual, [42, "a", true]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["matchstr", "bar", "a"], + ["eval", "v:true"], + ], + ]); + }); + await t.step("Array", async () => { + using denops_batch = stubBatch(42, 123, 39); + const actual = await accumulate(mocked_denops, (helper) => { + const items = ["foo", "bar", "baz"]; + return Promise.all(items.map( + (item) => helper.call("strlen", item) as Promise, + )); + }); + assertType>(true); + assertEquals(actual, [42, 123, 39]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "bar"], + ["strlen", "baz"], + ], + ]); + }); + await t.step("Set", async () => { + using denops_batch = stubBatch(42, 123, 39); + const actual = await accumulate(mocked_denops, async (helper) => { + return new Set( + await Promise.all([ + helper.call("strlen", "foo") as Promise, + helper.call("matchstr", "bar", "a") as Promise, + helper.call("strlen", "baz") as Promise, + ]), + ); + }); + assertType>>(true); + assertEquals(actual, new Set([42, 123, 39])); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["matchstr", "bar", "a"], + ["strlen", "baz"], + ], + ]); + }); + await t.step("Map values", async () => { + using denops_batch = stubBatch(42, 123, 39); + const actual = await accumulate(mocked_denops, async (helper) => { + const items = ["foo", "bar", "baz"]; + return new Map( + await Promise.all( + items.map(async (item) => + [item, await helper.call("strlen", item) as number] as const + ), + ), + ); + }); + assertType>>(true); + assertEquals( + actual, + new Map([ + ["foo", 42], + ["bar", 123], + ["baz", 39], + ]), + ); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "bar"], + ["strlen", "baz"], + ], + ]); + }); + await t.step("chained Promise", async () => { + using denops_batch = stubBatch(42, 39, 123, 456); + const actual = await accumulate(mocked_denops, async (helper) => { + const [a, b] = await Promise.all([ + helper.call("strlen", "foo"), + helper.call("strlen", "bar"), + ]); + return await Promise.all([ + helper.call("stridx", "bar", "a", a) as Promise, + helper.call("stridx", "baz", "b", b) as Promise, + ]); + }); + assertType>(true); + assertEquals(actual, [123, 456]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "bar"], + ], + [ + ["stridx", "bar", "a", 42], + ["stridx", "baz", "b", 39], + ], + ]); + }); + await t.step("delayed Promise", async () => { + const values = [42, 123, 39]; + using denops_batch = stub( + mocked_denops, + "batch", + async (...calls) => { + if (calls.length > values.length) { + return Promise.reject(new Error("Too few values")); + } + const results = values.splice(0, calls.length); + await delay(50); + return results; + }, + ); + const actual = await accumulate(mocked_denops, async (helper) => { + return await Promise.all( + [ + helper.call("strlen", "foo") as Promise, + (async () => { + const b = helper.call("strlen", "bar") as Promise; + await delay(100); + const c = helper.call("strlen", "baz") as Promise; + return Promise.all([b, c]); + })(), + ] as const, + ); + }); + assertType< + IsExact + >(true); + assertEquals(actual, [42, [123, 39]]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "bar"], + ], + [["strlen", "baz"]], + ]); + }); + await t.step("0 delayed Promise", async () => { + const values = [42, 123, 39]; + using denops_batch = stub( + mocked_denops, + "batch", + async (...calls) => { + if (calls.length > values.length) { + return Promise.reject(new Error("Too few values")); + } + const results = values.splice(0, calls.length); + await delay(50); + return results; + }, + ); + const actual = await accumulate(mocked_denops, async (helper) => { + return await Promise.all( + [ + helper.call("strlen", "foo") as Promise, + (async () => { + const b = helper.call("strlen", "bar") as Promise; + await delay(0); + const c = helper.call("strlen", "baz") as Promise; + return Promise.all([b, c]); + })(), + ] as const, + ); + }); + assertType< + IsExact + >(true); + assertEquals(actual, [42, [123, 39]]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "bar"], + ], + [["strlen", "baz"]], + ]); + }); + await t.step("nested 'accumulate()'", async () => { + using denops_batch = stubBatch(1, 2, 3, 4); + const actual = await accumulate(mocked_denops, (helper) => { + return Promise.all([ + helper.call("strlen", "foo") as Promise, + accumulate(helper, (innerHelper) => { + return Promise.all([ + innerHelper.call("stridx", "bar", "a") as Promise, + innerHelper.call("stridx", "baz", "z") as Promise, + ]); + }), + helper.call("strlen", "quux") as Promise, + ]); + }); + assertType< + IsExact + >(true); + assertEquals(actual, [1, [3, 4], 2]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "quux"], + ["stridx", "bar", "a"], + ["stridx", "baz", "z"], + ], + ]); + }); + await t.step("nested 'batch()'", async () => { + using denops_batch = stubBatch(1, 2, 3, 4); + const actual = await accumulate(mocked_denops, (helper) => { + return Promise.all([ + helper.call("strlen", "foo") as Promise, + batch(helper, async (batchHelper) => { + await batchHelper.call("stridx", "bar", "a"); + await batchHelper.call("stridx", "baz", "z"); + }), + helper.call("strlen", "quux") as Promise, + ]); + }); + assertType>(true); + assertEquals(actual, [1, undefined, 2]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "quux"], + ["stridx", "bar", "a"], + ["stridx", "baz", "z"], + ], + ]); + }); + await t.step("nested 'collect()'", async () => { + using denops_batch = stubBatch(1, 2, 3, 4); + const actual = await accumulate(mocked_denops, (helper) => { + return Promise.all([ + helper.call("strlen", "foo") as Promise, + collect(helper, (collectHelper) => [ + collectHelper.call("stridx", "bar", "a") as Promise, + collectHelper.call("stridx", "baz", "z") as Promise, + ]), + helper.call("strlen", "quux") as Promise, + ]); + }); + assertType< + IsExact + >(true); + assertEquals(actual, [1, [3, 4], 2]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "quux"], + ["stridx", "bar", "a"], + ["stridx", "baz", "z"], + ], + ]); + }); +}); + +test({ + mode: "all", + name: "accumulate()", + fn: async (denops, t) => { + await denops.call("execute", [ + "let g:test_fn_call_args = []", + "function! TestFn(...) abort", + " call add(g:test_fn_call_args, a:000->copy())", + "endfunction", + ]); + + await t.step("when the executor is not callable", async (t) => { + await t.step("rejects an error", async () => { + await assertRejects( + // deno-lint-ignore no-explicit-any + () => accumulate(denops, null as any), + TypeError, + ); + }); + }); + await t.step("when the executor resolves", async (t) => { + using denops_batch = spy(denops, "batch"); + let helperPromise: Promise; + await accumulate(denops, (helper) => { + helperPromise = helper.call("strlen", "foo"); + preventUnhandledRejection(helperPromise); + }); + + await t.step("rejects pending batch 'calls' immediately", async () => { + await flushPromises(); + assertEquals(await peekPromiseState(helperPromise), "rejected"); + await assertRejects( + () => helperPromise, + TypeError, + "not available outside", + ); + }); + await t.step("does not call underlying 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step("when the executor throws", async (t) => { + using denops_batch = spy(denops, "batch"); + const error = new Error("test error"); + let helperPromise: Promise; + const accumulatePromise = accumulate(denops, (helper) => { + helperPromise = helper.call("strlen", "foo"); + preventUnhandledRejection(helperPromise); + throw error; + }); + preventUnhandledRejection(accumulatePromise); + await flushPromises(); + + await t.step("the helper method rejects immediately", async () => { + assertEquals(await peekPromiseState(helperPromise), "rejected"); + await assertRejects( + () => helperPromise, + TypeError, + "not available outside", + ); + }); + await t.step("rejects the actual error immediately", async () => { + assertEquals(await peekPromiseState(accumulatePromise), "rejected"); + const actual = await assertRejects(() => accumulatePromise); + assertStrictEquals(actual, error); + }); + await t.step("does not call underlying 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step("when the executor rejects", async (t) => { + using denops_batch = spy(denops, "batch"); + const error = new Error("test error"); + let helperPromise: Promise; + const accumulatePromise = accumulate(denops, (helper) => { + helperPromise = helper.call("strlen", "foo"); + preventUnhandledRejection(helperPromise); + return Promise.reject(error); + }); + preventUnhandledRejection(accumulatePromise); + await flushPromises(); + + await t.step("the helper method rejects immediately", async () => { + assertEquals(await peekPromiseState(helperPromise), "rejected"); + await assertRejects( + () => helperPromise, + TypeError, + "not available outside", + ); + }); + await t.step("rejects the actual error immediately", async () => { + assertEquals(await peekPromiseState(accumulatePromise), "rejected"); + const actual = await assertRejects(() => accumulatePromise); + assertStrictEquals(actual, error); + }); + await t.step("does not call underlying 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step( + "when the executor rejects while underlying 'denops.batch()' executing", + async (t) => { + using stack = new DisposableStack(); + const underlyingDenopsWaiter = stack.adopt( + Promise.withResolvers(), + (t) => t.reject(), + ); + const denops_batch = stack.use(stub( + denops, + "batch", + () => underlyingDenopsWaiter.promise, + )); + const error = new Error("test error"); + let helperPromise: Promise; + const accumulatePromise = accumulate(denops, async (helper) => { + helperPromise = helper.call("strlen", "foo"); + preventUnhandledRejection(helperPromise); + await delay(0); // Ensure underlying batch is executing + return Promise.reject(error); + }); + await flushPromises(); + + await t.step("the helper method rejects immediately", async () => { + assertEquals(await peekPromiseState(helperPromise), "rejected"); + await assertRejects( + () => helperPromise, + TypeError, + "not available outside", + ); + }); + await t.step("rejects the actual error immediately", async () => { + assertEquals(await peekPromiseState(accumulatePromise), "rejected"); + const actual = await assertRejects(() => accumulatePromise); + assertStrictEquals(actual, error); + }); + await t.step("calls underlying 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 1); + assertSpyCallArgs(denops_batch, 0, [["strlen", "foo"]]); + }); + await t.step("the underlying rejection should handled", async () => { + underlyingDenopsWaiter.reject( + new Error("This error should be ignored"), + ); + await flushPromises(); + }); + }, + ); + await t.step("AccumulateHelper", async (t) => { + await t.step(".redraw()", async (t) => { + await t.step("call underlying 'denops.redraw()'", async () => { + using denops_redraw = stub( + denops, + "redraw", + (): Promise => denops_redraw.original.call(denops), + ); + await accumulate(denops, async (helper) => { + await helper.redraw(); + }); + assertSpyCalls(denops_redraw, 1); + }); + await t.step("when underlying 'denops.redraw()' rejects", async (t) => { + await t.step("rejects an error", async (t) => { + using _denops_redraw = stub( + denops, + "redraw", + (): Promise => Promise.reject(new Error("Network error")), + ); + let error: Error; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => helper.redraw(), + Error, + "Network error", + ); + }); + await t.step("with the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.redraw", + ); + }); + }); + }); + }); + await t.step(".call()", async (t) => { + await t.step("calls Vim function", async () => { + await denops.call("execute", [ + "let g:test_fn_call_args = []", + ]); + await accumulate(denops, async (helper) => { + await helper.call("TestFn", "foo", 1, true); + await Promise.all([ + helper.call("TestFn", "a"), + helper.call("TestFn", "b"), + helper.call("TestFn", "c"), + ]); + }); + const actual = await denops.eval("g:test_fn_call_args"); + assertEquals(actual, [ + ["foo", 1, true], + ["a"], + ["b"], + ["c"], + ]); + }); + await t.step("resolves a result of Vim function", async () => { + let actual: unknown; + await accumulate(denops, async (helper) => { + actual = await helper.call("range", 2, 4); + }); + assertEquals(actual, [2, 3, 4]); + }); + await t.step("rejects an error which Vim throws", async (t) => { + let error: Error; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => helper.call("notexistsfn"), + Error, + "Unknown function: notexistsfn", + ); + }); + await t.step("and the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + }); + await t.step( + "when an error occurs during parallel execution", + async (t) => { + let actual: PromiseSettledResult[]; + await accumulate(denops, async (helper) => { + actual = await Promise.allSettled([ + helper.call("range", 1, 3), + helper.call("range", 2, 4), + helper.call("notexistsfn"), + helper.call("range", 0, 2), + helper.call("range", 3, 5), + ]); + }); + await t.step("calls before the error resolves", () => { + assertEquals(actual[0].status, "fulfilled"); + assertEquals(actual[1].status, "fulfilled"); + assertEquals( + (actual[0] as PromiseFulfilledResult).value, + [1, 2, 3], + ); + assertEquals( + (actual[1] as PromiseFulfilledResult).value, + [2, 3, 4], + ); + }); + await t.step("the invalid call rejects the actual error", () => { + assertEquals(actual[2].status, "rejected"); + const error = (actual[2] as PromiseRejectedResult).reason; + assertInstanceOf(error, Error); + assertStringIncludes( + error.message, + "Unknown function: notexistsfn", + ); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + await t.step( + "calls after the error rejects an AccumulateCancelledError", + () => { + assertEquals(actual[3].status, "rejected"); + assertEquals(actual[4].status, "rejected"); + const error1 = (actual[3] as PromiseRejectedResult).reason; + const error2 = (actual[4] as PromiseRejectedResult).reason; + assertInstanceOf(error1, AccumulateCancelledError); + assertInstanceOf(error2, AccumulateCancelledError); + assertStringIncludes(error1.message, "['range', ...]"); + assertStringIncludes(error2.message, "['range', ...]"); + assertEquals(error1.calls, [["range", 0, 2]]); + assertEquals(error2.calls, [["range", 3, 5]]); + assertStringIncludes( + error1.stack ?? "", + "AccumulateHelper.call", + ); + assertStringIncludes( + error2.stack ?? "", + "AccumulateHelper.call", + ); + }, + ); + }, + ); + await t.step( + "when underlying 'denops.batch()' throws a non-BatchError", + async (t) => { + const underlyingError = new Error("Network error"); + using denops_batch = stub( + denops, + "batch", + resolvesNext([underlyingError]), + ); + let actual: PromiseSettledResult[]; + await accumulate(denops, async (helper) => { + actual = await Promise.allSettled([ + helper.call("range", 0, 2), + helper.call("range", 3, 5), + ]); + }); + assertSpyCalls(denops_batch, 1); + await t.step("the first call rejects the actual error", () => { + assertEquals(actual[0].status, "rejected"); + const error = (actual[0] as PromiseRejectedResult).reason; + assertInstanceOf(error, Error); + assertEquals(error.message, "Network error"); + assertStrictEquals(error.cause, underlyingError); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + await t.step("the second call rejects the actual error", () => { + assertEquals(actual[1].status, "rejected"); + const error = (actual[1] as PromiseRejectedResult).reason; + assertInstanceOf(error, Error); + assertEquals(error.message, "Network error"); + assertStrictEquals(error.cause, underlyingError); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + }, + ); + await t.step( + "when underlying 'denops.batch()' throws a non-Error", + async (t) => { + const underlyingError = 42; + using denops_batch = stub( + denops, + "batch", + () => Promise.reject(underlyingError), + ); + let actual: PromiseSettledResult[]; + await accumulate(denops, async (helper) => { + actual = await Promise.allSettled([ + helper.call("range", 0, 2), + helper.call("range", 3, 5), + ]); + }); + assertSpyCalls(denops_batch, 1); + await t.step("the first call rejects the actual error", () => { + assertEquals(actual[0].status, "rejected"); + const error = (actual[0] as PromiseRejectedResult).reason; + assertInstanceOf(error, Error); + assertEquals(error.message, "42"); + assertStrictEquals(error.cause, underlyingError); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + await t.step("the second call rejects the actual error", () => { + assertEquals(actual[1].status, "rejected"); + const error = (actual[1] as PromiseRejectedResult).reason; + assertInstanceOf(error, Error); + assertEquals(error.message, "42"); + assertStrictEquals(error.cause, underlyingError); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + }, + ); + await t.step( + "when new calls are made while underlying 'denops.batch()' executing", + async (t) => { + using denops_batch = stub( + denops, + "batch", + async (...calls): Promise => { + await delay(10); + return await denops_batch.original.apply(denops, calls); + }, + ); + let preceding: Promise; + let delayed: Promise; + await accumulate(denops, async (helper) => { + preceding = helper.call("range", 0, 2); + delayed = (async () => { + await delay(0); // Ensure underlying batch is executing + return helper.call("range", 1, 3); + })(); + await Promise.allSettled([preceding, delayed]); + }); + assertSpyCalls(denops_batch, 2); + await t.step("the preceding call resolves", async () => { + assertEquals(await preceding, [0, 1, 2]); + }); + await t.step("the delayed call resolves", async () => { + assertEquals(await delayed, [1, 2, 3]); + }); + }, + ); + }); + await t.step(".cmd()", async (t) => { + await t.step("executes Vim command", async () => { + await denops.call("execute", [ + "let g:test_fn_call_args = []", + ]); + await accumulate(denops, async (helper) => { + await helper.cmd("call TestFn('foo', 1, v:true)"); + await Promise.all([ + helper.cmd("call TestFn('a')"), + helper.cmd("call TestFn(value)", { value: "b" }), + helper.cmd("call TestFn(value)", { value: "c" }), + ]); + }); + const actual = await denops.eval("g:test_fn_call_args"); + assertEquals(actual, [ + ["foo", 1, true], + ["a"], + ["b"], + ["c"], + ]); + }); + await t.step("rejects an error which Vim throws", async (t) => { + let error: Error; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => helper.cmd("call notexistsfn()"), + Error, + "Unknown function: notexistsfn", + ); + }); + await t.step("and the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.cmd", + ); + }); + }); + }); + await t.step(".eval()", async (t) => { + await t.step("evaluates Vim expression", async () => { + await denops.call("execute", [ + "let g:test_fn_call_args = []", + ]); + await accumulate(denops, async (helper) => { + await helper.eval("TestFn('foo', 1, v:true)"); + await Promise.all([ + helper.eval("TestFn('a')"), + helper.eval("TestFn(value)", { value: "b" }), + helper.eval("TestFn(value)", { value: "c" }), + ]); + }); + const actual = await denops.eval("g:test_fn_call_args"); + assertEquals(actual, [ + ["foo", 1, true], + ["a"], + ["b"], + ["c"], + ]); + }); + await t.step("resolves a result of Vim expression", async () => { + let actual: unknown; + await accumulate(denops, async (helper) => { + actual = await helper.eval("range(2, 4)"); + }); + assertEquals(actual, [2, 3, 4]); + }); + await t.step("rejects an error which Vim throws", async (t) => { + let error: Error; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => helper.eval("notexistsfn()"), + Error, + "Unknown function: notexistsfn", + ); + }); + await t.step("and the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.eval", + ); + }); + }); + }); + await t.step(".batch()", async (t) => { + await t.step("calls Vim functions", async () => { + await denops.call("execute", [ + "let g:test_fn_call_args = []", + ]); + await accumulate(denops, async (helper) => { + await helper.batch( + ["TestFn", "foo", 1, true], + ["TestFn", "a"], + ["TestFn", "b"], + ["TestFn", "c"], + ); + }); + const actual = await denops.eval("g:test_fn_call_args"); + assertEquals(actual, [ + ["foo", 1, true], + ["a"], + ["b"], + ["c"], + ]); + }); + await t.step("resolves results of Vim functions", async () => { + let actual: unknown; + await accumulate(denops, async (helper) => { + actual = await helper.batch( + ["range", 0, 2], + ["range", 2, 4], + ["matchstr", "hello", "el*"], + ); + }); + assertEquals(actual, [ + [0, 1, 2], + [2, 3, 4], + "ell", + ]); + }); + await t.step("resolves an empty array if no arguments", async () => { + using denops_batch = spy(denops, "batch"); + let actual: unknown; + await accumulate(denops, async (helper) => { + actual = await helper.batch(); + }); + assertEquals(actual, []); + assertSpyCalls(denops_batch, 0); + }); + await t.step("rejects a BatchError which Vim throws", async (t) => { + let error: BatchError; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => + helper.batch( + ["range", 3], + ["range", 2, 4], + ["notexistsfn"], + ["range", 3], + ), + BatchError, + "Unknown function: notexistsfn", + ); + assertEquals(error.results, [[0, 1, 2], [2, 3, 4]]); + }); + await t.step("and the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.batch", + ); + }); + }); + await t.step( + "when an error occurs during parallel execution", + async (t) => { + let actual: PromiseSettledResult[]; + await accumulate(denops, async (helper) => { + actual = await Promise.allSettled([ + helper.batch( + ["range", 1, 3], + ["range", 2, 4], + ), + helper.batch( + ["range", 1, 3], + ["notexistsfn"], + ["range", 2, 4], + ), + helper.batch( + ["range", 0, 2], + ["range", 3, 5], + ), + ]); + }); + await t.step("calls before the error resolves", () => { + assertEquals(actual[0].status, "fulfilled"); + assertEquals( + (actual[0] as PromiseFulfilledResult).value, + [[1, 2, 3], [2, 3, 4]], + ); + }); + await t.step("the invalid call rejects a BatchError", () => { + assertEquals(actual[1].status, "rejected"); + const error = (actual[1] as PromiseRejectedResult).reason; + assertInstanceOf(error, BatchError); + assertStringIncludes( + error.message, + "Unknown function: notexistsfn", + ); + assertEquals(error.results, [[1, 2, 3]]); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.batch", + ); + }); + await t.step( + "calls after the error rejects an AccumulateCancelledError", + () => { + assertEquals(actual[2].status, "rejected"); + const error = (actual[2] as PromiseRejectedResult).reason; + assertInstanceOf(error, AccumulateCancelledError); + assertStringIncludes( + error.message, + "[['range', ...], ... total 2 calls]", + ); + assertEquals(error.calls, [["range", 0, 2], ["range", 3, 5]]); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.batch", + ); + }, + ); + }, + ); + await t.step( + "when underlying 'denops.batch()' throws a non-BatchError", + async (t) => { + const underlyingError = new Error("Network error"); + using denops_batch = stub( + denops, + "batch", + resolvesNext([underlyingError]), + ); + let actual: Promise; + await accumulate(denops, async (helper) => { + actual = helper.batch( + ["range", 1, 3], + ["range", 2, 4], + ); + await Promise.allSettled([actual]); + }); + assertSpyCalls(denops_batch, 1); + await t.step("rejects the actual error", async () => { + const error = await assertRejects( + () => actual, + Error, + "Network error", + ); + assertStrictEquals(error.cause, underlyingError); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.batch", + ); + }); + }, + ); + }); + await t.step(".dispatch()", async (t) => { + await t.step("calls 'denops.dispatch()'", async () => { + using denops_dispatch = stub( + denops, + "dispatch", + resolvesNext(["one", "two", "three"]), + ); + await accumulate(denops, async (helper) => { + await helper.dispatch("pluginA", "foo", "bar", 42, false); + await Promise.all([ + helper.dispatch("pluginA", "baz", 1), + helper.dispatch("pluginB", "qux", 2), + ]); + }); + assertEquals(denops_dispatch.calls.map((c) => c.args), [ + ["pluginA", "foo", "bar", 42, false], + ["pluginA", "baz", 1], + ["pluginB", "qux", 2], + ]); + }); + await t.step("resolves a result of 'denops.dispatch()'", async () => { + using _denops_dispatch = stub( + denops, + "dispatch", + resolvesNext(["one"]), + ); + let actual: unknown; + await accumulate(denops, async (helper) => { + actual = await helper.dispatch("pluginA", "foo", "bar"); + }); + assertEquals(actual, "one"); + }); + await t.step( + "rejects an error which the 'denops.dispatch()' rejects", + async (t) => { + using _denops_dispatch = stub( + denops, + "dispatch", + () => { + throw new Error("test plugin error"); + }, + ); + let error: Error; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => helper.dispatch("pluginA", "foo", "bar"), + Error, + "test plugin error", + ); + }); + await t.step("and the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.dispatch", + ); + }); + }, + ); + }); + await t.step(".name", async (t) => { + await t.step("getter returns 'denops.name'", async () => { + let actual: unknown; + await accumulate(denops, (helper) => { + actual = helper.name; + }); + assertStrictEquals(actual, denops.name); + }); + }); + await t.step(".meta", async (t) => { + await t.step("getter returns 'denops.meta'", async () => { + let actual: unknown; + await accumulate(denops, (helper) => { + actual = helper.meta; + }); + assertStrictEquals(actual, denops.meta); + }); + }); + await t.step(".interrupted", async (t) => { + await t.step("getter returns 'denops.interrupted'", async () => { + let actual: unknown; + await accumulate(denops, (helper) => { + actual = helper.interrupted; + }); + assertStrictEquals(actual, denops.interrupted); + }); + }); + await t.step(".context", async (t) => { + await t.step("getter returns 'denops.context'", async () => { + let actual: unknown; + await accumulate(denops, (helper) => { + actual = helper.context; + }); + assertStrictEquals(actual, denops.context); + }); + }); + await t.step(".dispatcher", async (t) => { + const MY_DISPATCHER = { + foo: () => {}, + }; + + await t.step("setter sets to 'denops.dispatcher'", async () => { + using stack = new DisposableStack(); + stack.adopt(denops.dispatcher, (saved) => { + denops.dispatcher = saved; + }); + await accumulate(denops, (helper) => { + helper.dispatcher = MY_DISPATCHER; + }); + assertStrictEquals(denops.dispatcher, MY_DISPATCHER); + }); + await t.step("getter returns 'denops.dispatcher'", async () => { + using stack = new DisposableStack(); + stack.adopt(denops.dispatcher, (saved) => { + denops.dispatcher = saved; + }); + denops.dispatcher = MY_DISPATCHER; + let actual: unknown; + await accumulate(denops, (helper) => { + actual = helper.dispatcher; + }); + assertStrictEquals(actual, MY_DISPATCHER); + }); + }); + await t.step("when outside of the 'accumulate()' block", async (t) => { + await t.step(".call()", async (t) => { + using denops_batch = spy(denops, "batch"); + let helper_outside: Denops; + await accumulate(denops, (helper) => { + helper_outside = helper; + }); + + await t.step("rejects an error", async () => { + await assertRejects( + () => helper_outside.call("range", 0), + TypeError, + "not available outside", + ); + }); + await t.step("does not call 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step(".cmd()", async (t) => { + using denops_batch = spy(denops, "batch"); + let helper_outside: Denops; + await accumulate(denops, (helper) => { + helper_outside = helper; + }); + + await t.step("rejects an error", async () => { + await assertRejects( + () => helper_outside.cmd("echo 'hello'"), + TypeError, + "not available outside", + ); + }); + await t.step("does not call 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step(".eval()", async (t) => { + using denops_batch = spy(denops, "batch"); + let helper_outside: Denops; + await accumulate(denops, (helper) => { + helper_outside = helper; + }); + + await t.step("rejects an error", async () => { + await assertRejects( + () => helper_outside.eval("123"), + TypeError, + "not available outside", + ); + }); + await t.step("does not call 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step(".batch()", async (t) => { + using denops_batch = spy(denops, "batch"); + let helper_outside: Denops; + await accumulate(denops, (helper) => { + helper_outside = helper; + }); + + await t.step("rejects an error", async () => { + await assertRejects( + () => helper_outside.batch(["range", 0]), + TypeError, + "not available outside", + ); + }); + await t.step("does not call 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + }); + }); + }, +}); diff --git a/batch/error.ts b/batch/error.ts new file mode 100644 index 0000000..530ee98 --- /dev/null +++ b/batch/error.ts @@ -0,0 +1,43 @@ +import type { Call } from "@denops/core"; + +/** + * Options for creating an {@linkcode AccumulateCancelledError}. + */ +export interface AccumulateCancelledErrorOptions extends ErrorOptions { + /** + * Information about the cancelled Vim/Neovim function calls. + */ + calls?: readonly Call[]; +} + +/** + * Error thrown when a Vim/Neovim function call is cancelled due to another + * error in a parallel execution within the same batch. + * + * This error occurs when multiple Vim/Neovim function calls are executed in + * parallel using `Promise.all()` or similar constructs within an `accumulate()` + * block, and one of the calls fails, causing the remaining calls in the same + * batch to be cancelled. + */ +export class AccumulateCancelledError extends Error { + static { + this.prototype.name = "AccumulateCancelledError"; + } + + /** + * Information about the cancelled Vim/Neovim function calls. + */ + readonly calls?: readonly Call[]; + + /** + * Creates a new {@linkcode AccumulateCancelledError}. + * + * @param message - The error message describing why the call was cancelled. + * @param options - Additional options for the error. + */ + constructor(message?: string, options: AccumulateCancelledErrorOptions = {}) { + const { calls, ...errorOptions } = options; + super(message, errorOptions); + this.calls = calls; + } +} diff --git a/batch/mod.ts b/batch/mod.ts index 67d092f..3b63149 100644 --- a/batch/mod.ts +++ b/batch/mod.ts @@ -3,7 +3,8 @@ * * ```typescript * import type { Entrypoint } from "jsr:@denops/std"; - * import { batch, collect } from "jsr:@denops/std/batch"; + * import * as fn from "jsr:@denops/std/function"; + * import { accumulate, batch, collect } from "jsr:@denops/std/batch"; * * export const main: Entrypoint = async (denops) => { * // Call multiple denops functions sequentially in a single RPC call @@ -20,10 +21,29 @@ * denops.eval("&filetype"), * ]); * // results contains the value of modifiable, modified, and filetype + * + * // Automatically batch multiple denops calls while writing regular async code + * // In this example, only 3 RPC calls are made: + * // 1. fn.getline, 2. batched fn.matchstr calls, 3. batched fn.len calls + * const results2 = await accumulate(denops, async (denops) => { + * const lines = await fn.getline(denops, 1, "$"); + * return await Promise.all(lines.map(async (line, index) => { + * const keyword = await fn.matchstr(denops, line, "\\k\\+"); + * const len = await fn.len(denops, keyword); + * return { + * lnum: index + 1, + * keyword, + * len, + * }; + * })); + * }); + * // results2 contains an array of objects with lnum, keyword, and len * } * ``` * * @module */ +export * from "./accumulate.ts"; export * from "./batch.ts"; export * from "./collect.ts"; +export * from "./error.ts"; diff --git a/deno.jsonc b/deno.jsonc index 266a132..efda116 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -44,6 +44,7 @@ "@lambdalisue/unreachable": "jsr:@lambdalisue/unreachable@^1.0.1", "@nick/dispose": "jsr:@nick/dispose@^1.1.0", "@std/assert": "jsr:@std/assert@^1.0.14", + "@std/async": "jsr:@std/async@^1.0.15", "@std/collections": "jsr:@std/collections@^1.1.3", "@std/fs": "jsr:@std/fs@^1.0.19", "@std/path": "jsr:@std/path@^1.1.2",