diff --git a/packages/test/src/failing-payload-converter.ts b/packages/test/src/failing-payload-converter.ts new file mode 100644 index 000000000..864e978bf --- /dev/null +++ b/packages/test/src/failing-payload-converter.ts @@ -0,0 +1,11 @@ +import { defaultPayloadConverter, Payload } from '@temporalio/common'; +import { PayloadConverter } from '@temporalio/common/lib/converter/payload-converter'; + +export const payloadConverter: PayloadConverter = { + toPayload(value: T): Payload { + return defaultPayloadConverter.toPayload(value); + }, + fromPayload(_payload: Payload): T { + throw new Error('Intentional payload converter failure for testing'); + }, +}; diff --git a/packages/test/src/test-nexus-codec-converter-errors.ts b/packages/test/src/test-nexus-codec-converter-errors.ts new file mode 100644 index 000000000..56ea61eaf --- /dev/null +++ b/packages/test/src/test-nexus-codec-converter-errors.ts @@ -0,0 +1,113 @@ +import { randomUUID } from 'crypto'; +import * as nexus from 'nexus-rpc'; +import { NexusOperationFailure, Payload } from '@temporalio/common'; +import { Client, WorkflowFailedError } from '@temporalio/client'; +import type { PayloadCodec } from '@temporalio/common/lib/converter/payload-codec'; +import * as workflow from '@temporalio/workflow'; +import { helpers, makeTestFunction } from './helpers-integration'; + +const test = makeTestFunction({ + workflowsPath: __filename, + workflowInterceptorModules: [__filename], +}); + +const testService = nexus.service('codec-converter-test', { + echoOp: nexus.operation(), +}); + +export async function nexusEchoCaller(endpoint: string): Promise { + const client = workflow.createNexusClient({ + endpoint, + service: testService, + }); + const handle = await client.startOperation('echoOp', 'hello'); + return await handle.result(); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +test('Nexus operation codec failure is retried', async (t) => { + const { createWorker, registerNexusEndpoint, taskQueue } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + + let decodeCount = 0; + const failingCodec: PayloadCodec = { + async encode(payloads: Payload[]): Promise { + return payloads; + }, + async decode(payloads: Payload[]): Promise { + decodeCount++; + if (decodeCount === 1) { + throw new Error('Intentional codec decode failure'); + } + return payloads; + }, + }; + + const worker = await createWorker({ + dataConverter: { payloadCodecs: [failingCodec] }, + nexusServices: [ + nexus.serviceHandler(testService, { + async echoOp(_ctx, input) { + return input; + }, + }), + ], + }); + + const customClient = new Client({ + connection: t.context.env.connection, + dataConverter: { payloadCodecs: [failingCodec] }, + }); + + await worker.runUntil(async () => { + const result = await customClient.workflow.execute(nexusEchoCaller, { + taskQueue, + workflowId: randomUUID(), + args: [endpointName], + }); + t.is(result, 'hello'); + }); + + t.true(decodeCount >= 2, `Expected decode count >= 2, got ${decodeCount}`); +}); + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +test('Nexus operation converter failure is not retried', async (t) => { + const { createWorker, registerNexusEndpoint, taskQueue } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + + const worker = await createWorker({ + dataConverter: { payloadConverterPath: require.resolve('./failing-payload-converter') }, + nexusServices: [ + nexus.serviceHandler(testService, { + async echoOp(_ctx, input) { + return input; + }, + }), + ], + }); + + await worker.runUntil(async () => { + const err = await t.throwsAsync( + () => + t.context.env.client.workflow.execute(nexusEchoCaller, { + taskQueue, + workflowId: randomUUID(), + args: [endpointName], + }), + { + instanceOf: WorkflowFailedError, + } + ); + t.true(err instanceof WorkflowFailedError); + t.true(err!.cause instanceof NexusOperationFailure); + const nexusFailure = err!.cause as NexusOperationFailure; + t.true(nexusFailure.cause instanceof nexus.HandlerError); + const handlerError = nexusFailure.cause as nexus.HandlerError; + t.is(handlerError.type, 'BAD_REQUEST'); + t.false(handlerError.retryable); + t.regex(handlerError.message, /Intentional payload converter failure for testing/); + }); +}); diff --git a/packages/test/src/test-nexus-handler.ts b/packages/test/src/test-nexus-handler.ts index 3ce174a5a..99baea9ed 100644 --- a/packages/test/src/test-nexus-handler.ts +++ b/packages/test/src/test-nexus-handler.ts @@ -407,12 +407,7 @@ test('start Operation Handler errors', async (t) => { t.is(res.status, 400); const { message } = (await res.json()) as { message: string }; // Exact error message varies between Node versions and runtimes. - t.regex( - message, - isBun - ? /Failed to deserialize input: SyntaxError: JSON Parse error:/ - : /Failed to deserialize input: SyntaxError: Unexpected token .* JSON/ - ); + t.regex(message, isBun ? /JSON Parse error:/ : /Unexpected token .* JSON/); } }); }); diff --git a/packages/worker/src/nexus/conversions.ts b/packages/worker/src/nexus/conversions.ts index 32ad7a35f..2739da83a 100644 --- a/packages/worker/src/nexus/conversions.ts +++ b/packages/worker/src/nexus/conversions.ts @@ -20,7 +20,10 @@ export async function decodePayloadIntoLazyValue( try { decoded = await decodeOptionalSingle(dataConverter.payloadCodecs, payload); } catch (err) { - throw new nexus.HandlerError('BAD_REQUEST', `Failed to decode payload: ${err}`); + if (err instanceof ApplicationFailure) { + throw err; + } + throw new nexus.HandlerError('INTERNAL', `Payload codec failed to decode Nexus operation input`, { cause: err }); } // Nexus headers have string values and Temporal Payloads have binary values. Instead of @@ -47,7 +50,12 @@ class PayloadSerializer implements nexus.Serializer { try { return this.payloadConverter.fromPayload(this.payload); } catch (err) { - throw new nexus.HandlerError('BAD_REQUEST', `Failed to deserialize input: ${err}`); + if (err instanceof ApplicationFailure) { + throw err; + } + throw new nexus.HandlerError('BAD_REQUEST', `Payload converter failed to decode Nexus operation input`, { + cause: err, + }); } }