Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/test/src/failing-payload-converter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { defaultPayloadConverter, Payload } from '@temporalio/common';
import { PayloadConverter } from '@temporalio/common/lib/converter/payload-converter';

export const payloadConverter: PayloadConverter = {
toPayload<T>(value: T): Payload {
return defaultPayloadConverter.toPayload(value);
},
fromPayload<T>(_payload: Payload): T {
throw new Error('Intentional payload converter failure for testing');
},
};
113 changes: 113 additions & 0 deletions packages/test/src/test-nexus-codec-converter-errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { randomUUID } from 'crypto';
import * as nexus from 'nexus-rpc';
import { NexusOperationFailure, Payload } from '@temporalio/common';
import { Client, WorkflowFailedError } from '@temporalio/client';
import type { PayloadCodec } from '@temporalio/common/lib/converter/payload-codec';
import * as workflow from '@temporalio/workflow';
import { helpers, makeTestFunction } from './helpers-integration';

const test = makeTestFunction({
workflowsPath: __filename,
workflowInterceptorModules: [__filename],
});

const testService = nexus.service('codec-converter-test', {
echoOp: nexus.operation<string, string>(),
});

export async function nexusEchoCaller(endpoint: string): Promise<string> {
const client = workflow.createNexusClient({
endpoint,
service: testService,
});
const handle = await client.startOperation('echoOp', 'hello');
return await handle.result();
}

////////////////////////////////////////////////////////////////////////////////////////////////////

test('Nexus operation codec failure is retried', async (t) => {
const { createWorker, registerNexusEndpoint, taskQueue } = helpers(t);
const { endpointName } = await registerNexusEndpoint();

let decodeCount = 0;
const failingCodec: PayloadCodec = {
async encode(payloads: Payload[]): Promise<Payload[]> {
return payloads;
},
async decode(payloads: Payload[]): Promise<Payload[]> {
decodeCount++;
if (decodeCount === 1) {
throw new Error('Intentional codec decode failure');
Comment on lines +40 to +41

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Fail codec on Nexus decode path only

This injection throws on the first decode call globally, but the same codec is also configured on the worker (dataConverter.payloadCodecs) where it is used for workflow activation decoding (WorkflowCodecRunner.decodeActivation) before the Nexus handler runs. That means the initial failure can be consumed outside the Nexus operation path, and decodeCount >= 2 can still pass even if Nexus input deserialization is never retried, so this test can miss regressions in the behavior this commit is trying to validate.

Useful? React with 👍 / 👎.

}
return payloads;
},
};

const worker = await createWorker({
dataConverter: { payloadCodecs: [failingCodec] },
nexusServices: [
nexus.serviceHandler(testService, {
async echoOp(_ctx, input) {
return input;
},
}),
],
});

const customClient = new Client({
connection: t.context.env.connection,
dataConverter: { payloadCodecs: [failingCodec] },
});

await worker.runUntil(async () => {
const result = await customClient.workflow.execute(nexusEchoCaller, {
taskQueue,
workflowId: randomUUID(),
args: [endpointName],
});
t.is(result, 'hello');
});

t.true(decodeCount >= 2, `Expected decode count >= 2, got ${decodeCount}`);
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codec retry test may hit wrong decode

Low Severity

failingCodec is installed on both the Worker and the Client, so the first decode failure can occur while decoding workflow inputs/other payloads (before any Nexus operation input is decoded). This can make the test pass without exercising the intended Nexus operation codec failure path, and can also make it flaky depending on decode ordering.

Fix in Cursor Fix in Web


////////////////////////////////////////////////////////////////////////////////////////////////////

test('Nexus operation converter failure is not retried', async (t) => {
const { createWorker, registerNexusEndpoint, taskQueue } = helpers(t);
const { endpointName } = await registerNexusEndpoint();

const worker = await createWorker({
dataConverter: { payloadConverterPath: require.resolve('./failing-payload-converter') },
nexusServices: [
nexus.serviceHandler(testService, {
async echoOp(_ctx, input) {
return input;
},
}),
],
});

await worker.runUntil(async () => {
const err = await t.throwsAsync(
() =>
t.context.env.client.workflow.execute(nexusEchoCaller, {
taskQueue,
workflowId: randomUUID(),
args: [endpointName],
}),
{
instanceOf: WorkflowFailedError,
}
);
t.true(err instanceof WorkflowFailedError);
t.true(err!.cause instanceof NexusOperationFailure);
const nexusFailure = err!.cause as NexusOperationFailure;
t.true(nexusFailure.cause instanceof nexus.HandlerError);
const handlerError = nexusFailure.cause as nexus.HandlerError;
t.is(handlerError.type, 'BAD_REQUEST');
t.false(handlerError.retryable);
t.regex(handlerError.message, /Intentional payload converter failure for testing/);
});
});
7 changes: 1 addition & 6 deletions packages/test/src/test-nexus-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,12 +407,7 @@ test('start Operation Handler errors', async (t) => {
t.is(res.status, 400);
const { message } = (await res.json()) as { message: string };
// Exact error message varies between Node versions and runtimes.
t.regex(
message,
isBun
? /Failed to deserialize input: SyntaxError: JSON Parse error:/
: /Failed to deserialize input: SyntaxError: Unexpected token .* JSON/
);
t.regex(message, isBun ? /JSON Parse error:/ : /Unexpected token .* JSON/);
}
});
});
Expand Down
12 changes: 10 additions & 2 deletions packages/worker/src/nexus/conversions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ export async function decodePayloadIntoLazyValue(
try {
decoded = await decodeOptionalSingle(dataConverter.payloadCodecs, payload);
} catch (err) {
throw new nexus.HandlerError('BAD_REQUEST', `Failed to decode payload: ${err}`);
if (err instanceof ApplicationFailure) {
throw err;
}
throw new nexus.HandlerError('INTERNAL', `Payload codec failed to decode Nexus operation input`, { cause: err });
}

// Nexus headers have string values and Temporal Payloads have binary values. Instead of
Expand All @@ -47,7 +50,12 @@ class PayloadSerializer implements nexus.Serializer {
try {
return this.payloadConverter.fromPayload(this.payload);
} catch (err) {
throw new nexus.HandlerError('BAD_REQUEST', `Failed to deserialize input: ${err}`);
if (err instanceof ApplicationFailure) {
throw err;
}
throw new nexus.HandlerError('BAD_REQUEST', `Payload converter failed to decode Nexus operation input`, {
cause: err,
});
}
}

Expand Down