diff --git a/packages/client/src/async-completion-client.ts b/packages/client/src/async-completion-client.ts index 36cd894b0..644b25082 100644 --- a/packages/client/src/async-completion-client.ts +++ b/packages/client/src/async-completion-client.ts @@ -1,5 +1,6 @@ import { status as grpcStatus } from '@grpc/grpc-js'; -import { ensureTemporalFailure } from '@temporalio/common'; +import { ensureTemporalFailure, LoadedDataConverter, SerializationContext } from '@temporalio/common'; +import { withSerializationContext } from '@temporalio/common/lib/converter/serialization-context'; import { encodeErrorToFailure, encodeToPayloads } from '@temporalio/common/lib/internal-non-workflow'; import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; @@ -81,6 +82,7 @@ export interface FullActivityId { */ export class AsyncCompletionClient extends BaseClient { public readonly options: LoadedAsyncCompletionClientOptions; + private context?: SerializationContext; constructor(options?: AsyncCompletionClientOptions) { super(options); @@ -91,6 +93,13 @@ export class AsyncCompletionClient extends BaseClient { }; } + /** + * Return the data converter bound to the current serialization context, if any. + */ + private get contextDataConverter(): LoadedDataConverter { + return this.context ? withSerializationContext(this.dataConverter, this.context) : this.dataConverter; + } + /** * Raw gRPC access to the Temporal service. * @@ -101,6 +110,20 @@ export class AsyncCompletionClient extends BaseClient { return this.connection.workflowService; } + /** + * Return a client instance with all serialization operations bound to `context`. + */ + withContext(context: SerializationContext): AsyncCompletionClient { + const client = new AsyncCompletionClient({ + connection: this.connection, + dataConverter: this.dataConverter, + identity: this.options.identity, + namespace: this.options.namespace, + }); + client.context = context; + return client; + } + /** * Transforms grpc errors into well defined TS errors. */ @@ -127,7 +150,8 @@ export class AsyncCompletionClient extends BaseClient { async complete(fullActivityId: FullActivityId, result: unknown): Promise; async complete(taskTokenOrFullActivityId: Uint8Array | FullActivityId, result: unknown): Promise { - const payloads = await encodeToPayloads(this.dataConverter, result); + const dataConverter = this.contextDataConverter; + const payloads = await encodeToPayloads(dataConverter, result); try { if (taskTokenOrFullActivityId instanceof Uint8Array) { await this.workflowService.respondActivityTaskCompleted({ @@ -159,7 +183,8 @@ export class AsyncCompletionClient extends BaseClient { async fail(fullActivityId: FullActivityId, err: unknown): Promise; async fail(taskTokenOrFullActivityId: Uint8Array | FullActivityId, err: unknown): Promise { - const failure = await encodeErrorToFailure(this.dataConverter, ensureTemporalFailure(err)); + const dataConverter = this.contextDataConverter; + const failure = await encodeErrorToFailure(dataConverter, ensureTemporalFailure(err)); try { if (taskTokenOrFullActivityId instanceof Uint8Array) { await this.workflowService.respondActivityTaskFailed({ @@ -191,7 +216,8 @@ export class AsyncCompletionClient extends BaseClient { reportCancellation(fullActivityId: FullActivityId, details?: unknown): Promise; async reportCancellation(taskTokenOrFullActivityId: Uint8Array | FullActivityId, details?: unknown): Promise { - const payloads = await encodeToPayloads(this.dataConverter, details); + const dataConverter = this.contextDataConverter; + const payloads = await encodeToPayloads(dataConverter, details); try { if (taskTokenOrFullActivityId instanceof Uint8Array) { await this.workflowService.respondActivityTaskCanceled({ @@ -223,7 +249,8 @@ export class AsyncCompletionClient extends BaseClient { heartbeat(fullActivityId: FullActivityId, details?: unknown): Promise; async heartbeat(taskTokenOrFullActivityId: Uint8Array | FullActivityId, details?: unknown): Promise { - const payloads = await encodeToPayloads(this.dataConverter, details); + const dataConverter = this.contextDataConverter; + const payloads = await encodeToPayloads(dataConverter, details); let cancelRequested = false; let paused = false; let reset = false; diff --git a/packages/client/src/schedule-client.ts b/packages/client/src/schedule-client.ts index 34ec5c64d..e1381c24b 100644 --- a/packages/client/src/schedule-client.ts +++ b/packages/client/src/schedule-client.ts @@ -234,7 +234,7 @@ export class ScheduleClient extends BaseClient { scheduleId: opts.scheduleId, schedule: { spec: encodeScheduleSpec(opts.spec), - action: await encodeScheduleAction(this.dataConverter, opts.action, headers), + action: await encodeScheduleAction(this.dataConverter, this.options.namespace, opts.action, headers), policies: encodeSchedulePolicies(opts.policies), state: encodeScheduleState(opts.state), }, @@ -298,7 +298,7 @@ export class ScheduleClient extends BaseClient { scheduleId, schedule: { spec: encodeScheduleSpec(opts.spec), - action: await encodeScheduleAction(this.dataConverter, opts.action, header), + action: await encodeScheduleAction(this.dataConverter, this.options.namespace, opts.action, header), policies: encodeSchedulePolicies(opts.policies), state: encodeScheduleState(opts.state), }, @@ -432,7 +432,11 @@ export class ScheduleClient extends BaseClient { return { scheduleId, spec: decodeScheduleSpec(raw.schedule.spec), - action: await decodeScheduleAction(this.client.dataConverter, raw.schedule.action), + action: await decodeScheduleAction( + this.client.dataConverter, + this.client.options.namespace, + raw.schedule.action + ), memo: await decodeMapFromPayloads(this.client.dataConverter, raw.memo?.fields), searchAttributes: decodeSearchAttributes(raw.searchAttributes?.indexedFields), typedSearchAttributes: decodeTypedSearchAttributes(raw.searchAttributes?.indexedFields), diff --git a/packages/client/src/schedule-helpers.ts b/packages/client/src/schedule-helpers.ts index e01b0e27b..a85d01e1b 100644 --- a/packages/client/src/schedule-helpers.ts +++ b/packages/client/src/schedule-helpers.ts @@ -1,5 +1,6 @@ import Long from 'long'; import { + WorkflowSerializationContext, compilePriority, compileRetryPolicy, decodePriority, @@ -7,6 +8,7 @@ import { extractWorkflowType, LoadedDataConverter, } from '@temporalio/common'; +import { withSerializationContext } from '@temporalio/common/lib/converter/serialization-context'; import { encodeUserMetadata, decodeUserMetadata } from '@temporalio/common/lib/internal-non-workflow/codec-helpers'; import { encodeUnifiedSearchAttributes, @@ -247,16 +249,24 @@ export function encodeScheduleSpec(spec: ScheduleSpec): temporal.api.schedule.v1 export async function encodeScheduleAction( dataConverter: LoadedDataConverter, + namespace: string, action: CompiledScheduleAction, headers: Headers ): Promise { + const context: WorkflowSerializationContext = { + type: 'workflow', + namespace, + workflowId: action.workflowId, + workflowType: action.workflowType, + }; + const contextDataConverter = withSerializationContext(dataConverter, context); return { startWorkflow: { workflowId: action.workflowId, workflowType: { name: action.workflowType, }, - input: { payloads: await encodeToPayloads(dataConverter, ...action.args) }, + input: { payloads: await encodeToPayloads(contextDataConverter, ...action.args) }, taskQueue: { kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL, name: action.taskQueue, @@ -265,7 +275,7 @@ export async function encodeScheduleAction( workflowRunTimeout: msOptionalToTs(action.workflowRunTimeout), workflowTaskTimeout: msOptionalToTs(action.workflowTaskTimeout), retryPolicy: action.retry ? compileRetryPolicy(action.retry) : undefined, - memo: action.memo ? { fields: await encodeMapToPayloads(dataConverter, action.memo) } : undefined, + memo: action.memo ? { fields: await encodeMapToPayloads(contextDataConverter, action.memo) } : undefined, searchAttributes: action.searchAttributes || action.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated ? { @@ -273,7 +283,7 @@ export async function encodeScheduleAction( } : undefined, header: { fields: headers }, - userMetadata: await encodeUserMetadata(dataConverter, action.staticSummary, action.staticDetails), + userMetadata: await encodeUserMetadata(contextDataConverter, action.staticSummary, action.staticDetails), priority: action.priority ? compilePriority(action.priority) : undefined, }, }; @@ -321,20 +331,32 @@ export function decodeScheduleSpec(pb: temporal.api.schedule.v1.IScheduleSpec): export async function decodeScheduleAction( dataConverter: LoadedDataConverter, + namespace: string, pb: temporal.api.schedule.v1.IScheduleAction ): Promise { if (pb.startWorkflow) { - const { staticSummary, staticDetails } = await decodeUserMetadata(dataConverter, pb.startWorkflow?.userMetadata); + const workflowId = pb.startWorkflow.workflowId!; + const workflowType = pb.startWorkflow.workflowType?.name ?? undefined; + const contextDataConverter = withSerializationContext(dataConverter, { + type: 'workflow', + namespace, + workflowId, + workflowType, + }); + const { staticSummary, staticDetails } = await decodeUserMetadata( + contextDataConverter, + pb.startWorkflow?.userMetadata + ); return { type: 'startWorkflow', - workflowId: pb.startWorkflow.workflowId!, + workflowId, workflowType: pb.startWorkflow.workflowType!.name!, taskQueue: pb.startWorkflow.taskQueue!.name!, - args: await decodeArrayFromPayloads(dataConverter, pb.startWorkflow.input?.payloads), - memo: await decodeMapFromPayloads(dataConverter, pb.startWorkflow.memo?.fields), + args: await decodeArrayFromPayloads(contextDataConverter, pb.startWorkflow.input?.payloads), + memo: await decodeMapFromPayloads(contextDataConverter, pb.startWorkflow.memo?.fields), retry: decompileRetryPolicy(pb.startWorkflow.retryPolicy), searchAttributes: decodeSearchAttributes(pb.startWorkflow.searchAttributes?.indexedFields), typedSearchAttributes: decodeTypedSearchAttributes(pb.startWorkflow.searchAttributes?.indexedFields), diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index d51f7b230..39603483d 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -24,6 +24,7 @@ import { WorkflowIdConflictPolicy, compilePriority, } from '@temporalio/common'; +import { withSerializationContext } from '@temporalio/common/lib/converter/serialization-context'; import { encodeUserMetadata } from '@temporalio/common/lib/internal-non-workflow/codec-helpers'; import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; @@ -789,6 +790,11 @@ export class WorkflowClient extends BaseClient { runId?: string, opts?: WorkflowResultOptions ): Promise> { + const dataConverter = withSerializationContext(this.dataConverter, { + type: 'workflow', + namespace: this.options.namespace, + workflowId, + }); const followRuns = opts?.followRuns ?? true; const execution: temporal.api.common.v1.IWorkflowExecution = { workflowId, runId }; const req: GetWorkflowExecutionHistoryRequest = { @@ -828,7 +834,7 @@ export class WorkflowClient extends BaseClient { // Note that we can only return one value from our workflow function in JS. // Ignore any other payloads in result const [result] = await decodeArrayFromPayloads( - this.dataConverter, + dataConverter, ev.workflowExecutionCompletedEventAttributes.result?.payloads ); return result as any; @@ -841,16 +847,13 @@ export class WorkflowClient extends BaseClient { const { failure, retryState } = ev.workflowExecutionFailedEventAttributes; throw new WorkflowFailedError( 'Workflow execution failed', - await decodeOptionalFailureToOptionalError(this.dataConverter, failure), + await decodeOptionalFailureToOptionalError(dataConverter, failure), decodeRetryState(retryState) ); } else if (ev.workflowExecutionCanceledEventAttributes) { const failure = new CancelledFailure( 'Workflow canceled', - await decodeArrayFromPayloads( - this.dataConverter, - ev.workflowExecutionCanceledEventAttributes.details?.payloads - ) + await decodeArrayFromPayloads(dataConverter, ev.workflowExecutionCanceledEventAttributes.details?.payloads) ); failure.stack = ''; throw new WorkflowFailedError('Workflow execution cancelled', failure, RetryState.NON_RETRYABLE_FAILURE); @@ -937,13 +940,18 @@ export class WorkflowClient extends BaseClient { * Used as the final function of the query interceptor chain */ protected async _queryWorkflowHandler(input: WorkflowQueryInput): Promise { + const dataConverter = withSerializationContext(this.dataConverter, { + type: 'workflow', + namespace: this.options.namespace, + workflowId: input.workflowExecution.workflowId!, + }); const req: temporal.api.workflowservice.v1.IQueryWorkflowRequest = { queryRejectCondition: input.queryRejectCondition, namespace: this.options.namespace, execution: input.workflowExecution, query: { queryType: input.queryType, - queryArgs: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) }, + queryArgs: { payloads: await encodeToPayloads(dataConverter, ...input.args) }, header: { fields: input.headers }, }, }; @@ -969,13 +977,18 @@ export class WorkflowClient extends BaseClient { throw new TypeError('Invalid response from server'); } // We ignore anything but the first result - return await decodeFromPayloadsAtIndex(this.dataConverter, 0, response.queryResult?.payloads); + return await decodeFromPayloadsAtIndex(dataConverter, 0, response.queryResult?.payloads); } protected async _createUpdateWorkflowRequest( lifecycleStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage, input: WorkflowStartUpdateInput ): Promise { + const dataConverter = withSerializationContext(this.dataConverter, { + type: 'workflow', + namespace: this.options.namespace, + workflowId: input.workflowExecution.workflowId!, + }); const updateId = input.options?.updateId ?? uuid4(); return { namespace: this.options.namespace, @@ -992,7 +1005,7 @@ export class WorkflowClient extends BaseClient { input: { header: { fields: input.headers }, name: input.updateName, - args: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) }, + args: { payloads: await encodeToPayloads(dataConverter, ...input.args) }, }, }, }; @@ -1136,6 +1149,11 @@ export class WorkflowClient extends BaseClient { workflowRunId?: string, outcome?: temporal.api.update.v1.IOutcome ): WorkflowUpdateHandle { + const dataConverter = withSerializationContext(this.dataConverter, { + type: 'workflow', + namespace: this.options.namespace, + workflowId, + }); return { updateId, workflowId, @@ -1146,10 +1164,10 @@ export class WorkflowClient extends BaseClient { if (completedOutcome.failure) { throw new WorkflowUpdateFailedError( 'Workflow Update failed', - await decodeOptionalFailureToOptionalError(this.dataConverter, completedOutcome.failure) + await decodeOptionalFailureToOptionalError(dataConverter, completedOutcome.failure) ); } else { - return await decodeFromPayloadsAtIndex(this.dataConverter, 0, completedOutcome.success?.payloads); + return await decodeFromPayloadsAtIndex(dataConverter, 0, completedOutcome.success?.payloads); } }, }; @@ -1178,8 +1196,7 @@ export class WorkflowClient extends BaseClient { return response.outcome; } } catch (err) { - const wE = typeof workflowExecution.workflowId === 'string' ? workflowExecution : undefined; - this.rethrowUpdateGrpcError(err, 'Workflow Update Poll failed', wE as WorkflowExecution); + this.rethrowUpdateGrpcError(err, 'Workflow Update Poll failed', workflowExecution as WorkflowExecution); } } } @@ -1190,6 +1207,11 @@ export class WorkflowClient extends BaseClient { * Used as the final function of the signal interceptor chain */ protected async _signalWorkflowHandler(input: WorkflowSignalInput): Promise { + const dataConverter = withSerializationContext(this.dataConverter, { + type: 'workflow', + namespace: this.options.namespace, + workflowId: input.workflowExecution.workflowId!, + }); const req: temporal.api.workflowservice.v1.ISignalWorkflowExecutionRequest = { identity: this.options.identity, namespace: this.options.namespace, @@ -1198,7 +1220,7 @@ export class WorkflowClient extends BaseClient { // control is unused, signalName: input.signalName, header: { fields: input.headers }, - input: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) }, + input: { payloads: await encodeToPayloads(dataConverter, ...input.args) }, }; try { await this.workflowService.signalWorkflowExecution(req); @@ -1215,6 +1237,12 @@ export class WorkflowClient extends BaseClient { protected async _signalWithStartWorkflowHandler(input: WorkflowSignalWithStartInput): Promise { const { identity } = this.options; const { options, workflowType, signalName, signalArgs, headers } = input; + const dataConverter = withSerializationContext(this.dataConverter, { + type: 'workflow', + namespace: this.options.namespace, + workflowId: options.workflowId, + workflowType, + }); const req: temporal.api.workflowservice.v1.ISignalWithStartWorkflowExecutionRequest = { namespace: this.options.namespace, identity, @@ -1223,9 +1251,9 @@ export class WorkflowClient extends BaseClient { workflowIdReusePolicy: encodeWorkflowIdReusePolicy(options.workflowIdReusePolicy), workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(options.workflowIdConflictPolicy), workflowType: { name: workflowType }, - input: { payloads: await encodeToPayloads(this.dataConverter, ...options.args) }, + input: { payloads: await encodeToPayloads(dataConverter, ...options.args) }, signalName, - signalInput: { payloads: await encodeToPayloads(this.dataConverter, ...signalArgs) }, + signalInput: { payloads: await encodeToPayloads(dataConverter, ...signalArgs) }, taskQueue: { kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL, name: options.taskQueue, @@ -1235,7 +1263,7 @@ export class WorkflowClient extends BaseClient { workflowTaskTimeout: options.workflowTaskTimeout, workflowStartDelay: options.startDelay, retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, - memo: options.memo ? { fields: await encodeMapToPayloads(this.dataConverter, options.memo) } : undefined, + memo: options.memo ? { fields: await encodeMapToPayloads(dataConverter, options.memo) } : undefined, searchAttributes: options.searchAttributes || options.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated ? { @@ -1244,7 +1272,7 @@ export class WorkflowClient extends BaseClient { : undefined, cronSchedule: options.cronSchedule, header: { fields: headers }, - userMetadata: await encodeUserMetadata(this.dataConverter, options.staticSummary, options.staticDetails), + userMetadata: await encodeUserMetadata(dataConverter, options.staticSummary, options.staticDetails), priority: options.priority ? compilePriority(options.priority) : undefined, versioningOverride: options.versioningOverride ?? undefined, }; @@ -1295,6 +1323,12 @@ export class WorkflowClient extends BaseClient { protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise { const { options: opts, workflowType, headers } = input; const { identity, namespace } = this.options; + const dataConverter = withSerializationContext(this.dataConverter, { + type: 'workflow', + namespace: this.options.namespace, + workflowId: opts.workflowId, + workflowType, + }); const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol]; const supportsEagerStart = (this.connection as InternalConnectionLike)?.[InternalConnectionLikeSymbol] ?.supportsEagerStart; @@ -1314,7 +1348,7 @@ export class WorkflowClient extends BaseClient { workflowIdReusePolicy: encodeWorkflowIdReusePolicy(opts.workflowIdReusePolicy), workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(opts.workflowIdConflictPolicy), workflowType: { name: workflowType }, - input: { payloads: await encodeToPayloads(this.dataConverter, ...opts.args) }, + input: { payloads: await encodeToPayloads(dataConverter, ...opts.args) }, taskQueue: { kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL, name: opts.taskQueue, @@ -1324,7 +1358,7 @@ export class WorkflowClient extends BaseClient { workflowTaskTimeout: opts.workflowTaskTimeout, workflowStartDelay: opts.startDelay, retryPolicy: opts.retry ? compileRetryPolicy(opts.retry) : undefined, - memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined, + memo: opts.memo ? { fields: await encodeMapToPayloads(dataConverter, opts.memo) } : undefined, searchAttributes: opts.searchAttributes || opts.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated ? { @@ -1333,7 +1367,7 @@ export class WorkflowClient extends BaseClient { : undefined, cronSchedule: opts.cronSchedule, header: { fields: headers }, - userMetadata: await encodeUserMetadata(this.dataConverter, opts.staticSummary, opts.staticDetails), + userMetadata: await encodeUserMetadata(dataConverter, opts.staticSummary, opts.staticDetails), priority: opts.priority ? compilePriority(opts.priority) : undefined, versioningOverride: opts.versioningOverride ?? undefined, requestEagerExecution: opts.requestEagerStart, @@ -1349,12 +1383,17 @@ export class WorkflowClient extends BaseClient { protected async _terminateWorkflowHandler( input: WorkflowTerminateInput ): Promise { + const dataConverter = withSerializationContext(this.dataConverter, { + type: 'workflow', + namespace: this.options.namespace, + workflowId: input.workflowExecution.workflowId!, + }); const req: temporal.api.workflowservice.v1.ITerminateWorkflowExecutionRequest = { namespace: this.options.namespace, identity: this.options.identity, ...input, details: { - payloads: input.details ? await encodeToPayloads(this.dataConverter, ...input.details) : undefined, + payloads: input.details ? await encodeToPayloads(dataConverter, ...input.details) : undefined, }, firstExecutionRunId: input.firstExecutionRunId, }; @@ -1470,14 +1509,19 @@ export class WorkflowClient extends BaseClient { const raw = await fn({ workflowExecution: { workflowId, runId }, }); - const info = await executionInfoFromRaw(raw.workflowExecutionInfo ?? {}, this.client.dataConverter, raw); + const dataConverter = withSerializationContext(this.client.dataConverter, { + type: 'workflow', + namespace: this.client.options.namespace, + workflowId, + }); + const info = await executionInfoFromRaw(raw.workflowExecutionInfo ?? {}, dataConverter, raw); const userMetadata = raw.executionConfig?.userMetadata; return { ...info, staticDetails: async () => - (await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.details)) ?? undefined, + (await decodeOptionalSinglePayload(dataConverter, userMetadata?.details)) ?? undefined, staticSummary: async () => - (await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.summary)) ?? undefined, + (await decodeOptionalSinglePayload(dataConverter, userMetadata?.summary)) ?? undefined, raw, }; }, @@ -1593,7 +1637,13 @@ export class WorkflowClient extends BaseClient { // Decoding is done for `memo` fields which tend to be small. // We might decide to change that based on user feedback. for (const raw of response.executions) { - yield await executionInfoFromRaw(raw, this.dataConverter, raw); + const dataConverter = withSerializationContext(this.dataConverter, { + type: 'workflow', + namespace: this.options.namespace, + workflowId: raw.execution!.workflowId!, + workflowType: raw.type?.name ?? undefined, + }); + yield await executionInfoFromRaw(raw, dataConverter, raw); } nextPageToken = response.nextPageToken; if (nextPageToken == null || nextPageToken.length === 0) break; diff --git a/packages/common/src/converter/failure-converter.ts b/packages/common/src/converter/failure-converter.ts index dc82aede5..7a6901a80 100644 --- a/packages/common/src/converter/failure-converter.ts +++ b/packages/common/src/converter/failure-converter.ts @@ -24,6 +24,7 @@ import { makeProtoEnumConverters } from '../internal-workflow'; import { isError } from '../type-helpers'; import { msOptionalToTs } from '../time'; import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from './payload-converter'; +import type { SerializationContext } from './serialization-context'; // Can't import proto enums into the workflow sandbox, use this helper type and enum converter instead. const NexusHandlerErrorRetryBehavior = { @@ -108,6 +109,11 @@ export interface FailureConverter { * The returned error must be an instance of `TemporalFailure`. */ failureToError(err: ProtoFailure, payloadConverter: PayloadConverter): Error; + + /** + * Optionally return a context-bound converter for a specific serialization operation. + */ + withContext?(context: SerializationContext): FailureConverter; } /** diff --git a/packages/common/src/converter/payload-codec.ts b/packages/common/src/converter/payload-codec.ts index 8562fa468..8fd01b3e3 100644 --- a/packages/common/src/converter/payload-codec.ts +++ b/packages/common/src/converter/payload-codec.ts @@ -1,4 +1,5 @@ import { Payload } from '../interfaces'; +import type { SerializationContext } from './serialization-context'; /** * `PayloadCodec` is an optional step that happens between the wire and the {@link PayloadConverter}: @@ -19,4 +20,9 @@ export interface PayloadCodec { * Decode an array of {@link Payload}s received from the wire. */ decode(payloads: Payload[]): Promise; + + /** + * Optionally return a context-bound codec for a specific serialization operation. + */ + withContext?(context: SerializationContext): PayloadCodec; } diff --git a/packages/common/src/converter/payload-converter.ts b/packages/common/src/converter/payload-converter.ts index ba8b7dce7..41eb98e6d 100644 --- a/packages/common/src/converter/payload-converter.ts +++ b/packages/common/src/converter/payload-converter.ts @@ -1,6 +1,7 @@ import { decode, encode } from '../encoding'; import { PayloadConverterError, ValueError } from '../errors'; import { Payload } from '../interfaces'; +import type { SerializationContext } from './serialization-context'; import { encodingKeys, encodingTypes, METADATA_ENCODING_KEY } from './types'; /** @@ -25,6 +26,11 @@ export interface PayloadConverter { * Converts a {@link Payload} back to a value. */ fromPayload(payload: Payload): T; + + /** + * Optionally return a context-bound converter for a specific serialization operation. + */ + withContext?(context: SerializationContext): PayloadConverter; } /** diff --git a/packages/common/src/converter/serialization-context.ts b/packages/common/src/converter/serialization-context.ts new file mode 100644 index 000000000..15df03c17 --- /dev/null +++ b/packages/common/src/converter/serialization-context.ts @@ -0,0 +1,115 @@ +import type { LoadedDataConverter } from './data-converter'; +import type { FailureConverter } from './failure-converter'; +import type { PayloadCodec } from './payload-codec'; +import type { PayloadConverter } from './payload-converter'; + +/** + * Context for workflow-level serialization operations. + */ +export interface WorkflowSerializationContext { + /** Discriminant for narrowing the {@link SerializationContext} union. */ + type: 'workflow'; + /** Namespace of the workflow. */ + namespace: string; + /** + * ID of the workflow that owns the payload being serialized. + * + * When creating/describing schedules, this may be the workflow ID prefix + * as configured, not the final workflow ID when the workflow is created. + */ + workflowId: string; + /** Workflow type name, when available. */ + workflowType?: string; +} + +/** + * Context for activity-level serialization operations. + */ +export interface ActivitySerializationContext { + /** Discriminant for narrowing the {@link SerializationContext} union. */ + type: 'activity'; + /** Namespace of the activity. */ + namespace: string; + /** Activity ID for this execution when provided by the activity context source. */ + activityId?: string; + /** Parent workflow ID when this activity is associated with a workflow. */ + workflowId?: string; + /** Parent workflow type when this activity is associated with a workflow. */ + workflowType?: string; + /** Whether the activity is a local activity started from a workflow. */ + isLocal: boolean; +} + +/** + * Context passed to data conversion interfaces so they can adjust serialization behavior. + */ +export type SerializationContext = WorkflowSerializationContext | ActivitySerializationContext; + +/** + * Return a payload converter bound to `context` if the converter supports context binding. + */ +export function withPayloadConverterContext( + converter: PayloadConverter, + context: SerializationContext +): PayloadConverter { + return converter.withContext?.(context) ?? converter; +} + +/** + * Return a failure converter bound to `context` if the converter supports context binding. + */ +export function withFailureConverterContext( + converter: FailureConverter, + context: SerializationContext +): FailureConverter { + return converter.withContext?.(context) ?? converter; +} + +/** + * Return a payload codec bound to `context` if the codec supports context binding. + */ +export function withPayloadCodecContext(codec: PayloadCodec, context: SerializationContext): PayloadCodec { + return codec.withContext?.(context) ?? codec; +} + +/** + * Return a loaded data converter where all components are context-bound when supported. + * + * **Internal — worker main thread only.** Do not call from within the workflow sandbox; + * doing so would pull unnecessary code into the workflow bundle, increasing memory footprint. + * Inside the sandbox, use the individual `withPayloadConverterContext` / `withFailureConverterContext` + * helpers instead. + * + * Uses identity (`===`) checks to skip allocation when no converter actually changed. + * A `withContext` implementation that returns a new object with identical behaviour will + * defeat this optimisation but is otherwise harmless. + */ +// ts-prune-ignore-next (imported via lib/converter/serialization-context) +export function withSerializationContext( + converter: LoadedDataConverter, + context: SerializationContext +): LoadedDataConverter { + const payloadConverter = withPayloadConverterContext(converter.payloadConverter, context); + const failureConverter = withFailureConverterContext(converter.failureConverter, context); + let codecsChanged = false; + const maybeBoundCodecs = converter.payloadCodecs.map((codec) => { + const maybeContextCodec = withPayloadCodecContext(codec, context); + if (maybeContextCodec !== codec) { + codecsChanged = true; + } + return maybeContextCodec; + }); + + if ( + payloadConverter === converter.payloadConverter && + failureConverter === converter.failureConverter && + !codecsChanged + ) { + return converter; + } + return { + payloadConverter, + failureConverter, + payloadCodecs: codecsChanged ? maybeBoundCodecs : converter.payloadCodecs, + }; +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index bc1de0e45..440644724 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -13,6 +13,11 @@ export * from './converter/data-converter'; export * from './converter/failure-converter'; export * from './converter/payload-codec'; export * from './converter/payload-converter'; +export type { + ActivitySerializationContext, + SerializationContext, + WorkflowSerializationContext, +} from './converter/serialization-context'; export * from './converter/types'; export * from './deprecated-time'; export * from './errors'; diff --git a/packages/test/src/activities/index.ts b/packages/test/src/activities/index.ts index 4a9349ab0..6db18c08a 100644 --- a/packages/test/src/activities/index.ts +++ b/packages/test/src/activities/index.ts @@ -20,6 +20,10 @@ export async function echo(message?: string): Promise { return message; } +export async function echoTracer(value: T): Promise { + return value; +} + export async function httpGet(url: string): Promise { return `hello from ${url}`; } diff --git a/packages/test/src/payload-converters/serialization-context-converter.ts b/packages/test/src/payload-converters/serialization-context-converter.ts new file mode 100644 index 000000000..93ae4416b --- /dev/null +++ b/packages/test/src/payload-converters/serialization-context-converter.ts @@ -0,0 +1,107 @@ +import { + ActivitySerializationContext, + DefaultFailureConverter, + FailureConverter, + Payload, + PayloadConverter, + ProtoFailure, + SerializationContext, + WorkflowSerializationContext, + defaultPayloadConverter, +} from '@temporalio/common'; + +export interface Tracer { + __tracer: string; + trace: string[]; +} + +export function makeTracer(name: string): Tracer { + return { __tracer: name, trace: [] }; +} + +export function isTracer(value: unknown): value is Tracer { + return ( + typeof value === 'object' && + value !== null && + '__tracer' in value && + typeof (value as any).__tracer === 'string' && + Array.isArray((value as any).trace) + ); +} + +export function workflowContextTag(context: WorkflowSerializationContext): string { + return `workflow:${context.namespace}:${context.workflowId}`; +} + +export function activityContextTag(context: ActivitySerializationContext): string { + return `activity:${context.namespace}:${context.workflowId ?? 'none'}:${context.activityId ?? 'none'}:${ + context.isLocal + }`; +} + +function contextTag(context: SerializationContext | undefined): string { + if (context == null) { + return 'none'; + } + if (context.type === 'activity') { + return activityContextTag(context); + } + return workflowContextTag(context); +} + +function appendTraceStep(value: unknown, step: string): unknown { + if (!isTracer(value)) { + return value; + } + return { + __tracer: value.__tracer, + trace: [...value.trace, step], + } satisfies Tracer; +} + +class ContextAwarePayloadConverter implements PayloadConverter { + constructor(private readonly context?: SerializationContext) {} + + toPayload(value: unknown): Payload { + return defaultPayloadConverter.toPayload(appendTraceStep(value, `to:${contextTag(this.context)}`)); + } + + fromPayload(payload: Payload): T { + const decoded = defaultPayloadConverter.fromPayload(payload); + return appendTraceStep(decoded, `from:${contextTag(this.context)}`) as T; + } + + withContext(context: SerializationContext): PayloadConverter { + if (this.context === context) { + return this; + } + return new ContextAwarePayloadConverter(context); + } +} + +const defaultFailureConverter = new DefaultFailureConverter(); + +class ContextAwareFailureConverter implements FailureConverter { + constructor(private readonly context?: SerializationContext) {} + + errorToFailure(err: unknown, payloadConverter: PayloadConverter): ProtoFailure { + const failure = defaultFailureConverter.errorToFailure(err, payloadConverter); + return { ...failure, message: `${failure.message ?? ''}|errorToFailure:${contextTag(this.context)}` }; + } + + failureToError(failure: ProtoFailure, payloadConverter: PayloadConverter): Error { + const err = defaultFailureConverter.failureToError(failure, payloadConverter); + err.message = `${err.message}|failureToError:${contextTag(this.context)}`; + return err; + } + + withContext(context: SerializationContext): FailureConverter { + if (this.context === context) { + return this; + } + return new ContextAwareFailureConverter(context); + } +} + +export const payloadConverter: PayloadConverter = new ContextAwarePayloadConverter(); +export const failureConverter: FailureConverter = new ContextAwareFailureConverter(); diff --git a/packages/test/src/test-serialization-context.ts b/packages/test/src/test-serialization-context.ts new file mode 100644 index 000000000..275396413 --- /dev/null +++ b/packages/test/src/test-serialization-context.ts @@ -0,0 +1,338 @@ +import { randomUUID } from 'node:crypto'; +import anyTest, { TestFn } from 'ava'; +import { filter } from 'rxjs/operators'; +import { firstValueFrom, Observable, Subject } from 'rxjs'; +import { Client } from '@temporalio/client'; +import { Info } from '@temporalio/activity'; +import { DataConverter } from '@temporalio/common'; +import { RUN_INTEGRATION_TESTS, TestWorkflowEnvironment, Worker, bundlerOptions } from './helpers'; +import * as activities from './activities'; +import { createActivities } from './activities/async-completer'; +import { + Tracer, + activityContextTag, + makeTracer, + workflowContextTag, +} from './payload-converters/serialization-context-converter'; +import { + serializationContextAsyncCompletionWorkflow, + serializationContextFinishSignal, + serializationContextQuery, + serializationContextUpdate, + serializationContextWorkflow, +} from './workflows/serialization-context'; + +interface IntegrationContext { + env: TestWorkflowEnvironment; +} + +const integrationTest = anyTest as TestFn; + +if (RUN_INTEGRATION_TESTS) { + integrationTest.before(async (t) => { + t.context = { + env: await TestWorkflowEnvironment.createLocal(), + }; + }); + + integrationTest.after.always(async (t) => { + await (t.context as Partial).env?.teardown?.(); + }); + + integrationTest.serial('consolidated workflow path uses expected serialization contexts', async (t) => { + const converterPath = require.resolve('./payload-converters/serialization-context-converter'); + const dataConverter: DataConverter = { + payloadConverterPath: converterPath, + failureConverterPath: converterPath, + }; + const taskQueue = `serialization-context-${randomUUID()}`; + const client = new Client({ + connection: t.context.env.connection, + dataConverter, + }); + + const worker = await Worker.create({ + connection: t.context.env.nativeConnection, + workflowsPath: require.resolve('./workflows'), + activities, + taskQueue, + dataConverter, + bundlerOptions, + }); + + const namespace = client.options.namespace; + const workflowId = randomUUID(); + const childWorkflowId = `child-${randomUUID()}`; + const signalTargetWorkflowId = `missing-signal-${randomUUID()}`; + const cancelTargetWorkflowId = `missing-cancel-${randomUUID()}`; + const workflowTag = workflowContextTag({ type: 'workflow', namespace, workflowId }); + const childWorkflowTag = workflowContextTag({ type: 'workflow', namespace, workflowId: childWorkflowId }); + const signalWorkflowTag = workflowContextTag({ type: 'workflow', namespace, workflowId: signalTargetWorkflowId }); + const cancelWorkflowTag = workflowContextTag({ type: 'workflow', namespace, workflowId: cancelTargetWorkflowId }); + + const { result, queryResult, updateResult } = await worker.runUntil(async () => { + const handle = await client.workflow.start(serializationContextWorkflow, { + workflowId, + taskQueue, + args: [makeTracer('start-input'), childWorkflowId, signalTargetWorkflowId, cancelTargetWorkflowId], + }); + + const queryResult = await handle.query(serializationContextQuery, makeTracer('query-input')); + const updateResult = await handle.executeUpdate(serializationContextUpdate, { + args: [makeTracer('update-input')], + }); + await handle.signal(serializationContextFinishSignal, makeTracer('finish-signal-input')); + + const result = await handle.result(); + return { result, queryResult, updateResult }; + }); + + // Workflow activities default activityId to seq, and this workflow schedules a single activity first. + const activityTag = activityContextTag({ + type: 'activity', + namespace, + workflowId, + activityId: '1', + isLocal: false, + }); + // Local activities share the activity seq counter, so the local activity (scheduled second) gets activityId '2'. + const localActivityTag = activityContextTag({ + type: 'activity', + namespace, + workflowId, + activityId: '2', + isLocal: true, + }); + + // Start input: client encodes → workflow decodes + t.deepEqual(result.startInput.trace, [`to:${workflowTag}`, `from:${workflowTag}`]); + + // Activity result: workflow encodes args → worker decodes args → worker encodes result → workflow decodes result + t.deepEqual(result.activityResult.trace, [ + `to:${activityTag}`, + `from:${activityTag}`, + `to:${activityTag}`, + `from:${activityTag}`, + ]); + + // Local activity result: same round-trip but with isLocal=true context + t.deepEqual(result.localActivityResult.trace, [ + `to:${localActivityTag}`, + `from:${localActivityTag}`, + `to:${localActivityTag}`, + `from:${localActivityTag}`, + ]); + + // Child workflow result: parent encodes input → child decodes args → child encodes return → parent decodes result + t.deepEqual(result.childResult.trace, [ + `to:${childWorkflowTag}`, + `from:${childWorkflowTag}`, + `to:${childWorkflowTag}`, + `from:${childWorkflowTag}`, + ]); + + // Query input: client encodes → workflow decodes + t.deepEqual(result.seenQueryInput.trace, [`to:${workflowTag}`, `from:${workflowTag}`]); + + // Query result: workflow encodes response → client decodes (round trip through codec runner) + t.deepEqual(queryResult.trace, [ + `to:${workflowTag}`, + `from:${workflowTag}`, + `to:${workflowTag}`, + `from:${workflowTag}`, + ]); + + // Update input: client encodes → workflow decodes + t.deepEqual(result.seenUpdateInput.trace, [`to:${workflowTag}`, `from:${workflowTag}`]); + + // Update result: workflow encodes response → client decodes + t.deepEqual(updateResult.trace, [ + `to:${workflowTag}`, + `from:${workflowTag}`, + `to:${workflowTag}`, + `from:${workflowTag}`, + ]); + + // Signal input: client encodes → workflow decodes + t.deepEqual(result.seenSignalInput.trace, [`to:${workflowTag}`, `from:${workflowTag}`]); + + // External signal failure: error message contains target workflow's context tag + t.true( + result.signalExternalErrorMessage.includes(`failureToError:${signalWorkflowTag}`), + `signalExternal error missing context tag: ${result.signalExternalErrorMessage}` + ); + + // External cancel failure: error message contains target workflow's context tag + t.true( + result.cancelExternalErrorMessage.includes(`failureToError:${cancelWorkflowTag}`), + `cancelExternal error missing context tag: ${result.cancelExternalErrorMessage}` + ); + }); + + integrationTest.serial('schedule action conversions use target workflow serialization context', async (t) => { + const converterPath = require.resolve('./payload-converters/serialization-context-converter'); + const dataConverter: DataConverter = { + payloadConverterPath: converterPath, + failureConverterPath: converterPath, + }; + + const client = new Client({ connection: t.context.env.connection, dataConverter }); + const namespace = client.options.namespace; + const scheduleId = `serialization-context-schedule-${randomUUID()}`; + const actionWorkflowId = `scheduled-${randomUUID()}`; + const workflowTag = workflowContextTag({ type: 'workflow', namespace, workflowId: actionWorkflowId }); + + const handle = await client.schedule.create({ + scheduleId, + spec: { + intervals: [{ every: '1h' }], + }, + action: { + type: 'startWorkflow', + workflowType: async (_arg: Tracer) => undefined, + workflowId: actionWorkflowId, + taskQueue: `serialization-context-schedule-${randomUUID()}`, + args: [makeTracer('schedule-arg')], + memo: { + tracer: makeTracer('schedule-memo'), + }, + }, + }); + + try { + const described = await handle.describe(); + const argTracer = described.action.type === 'startWorkflow' ? (described.action.args?.[0] as Tracer) : undefined; + const memoTracer = + described.action.type === 'startWorkflow' ? (described.action.memo?.tracer as Tracer) ?? undefined : undefined; + + t.truthy(argTracer); + t.truthy(memoTracer); + // Schedule arg/memo: client encodes with target workflow context → client decodes on describe + t.deepEqual(argTracer!.trace, [`to:${workflowTag}`, `from:${workflowTag}`]); + t.deepEqual(memoTracer!.trace, [`to:${workflowTag}`, `from:${workflowTag}`]); + } finally { + await handle.delete(); + } + }); + + integrationTest.serial('async completion uses withContext binding for serialization context', async (t) => { + const converterPath = require.resolve('./payload-converters/serialization-context-converter'); + const dataConverter: DataConverter = { + payloadConverterPath: converterPath, + failureConverterPath: converterPath, + }; + + const infoSubject = new Subject(); + const taskQueue = `serialization-context-async-${randomUUID()}`; + const worker = await Worker.create({ + connection: t.context.env.nativeConnection, + workflowsPath: require.resolve('./workflows'), + activities: createActivities(infoSubject), + taskQueue, + dataConverter, + bundlerOptions, + }); + const runPromise = worker.run(); + runPromise.catch(() => undefined); + + const client = new Client({ + connection: t.context.env.connection, + dataConverter, + }); + const namespace = client.options.namespace; + + async function activityStarted(workflowId: string): Promise { + return await firstValueFrom( + (infoSubject as Observable).pipe(filter((info) => info.workflowExecution.workflowId === workflowId)) + ); + } + + try { + const workflowIdById = `async-by-id-${randomUUID()}`; + const byIdHandle = await client.workflow.start(serializationContextAsyncCompletionWorkflow, { + workflowId: workflowIdById, + taskQueue, + }); + const byIdInfo = await activityStarted(workflowIdById); + await client.activity.complete( + { workflowId: workflowIdById, activityId: byIdInfo.activityId }, + makeTracer('async-by-id') + ); + const byIdResult = (await byIdHandle.result()) as Tracer; + const byIdActivityTag = activityContextTag({ + type: 'activity', + namespace, + workflowId: workflowIdById, + activityId: byIdInfo.activityId, + isLocal: false, + }); + const byIdWorkflowTag = workflowContextTag({ type: 'workflow', namespace, workflowId: workflowIdById }); + // By-ID without withContext(): client encodes with no context → worker decodes with activity context → + // workflow encodes return → client decodes + t.deepEqual(byIdResult.trace, [ + `to:none`, + `from:${byIdActivityTag}`, + `to:${byIdWorkflowTag}`, + `from:${byIdWorkflowTag}`, + ]); + + const workflowIdBoundToken = `async-bound-token-${randomUUID()}`; + const boundHandle = await client.workflow.start(serializationContextAsyncCompletionWorkflow, { + workflowId: workflowIdBoundToken, + taskQueue, + }); + const boundInfo = await activityStarted(workflowIdBoundToken); + await client.activity + .withContext({ type: 'workflow', namespace, workflowId: workflowIdBoundToken }) + .complete(boundInfo.taskToken, makeTracer('async-bound-token')); + const boundResult = (await boundHandle.result()) as Tracer; + const boundActivityTag = activityContextTag({ + type: 'activity', + namespace, + workflowId: workflowIdBoundToken, + activityId: boundInfo.activityId, + isLocal: false, + }); + const boundWorkflowTag = workflowContextTag({ type: 'workflow', namespace, workflowId: workflowIdBoundToken }); + // Bound token with withContext(): client encodes with workflow context → worker decodes with activity context → + // workflow encodes return → client decodes + t.deepEqual(boundResult.trace, [ + `to:${boundWorkflowTag}`, + `from:${boundActivityTag}`, + `to:${boundWorkflowTag}`, + `from:${boundWorkflowTag}`, + ]); + + const workflowIdUnboundToken = `async-unbound-token-${randomUUID()}`; + const unboundHandle = await client.workflow.start(serializationContextAsyncCompletionWorkflow, { + workflowId: workflowIdUnboundToken, + taskQueue, + }); + const unboundInfo = await activityStarted(workflowIdUnboundToken); + await client.activity.complete(unboundInfo.taskToken, makeTracer('async-unbound-token')); + const unboundResult = (await unboundHandle.result()) as Tracer; + const unboundActivityTag = activityContextTag({ + type: 'activity', + namespace, + workflowId: workflowIdUnboundToken, + activityId: unboundInfo.activityId, + isLocal: false, + }); + const unboundWorkflowTag = workflowContextTag({ + type: 'workflow', + namespace, + workflowId: workflowIdUnboundToken, + }); + // Unbound token without withContext(): same as by-ID — no client context + t.deepEqual(unboundResult.trace, [ + `to:none`, + `from:${unboundActivityTag}`, + `to:${unboundWorkflowTag}`, + `from:${unboundWorkflowTag}`, + ]); + } finally { + worker.shutdown(); + await runPromise; + } + }); +} diff --git a/packages/test/src/test-workflow-codec-runner.ts b/packages/test/src/test-workflow-codec-runner.ts new file mode 100644 index 000000000..1dd1bd591 --- /dev/null +++ b/packages/test/src/test-workflow-codec-runner.ts @@ -0,0 +1,75 @@ +import test from 'ava'; +import { coresdk } from '@temporalio/proto'; +import { Payload, PayloadCodec, SerializationContext, defaultPayloadConverter } from '@temporalio/common'; +import { WorkflowCodecRunner } from '@temporalio/worker/lib/workflow-codec-runner'; + +class ContextTrackingCodec implements PayloadCodec { + readonly calls: { op: 'encode' | 'decode'; context: SerializationContext | undefined }[]; + private readonly context?: SerializationContext; + + constructor( + context?: SerializationContext, + calls?: { op: 'encode' | 'decode'; context: SerializationContext | undefined }[] + ) { + this.context = context; + this.calls = calls ?? []; + } + + withContext(context: SerializationContext): PayloadCodec { + return new ContextTrackingCodec(context, this.calls); + } + + async encode(payloads: Payload[]): Promise { + this.calls.push({ op: 'encode', context: this.context }); + return payloads; + } + + async decode(payloads: Payload[]): Promise { + this.calls.push({ op: 'decode', context: this.context }); + return payloads; + } +} + +async function initRunner(codec: ContextTrackingCodec): Promise { + const runner = new WorkflowCodecRunner([codec], 'default', 'test-task-queue'); + await runner.decodeActivation({ + runId: 'test-run-id', + jobs: [{ initializeWorkflow: { workflowId: 'wf-id', workflowType: 'TestWorkflow', arguments: [] } }], + } satisfies coresdk.workflow_activation.IWorkflowActivation); + codec.calls.length = 0; // Clear init calls + return runner; +} + +function getRunState(runner: WorkflowCodecRunner): any { + return (runner as any).serializationContextsByRunId.get('test-run-id'); +} + +function defaultPayload(value: unknown): Payload { + const payload = defaultPayloadConverter.toPayload(value); + if (payload == null) { + throw new TypeError('Failed to encode payload'); + } + return payload; +} + +// Guard against seq map memory leaks: forgetRun must clear all tracked state for a workflow run. +test('forgetRun clears all state for a run', async (t) => { + const codec = new ContextTrackingCodec(); + const runner = await initRunner(codec); + + // Schedule an activity to populate seq state + await runner.encodeCompletion({ + runId: 'test-run-id', + successful: { + commands: [ + { scheduleActivity: { seq: 1, activityType: 'act', taskQueue: 'q', arguments: [defaultPayload('x')] } }, + ], + }, + } satisfies coresdk.workflow_completion.IWorkflowActivationCompletion); + + t.truthy(getRunState(runner)); + + runner.forgetRun('test-run-id'); + + t.is(getRunState(runner), undefined); +}); diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index 85c42ea45..5b3aad903 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -64,6 +64,7 @@ export * from './reusable-vm-disposal-bug'; export * from './run-activity-in-different-task-queue'; export * from './scope-cancelled-while-waiting-on-external-workflow-cancellation'; export * from './set-timeout-after-microtasks'; +export * from './serialization-context'; export * from './shared-cancellation-scopes'; export * from './noncancellable-awaited-in-root-scope'; export * from './noncancellable-in-noncancellable'; diff --git a/packages/test/src/workflows/serialization-context.ts b/packages/test/src/workflows/serialization-context.ts new file mode 100644 index 000000000..b3721af05 --- /dev/null +++ b/packages/test/src/workflows/serialization-context.ts @@ -0,0 +1,120 @@ +import { + ActivityCancellationType, + condition, + defineQuery, + defineSignal, + defineUpdate, + executeChild, + getExternalWorkflowHandle, + proxyActivities, + proxyLocalActivities, + setHandler, +} from '@temporalio/workflow'; + +// Intentionally duplicated from the test-side converter fixture because workflow isolate code cannot import Node-side modules. +export interface Tracer { + __tracer: string; + trace: string[]; +} + +export interface SerializationContextWorkflowResult { + startInput: Tracer; + activityResult: Tracer; + localActivityResult: Tracer; + childResult: Tracer; + seenQueryInput: Tracer; + seenUpdateInput: Tracer; + seenSignalInput: Tracer; + signalExternalErrorMessage: string; + cancelExternalErrorMessage: string; +} + +export const serializationContextQuery = defineQuery('serializationContextQuery'); +export const serializationContextUpdate = defineUpdate('serializationContextUpdate'); +export const serializationContextFinishSignal = defineSignal<[Tracer]>('serializationContextFinishSignal'); + +const { echoTracer, completeAsync } = proxyActivities<{ + echoTracer(value: Tracer): Promise; + completeAsync(): Promise; +}>({ + startToCloseTimeout: '1m', + scheduleToCloseTimeout: '30m', + retry: { maximumAttempts: 1 }, + cancellationType: ActivityCancellationType.WAIT_CANCELLATION_COMPLETED, +}); + +const { echoTracer: localEchoTracer } = proxyLocalActivities<{ + echoTracer(value: Tracer): Promise; +}>({ + startToCloseTimeout: '1m', + retry: { maximumAttempts: 1 }, +}); + +export async function serializationContextChildWorkflow(input: Tracer): Promise { + return input; +} + +export async function serializationContextWorkflow( + startInput: Tracer, + childWorkflowId: string, + externalSignalWorkflowId: string, + externalCancelWorkflowId: string +): Promise { + let seenQueryInput: Tracer | undefined; + let seenUpdateInput: Tracer | undefined; + let seenSignalInput: Tracer | undefined; + + setHandler(serializationContextQuery, (input) => { + seenQueryInput = input; + return input; + }); + setHandler(serializationContextUpdate, (input) => { + seenUpdateInput = input; + return input; + }); + setHandler(serializationContextFinishSignal, (input) => { + seenSignalInput = input; + }); + + const activityResult = await echoTracer({ __tracer: 'activity', trace: [] }); + const localActivityResult = await localEchoTracer({ __tracer: 'local-activity', trace: [] }); + const childResult = await executeChild(serializationContextChildWorkflow, { + workflowId: childWorkflowId, + args: [{ __tracer: 'child', trace: [] }], + }); + + let signalExternalErrorMessage = ''; + try { + await getExternalWorkflowHandle(externalSignalWorkflowId).signal('serializationContextMissingSignal', { + __tracer: 'external-signal', + trace: [], + }); + } catch (err) { + signalExternalErrorMessage = (err as Error).message; + } + + let cancelExternalErrorMessage = ''; + try { + await getExternalWorkflowHandle(externalCancelWorkflowId).cancel(); + } catch (err) { + cancelExternalErrorMessage = (err as Error).message; + } + + await condition(() => seenSignalInput !== undefined); + + return { + startInput, + activityResult, + localActivityResult, + childResult, + seenQueryInput: seenQueryInput!, + seenUpdateInput: seenUpdateInput!, + seenSignalInput: seenSignalInput!, + signalExternalErrorMessage, + cancelExternalErrorMessage, + }; +} + +export async function serializationContextAsyncCompletionWorkflow(): Promise { + return await completeAsync(); +} diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 9edc50344..5717a60bc 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -36,7 +36,9 @@ import { CancelledFailure, MetricMeter, ActivityCancellationDetails, + ActivitySerializationContext, } from '@temporalio/common'; +import { withSerializationContext } from '@temporalio/common/lib/converter/serialization-context'; import { decodeArrayFromPayloads, Decoded, @@ -824,7 +826,11 @@ export class Worker { protected _connection?: NativeConnection, protected readonly isReplayWorker: boolean = false ) { - this.workflowCodecRunner = new WorkflowCodecRunner(options.loadedDataConverter.payloadCodecs); + this.workflowCodecRunner = new WorkflowCodecRunner( + options.loadedDataConverter.payloadCodecs, + options.namespace, + options.taskQueue + ); } /** @@ -1066,6 +1072,7 @@ export class Worker { switch (variant) { case 'start': { let info: ActivityInfo | undefined = undefined; + let contextDataConverter: LoadedDataConverter = this.options.loadedDataConverter; try { if (activity !== undefined) { throw new IllegalStateError( @@ -1078,6 +1085,15 @@ export class Worker { this.options.namespace, this.options.taskQueue ); + const context: ActivitySerializationContext = { + type: 'activity', + namespace: info.workflowNamespace, + activityId: info.activityId, + workflowId: info.workflowExecution.workflowId, + workflowType: info.workflowType, + isLocal: info.isLocal, + }; + contextDataConverter = withSerializationContext(this.options.loadedDataConverter, context); const { activityType } = info; // Use the corresponding activity if it exists, otherwise, fallback to default activity function (if exists) @@ -1093,7 +1109,7 @@ export class Worker { } let args: unknown[]; try { - args = await decodeArrayFromPayloads(this.options.loadedDataConverter, task.start?.input); + args = await decodeArrayFromPayloads(contextDataConverter, task.start?.input); } catch (err) { throw ApplicationFailure.fromError(err, { message: `Failed to parse activity args for activity ${activityType}: ${errorMessage(err)}`, @@ -1111,7 +1127,7 @@ export class Worker { activity = new Activity( info, fn, - this.options.loadedDataConverter, + contextDataConverter, (details) => this.activityHeartbeatSubject.next({ type: 'heartbeat', @@ -1147,7 +1163,7 @@ export class Worker { type: 'result', result: { failed: { - failure: await encodeErrorToFailure(this.options.loadedDataConverter, error), + failure: await encodeErrorToFailure(contextDataConverter, error), }, }, }; @@ -1381,6 +1397,7 @@ export class Worker { tap(({ close }) => { this.numInFlightActivationsSubject.next(this.numInFlightActivationsSubject.value - 1); if (close) { + this.workflowCodecRunner.forgetRun(activations$.key); activations$.close(); this.numCachedWorkflowsSubject.next(this.numCachedWorkflowsSubject.value - 1); } @@ -1757,7 +1774,16 @@ export class Worker { let payload: Payload; try { try { - payload = await encodeToPayload(this.options.loadedDataConverter, details); + const context: ActivitySerializationContext = { + type: 'activity', + namespace: info.workflowNamespace, + activityId: info.activityId, + workflowId: info.workflowExecution.workflowId, + workflowType: info.workflowType, + isLocal: info.isLocal, + }; + const activityDataConverter = withSerializationContext(this.options.loadedDataConverter, context); + payload = await encodeToPayload(activityDataConverter, details); } catch (error: any) { this.logger.warn('Failed to encode heartbeat details, cancelling Activity', { error, @@ -2202,10 +2228,19 @@ async function extractActivityInfo( // NOTE: We trust core to supply all of these fields instead of checking for null and undefined everywhere const { taskToken } = task as NonNullableObject; const start = task.start as NonNullableObject; + const context: ActivitySerializationContext = { + type: 'activity', + namespace: start.workflowNamespace || activityNamespace, + activityId: start.activityId || undefined, + workflowId: start.workflowExecution?.workflowId ?? undefined, + workflowType: start.workflowType ?? undefined, + isLocal: start.isLocal, + }; + const contextDataConverter = withSerializationContext(dataConverter, context); const activityId = start.activityId; let heartbeatDetails = undefined; try { - heartbeatDetails = await decodeFromPayloadsAtIndex(dataConverter, 0, start.heartbeatDetails); + heartbeatDetails = await decodeFromPayloadsAtIndex(contextDataConverter, 0, start.heartbeatDetails); } catch (e) { throw ApplicationFailure.fromError(e, { message: `Failed to parse heartbeat details for activity ${activityId}: ${errorMessage(e)}`, diff --git a/packages/worker/src/workflow-codec-runner.ts b/packages/worker/src/workflow-codec-runner.ts index ffc0ead8e..fcf9d477d 100644 --- a/packages/worker/src/workflow-codec-runner.ts +++ b/packages/worker/src/workflow-codec-runner.ts @@ -1,4 +1,10 @@ -import { PayloadCodec } from '@temporalio/common'; +import { + ActivitySerializationContext, + PayloadCodec, + SerializationContext, + WorkflowSerializationContext, +} from '@temporalio/common'; +import { withPayloadCodecContext } from '@temporalio/common/lib/converter/serialization-context'; import { Decoded, decodeOptional, @@ -15,11 +21,138 @@ import { } from '@temporalio/common/lib/internal-non-workflow'; import { coresdk } from '@temporalio/proto'; +interface WorkflowCodecRunState { + workflowContext: WorkflowSerializationContext; + workflowType: string; + activityContexts: Map; + childWorkflowContexts: Map; + // signalExternalWorkflowExecution and requestCancelExternalWorkflowExecution use independent seq counters. + signalExternalWorkflowContexts: Map; + cancelExternalWorkflowContexts: Map; +} + /** * Helper class for decoding Workflow activations and encoding Workflow completions. */ export class WorkflowCodecRunner { - constructor(protected readonly codecs: PayloadCodec[]) {} + protected readonly serializationContextsByRunId = new Map(); + + constructor( + protected readonly codecs: PayloadCodec[], + protected readonly namespace: string, + protected readonly taskQueue: string + ) {} + + /** + * Forget all codec context state for a workflow run. + */ + public forgetRun(runId: string): void { + this.serializationContextsByRunId.delete(runId); + } + + protected codecsForContext(context: SerializationContext | undefined): PayloadCodec[] { + if (context == null || this.codecs.length === 0) { + return this.codecs; + } + let codecsChanged = false; + const maybeBoundCodecs = this.codecs.map((codec) => { + const maybeContextCodec = withPayloadCodecContext(codec, context); + if (maybeContextCodec !== codec) { + codecsChanged = true; + } + return maybeContextCodec; + }); + return codecsChanged ? maybeBoundCodecs : this.codecs; + } + + protected createRunState( + initializeWorkflow: coresdk.workflow_activation.IInitializeWorkflow + ): WorkflowCodecRunState | undefined { + if (typeof initializeWorkflow.workflowId !== 'string') { + return undefined; + } + return { + workflowContext: { + type: 'workflow', + namespace: this.namespace, + workflowId: initializeWorkflow.workflowId, + workflowType: initializeWorkflow.workflowType ?? undefined, + }, + workflowType: initializeWorkflow.workflowType ?? '', + activityContexts: new Map(), + childWorkflowContexts: new Map(), + signalExternalWorkflowContexts: new Map(), + cancelExternalWorkflowContexts: new Map(), + }; + } + + protected getOrCreateRunState( + runId: string, + initializeWorkflow: coresdk.workflow_activation.IInitializeWorkflow | undefined + ): WorkflowCodecRunState | undefined { + let state = this.serializationContextsByRunId.get(runId); + if (state || initializeWorkflow == null) { + return state; + } + state = this.createRunState(initializeWorkflow); + if (state != null) { + this.serializationContextsByRunId.set(runId, state); + } + return state; + } + + protected activityContextForCommand( + runState: WorkflowCodecRunState | undefined, + activityId: string | null | undefined, + isLocal: boolean + ): ActivitySerializationContext | undefined { + return runState + ? { + type: 'activity', + namespace: runState.workflowContext.namespace, + activityId: activityId ?? undefined, + workflowId: runState.workflowContext.workflowId, + workflowType: runState.workflowType, + isLocal, + } + : undefined; + } + + protected workflowContextForTarget( + runState: WorkflowCodecRunState | undefined, + workflowId: string | null | undefined + ): WorkflowSerializationContext | undefined { + return runState ? { type: 'workflow', namespace: this.namespace, workflowId: workflowId ?? '' } : undefined; + } + + protected setContextIfPresent( + map: Map | undefined, + seq: number | null | undefined, + context: T | undefined + ): void { + if (map != null && seq != null && context != null) { + map.set(seq, context); + } + } + + protected deleteBySeqIfPresent(map: Map | undefined, seq: number | null | undefined): void { + if (map != null && seq != null) { + map.delete(seq); + } + } + + protected deleteChildStartContextIfTerminal( + runState: WorkflowCodecRunState | undefined, + job: coresdk.workflow_activation.IWorkflowActivationJob, + seq: number | null | undefined + ): void { + if (seq != null) { + const start = job.resolveChildWorkflowExecutionStart; + if (start?.failed || start?.cancelled) { + runState?.childWorkflowContexts.delete(seq); + } + } + } /** * Run codec.decode on the Payloads in the Activation message. @@ -27,182 +160,247 @@ export class WorkflowCodecRunner { public async decodeActivation( activation: T ): Promise> { + const runId = activation.runId; + const initializeWorkflow = activation.jobs?.find((job) => job.initializeWorkflow)?.initializeWorkflow ?? undefined; + // A run state may be absent on sticky cache misses/restarts. Fall back to context-free decoding in that case. + const runState = runId ? this.getOrCreateRunState(runId, initializeWorkflow) : undefined; + const workflowContext = runState?.workflowContext; + return coresdk.workflow_activation.WorkflowActivation.fromObject(< Decoded >{ ...activation, jobs: activation.jobs ? await Promise.all( - activation.jobs.map(async (job) => ({ - ...job, - initializeWorkflow: job.initializeWorkflow - ? { - ...job.initializeWorkflow, - arguments: await decodeOptional(this.codecs, job.initializeWorkflow.arguments), - headers: noopDecodeMap(job.initializeWorkflow.headers), - continuedFailure: await decodeOptionalFailure(this.codecs, job.initializeWorkflow.continuedFailure), - memo: { - ...job.initializeWorkflow.memo, - fields: await decodeOptionalMap(this.codecs, job.initializeWorkflow.memo?.fields), - }, - lastCompletionResult: { - ...job.initializeWorkflow.lastCompletionResult, - payloads: await decodeOptional( - this.codecs, - job.initializeWorkflow.lastCompletionResult?.payloads - ), - }, - searchAttributes: job.initializeWorkflow.searchAttributes - ? { - ...job.initializeWorkflow.searchAttributes, - indexedFields: job.initializeWorkflow.searchAttributes.indexedFields - ? noopDecodeMap(job.initializeWorkflow.searchAttributes?.indexedFields) - : undefined, - } - : undefined, - } - : null, - queryWorkflow: job.queryWorkflow - ? { - ...job.queryWorkflow, - arguments: await decodeOptional(this.codecs, job.queryWorkflow.arguments), - headers: noopDecodeMap(job.queryWorkflow.headers), - } - : null, - doUpdate: job.doUpdate - ? { - ...job.doUpdate, - input: await decodeOptional(this.codecs, job.doUpdate.input), - headers: noopDecodeMap(job.doUpdate.headers), - } - : null, - signalWorkflow: job.signalWorkflow - ? { - ...job.signalWorkflow, - input: await decodeOptional(this.codecs, job.signalWorkflow.input), - headers: noopDecodeMap(job.signalWorkflow.headers), - } - : null, - resolveActivity: job.resolveActivity - ? { - ...job.resolveActivity, - result: job.resolveActivity.result - ? { - ...job.resolveActivity.result, - completed: job.resolveActivity.result.completed - ? { - ...job.resolveActivity.result.completed, - result: await decodeOptionalSingle( - this.codecs, - job.resolveActivity.result.completed.result - ), - } - : null, - failed: job.resolveActivity.result.failed - ? { - ...job.resolveActivity.result.failed, - failure: await decodeOptionalFailure( - this.codecs, - job.resolveActivity.result.failed.failure - ), - } - : null, - cancelled: job.resolveActivity.result.cancelled - ? { - ...job.resolveActivity.result.cancelled, - failure: await decodeOptionalFailure( - this.codecs, - job.resolveActivity.result.cancelled.failure - ), - } - : null, - } - : null, - } - : null, - resolveChildWorkflowExecution: job.resolveChildWorkflowExecution - ? { - ...job.resolveChildWorkflowExecution, - result: job.resolveChildWorkflowExecution.result - ? { - ...job.resolveChildWorkflowExecution.result, - completed: job.resolveChildWorkflowExecution.result.completed - ? { - ...job.resolveChildWorkflowExecution.result.completed, - result: await decodeOptionalSingle( - this.codecs, - job.resolveChildWorkflowExecution.result.completed.result - ), - } - : null, - failed: job.resolveChildWorkflowExecution.result.failed - ? { - ...job.resolveChildWorkflowExecution.result.failed, - failure: await decodeOptionalFailure( - this.codecs, - job.resolveChildWorkflowExecution.result.failed.failure - ), - } - : null, - cancelled: job.resolveChildWorkflowExecution.result.cancelled - ? { - ...job.resolveChildWorkflowExecution.result.cancelled, - failure: await decodeOptionalFailure( - this.codecs, - job.resolveChildWorkflowExecution.result.cancelled.failure - ), - } - : null, - } - : null, - } - : null, - resolveChildWorkflowExecutionStart: job.resolveChildWorkflowExecutionStart + activation.jobs.map(async (job) => { + const resolveActivitySeq = job.resolveActivity?.seq; + const resolveActivityContext = + resolveActivitySeq != null ? runState?.activityContexts.get(resolveActivitySeq) : undefined; + + const resolveChildSeq = job.resolveChildWorkflowExecution?.seq; + const resolveChildContext = + resolveChildSeq != null ? runState?.childWorkflowContexts.get(resolveChildSeq) : undefined; + + const resolveChildStartSeq = job.resolveChildWorkflowExecutionStart?.seq; + const resolveChildStartContext = + resolveChildStartSeq != null ? runState?.childWorkflowContexts.get(resolveChildStartSeq) : undefined; + + const resolveSignalExternalSeq = job.resolveSignalExternalWorkflow?.seq; + const resolveSignalExternalContext = + resolveSignalExternalSeq != null + ? runState?.signalExternalWorkflowContexts.get(resolveSignalExternalSeq) + : undefined; + + const resolveCancelExternalSeq = job.resolveRequestCancelExternalWorkflow?.seq; + const resolveCancelExternalContext = + resolveCancelExternalSeq != null + ? runState?.cancelExternalWorkflowContexts.get(resolveCancelExternalSeq) + : undefined; + + const initializeContext = job.initializeWorkflow?.workflowId ? { - ...job.resolveChildWorkflowExecutionStart, - cancelled: job.resolveChildWorkflowExecutionStart.cancelled - ? { - ...job.resolveChildWorkflowExecutionStart.cancelled, - failure: await decodeOptionalFailure( - this.codecs, - job.resolveChildWorkflowExecutionStart.cancelled.failure - ), - } - : null, + type: 'workflow' as const, + namespace: this.namespace, + workflowId: job.initializeWorkflow.workflowId, + workflowType: job.initializeWorkflow.workflowType ?? undefined, } - : null, - resolveNexusOperation: job.resolveNexusOperation - ? { - ...job.resolveNexusOperation, - result: { - completed: job.resolveNexusOperation.result?.completed - ? await decodeOptionalSingle(this.codecs, job.resolveNexusOperation.result?.completed) - : null, - failed: job.resolveNexusOperation.result?.failed - ? await decodeOptionalFailure(this.codecs, job.resolveNexusOperation.result?.failed) + : workflowContext; + const initializeCodecs = this.codecsForContext(initializeContext); + const workflowCodecs = this.codecsForContext(workflowContext); + const resolveActivityCodecs = this.codecsForContext(resolveActivityContext); + const resolveChildCodecs = this.codecsForContext(resolveChildContext); + const resolveChildStartCodecs = this.codecsForContext(resolveChildStartContext); + const resolveSignalExternalCodecs = this.codecsForContext(resolveSignalExternalContext); + const resolveCancelExternalCodecs = this.codecsForContext(resolveCancelExternalContext); + + const decodedJob = { + ...job, + initializeWorkflow: job.initializeWorkflow + ? { + ...job.initializeWorkflow, + arguments: await decodeOptional(initializeCodecs, job.initializeWorkflow.arguments), + headers: noopDecodeMap(job.initializeWorkflow.headers), + continuedFailure: await decodeOptionalFailure( + initializeCodecs, + job.initializeWorkflow.continuedFailure + ), + memo: { + ...job.initializeWorkflow.memo, + fields: await decodeOptionalMap(initializeCodecs, job.initializeWorkflow.memo?.fields), + }, + lastCompletionResult: { + ...job.initializeWorkflow.lastCompletionResult, + payloads: await decodeOptional( + initializeCodecs, + job.initializeWorkflow.lastCompletionResult?.payloads + ), + }, + searchAttributes: job.initializeWorkflow.searchAttributes + ? { + ...job.initializeWorkflow.searchAttributes, + indexedFields: job.initializeWorkflow.searchAttributes.indexedFields + ? noopDecodeMap(job.initializeWorkflow.searchAttributes?.indexedFields) + : undefined, + } + : undefined, + } + : null, + queryWorkflow: job.queryWorkflow + ? { + ...job.queryWorkflow, + arguments: await decodeOptional(workflowCodecs, job.queryWorkflow.arguments), + headers: noopDecodeMap(job.queryWorkflow.headers), + } + : null, + doUpdate: job.doUpdate + ? { + ...job.doUpdate, + input: await decodeOptional(workflowCodecs, job.doUpdate.input), + headers: noopDecodeMap(job.doUpdate.headers), + } + : null, + signalWorkflow: job.signalWorkflow + ? { + ...job.signalWorkflow, + input: await decodeOptional(workflowCodecs, job.signalWorkflow.input), + headers: noopDecodeMap(job.signalWorkflow.headers), + } + : null, + resolveActivity: job.resolveActivity + ? { + ...job.resolveActivity, + result: job.resolveActivity.result + ? { + ...job.resolveActivity.result, + completed: job.resolveActivity.result.completed + ? { + ...job.resolveActivity.result.completed, + result: await decodeOptionalSingle( + resolveActivityCodecs, + job.resolveActivity.result.completed.result + ), + } + : null, + failed: job.resolveActivity.result.failed + ? { + ...job.resolveActivity.result.failed, + failure: await decodeOptionalFailure( + resolveActivityCodecs, + job.resolveActivity.result.failed.failure + ), + } + : null, + cancelled: job.resolveActivity.result.cancelled + ? { + ...job.resolveActivity.result.cancelled, + failure: await decodeOptionalFailure( + resolveActivityCodecs, + job.resolveActivity.result.cancelled.failure + ), + } + : null, + } : null, - cancelled: job.resolveNexusOperation.result?.cancelled - ? await decodeOptionalFailure(this.codecs, job.resolveNexusOperation.result?.cancelled) + } + : null, + resolveChildWorkflowExecution: job.resolveChildWorkflowExecution + ? { + ...job.resolveChildWorkflowExecution, + result: job.resolveChildWorkflowExecution.result + ? { + ...job.resolveChildWorkflowExecution.result, + completed: job.resolveChildWorkflowExecution.result.completed + ? { + ...job.resolveChildWorkflowExecution.result.completed, + result: await decodeOptionalSingle( + resolveChildCodecs, + job.resolveChildWorkflowExecution.result.completed.result + ), + } + : null, + failed: job.resolveChildWorkflowExecution.result.failed + ? { + ...job.resolveChildWorkflowExecution.result.failed, + failure: await decodeOptionalFailure( + resolveChildCodecs, + job.resolveChildWorkflowExecution.result.failed.failure + ), + } + : null, + cancelled: job.resolveChildWorkflowExecution.result.cancelled + ? { + ...job.resolveChildWorkflowExecution.result.cancelled, + failure: await decodeOptionalFailure( + resolveChildCodecs, + job.resolveChildWorkflowExecution.result.cancelled.failure + ), + } + : null, + } : null, - timedOut: job.resolveNexusOperation.result?.cancelled - ? await decodeOptionalFailure(this.codecs, job.resolveNexusOperation.result?.timedOut) + } + : null, + resolveChildWorkflowExecutionStart: job.resolveChildWorkflowExecutionStart + ? { + ...job.resolveChildWorkflowExecutionStart, + cancelled: job.resolveChildWorkflowExecutionStart.cancelled + ? { + ...job.resolveChildWorkflowExecutionStart.cancelled, + failure: await decodeOptionalFailure( + resolveChildStartCodecs, + job.resolveChildWorkflowExecutionStart.cancelled.failure + ), + } : null, - }, - } - : null, - resolveSignalExternalWorkflow: job.resolveSignalExternalWorkflow - ? { - ...job.resolveSignalExternalWorkflow, - failure: await decodeOptionalFailure(this.codecs, job.resolveSignalExternalWorkflow.failure), - } - : null, - resolveRequestCancelExternalWorkflow: job.resolveRequestCancelExternalWorkflow - ? { - ...job.resolveRequestCancelExternalWorkflow, - failure: await decodeOptionalFailure(this.codecs, job.resolveRequestCancelExternalWorkflow.failure), - } - : null, - })) + } + : null, + resolveNexusOperation: job.resolveNexusOperation + ? { + ...job.resolveNexusOperation, + result: { + completed: job.resolveNexusOperation.result?.completed + ? await decodeOptionalSingle(workflowCodecs, job.resolveNexusOperation.result?.completed) + : null, + failed: job.resolveNexusOperation.result?.failed + ? await decodeOptionalFailure(workflowCodecs, job.resolveNexusOperation.result?.failed) + : null, + cancelled: job.resolveNexusOperation.result?.cancelled + ? await decodeOptionalFailure(workflowCodecs, job.resolveNexusOperation.result?.cancelled) + : null, + timedOut: job.resolveNexusOperation.result?.cancelled + ? await decodeOptionalFailure(workflowCodecs, job.resolveNexusOperation.result?.timedOut) + : null, + }, + } + : null, + resolveSignalExternalWorkflow: job.resolveSignalExternalWorkflow + ? { + ...job.resolveSignalExternalWorkflow, + failure: await decodeOptionalFailure( + resolveSignalExternalCodecs, + job.resolveSignalExternalWorkflow.failure + ), + } + : null, + resolveRequestCancelExternalWorkflow: job.resolveRequestCancelExternalWorkflow + ? { + ...job.resolveRequestCancelExternalWorkflow, + failure: await decodeOptionalFailure( + resolveCancelExternalCodecs, + job.resolveRequestCancelExternalWorkflow.failure + ), + } + : null, + }; + + this.deleteBySeqIfPresent(runState?.activityContexts, resolveActivitySeq); + this.deleteBySeqIfPresent(runState?.childWorkflowContexts, resolveChildSeq); + this.deleteChildStartContextIfTerminal(runState, job, resolveChildStartSeq); + this.deleteBySeqIfPresent(runState?.signalExternalWorkflowContexts, resolveSignalExternalSeq); + this.deleteBySeqIfPresent(runState?.cancelExternalWorkflowContexts, resolveCancelExternalSeq); + + return decodedJob; + }) ) : null, }) as Decoded; @@ -214,12 +412,19 @@ export class WorkflowCodecRunner { public async encodeCompletion( completion: coresdk.workflow_completion.IWorkflowActivationCompletion ): Promise { + const runId = completion.runId; + // A run state may be absent if no InitializeWorkflow has been seen for this worker process. + // Preserve compatibility by encoding without context in that case. + const runState = runId ? this.serializationContextsByRunId.get(runId) : undefined; + const workflowContext = runState?.workflowContext; + const workflowCodecs = this.codecsForContext(workflowContext); + const encodedCompletion: Encoded = { ...completion, failed: completion.failed ? { ...completion.failed, - failure: await encodeOptionalFailure(this.codecs, completion?.failed?.failure), + failure: await encodeOptionalFailure(workflowCodecs, completion?.failed?.failure), } : null, successful: completion.successful @@ -227,123 +432,193 @@ export class WorkflowCodecRunner { ...completion.successful, commands: completion.successful.commands ? await Promise.all( - completion.successful.commands.map( - async (command) => - >{ - ...command, - scheduleActivity: command.scheduleActivity - ? { - ...command.scheduleActivity, - arguments: await encodeOptional(this.codecs, command.scheduleActivity?.arguments), - // don't encode headers - headers: noopEncodeMap(command.scheduleActivity?.headers), - } - : undefined, - upsertWorkflowSearchAttributes: command.upsertWorkflowSearchAttributes - ? { - ...command.upsertWorkflowSearchAttributes, - searchAttributes: noopEncodeMap(command.upsertWorkflowSearchAttributes.searchAttributes), - } - : undefined, - respondToQuery: command.respondToQuery - ? { - ...command.respondToQuery, - succeeded: { - ...command.respondToQuery.succeeded, - response: await encodeOptionalSingle( - this.codecs, - command.respondToQuery.succeeded?.response - ), - }, - failed: await encodeOptionalFailure(this.codecs, command.respondToQuery.failed), - } - : undefined, - updateResponse: command.updateResponse - ? { - ...command.updateResponse, - rejected: await encodeOptionalFailure(this.codecs, command.updateResponse.rejected), - completed: await encodeOptionalSingle(this.codecs, command.updateResponse.completed), - } - : undefined, - completeWorkflowExecution: command.completeWorkflowExecution - ? { - ...command.completeWorkflowExecution, - result: await encodeOptionalSingle(this.codecs, command.completeWorkflowExecution.result), - } - : undefined, - failWorkflowExecution: command.failWorkflowExecution - ? { - ...command.failWorkflowExecution, - failure: await encodeOptionalFailure(this.codecs, command.failWorkflowExecution.failure), - } - : undefined, - continueAsNewWorkflowExecution: command.continueAsNewWorkflowExecution - ? { - ...command.continueAsNewWorkflowExecution, - arguments: await encodeOptional( - this.codecs, - command.continueAsNewWorkflowExecution.arguments + completion.successful.commands.map(async (command) => { + const scheduleActivitySeq = command.scheduleActivity?.seq; + const scheduleActivityContext = this.activityContextForCommand( + runState, + command.scheduleActivity?.activityId, + false + ); + + const scheduleLocalActivitySeq = command.scheduleLocalActivity?.seq; + const scheduleLocalActivityContext = this.activityContextForCommand( + runState, + command.scheduleLocalActivity?.activityId, + true + ); + + const childWorkflowSeq = command.startChildWorkflowExecution?.seq; + const childWorkflowId = + command.startChildWorkflowExecution?.workflowId ?? runState?.workflowContext.workflowId ?? ''; + const childWorkflowContext = this.workflowContextForTarget(runState, childWorkflowId); + + const signalExternalSeq = command.signalExternalWorkflowExecution?.seq; + const signalExternalTargetWorkflowId = + command.signalExternalWorkflowExecution?.workflowExecution?.workflowId ?? + command.signalExternalWorkflowExecution?.childWorkflowId ?? + runState?.workflowContext.workflowId ?? + ''; + const signalExternalContext = this.workflowContextForTarget( + runState, + signalExternalTargetWorkflowId + ); + + const cancelExternalSeq = command.requestCancelExternalWorkflowExecution?.seq; + const cancelExternalWorkflowId = + command.requestCancelExternalWorkflowExecution?.workflowExecution?.workflowId ?? + runState?.workflowContext.workflowId ?? + ''; + const cancelExternalContext = this.workflowContextForTarget(runState, cancelExternalWorkflowId); + + this.setContextIfPresent(runState?.activityContexts, scheduleActivitySeq, scheduleActivityContext); + this.setContextIfPresent( + runState?.activityContexts, + scheduleLocalActivitySeq, + scheduleLocalActivityContext + ); + this.setContextIfPresent(runState?.childWorkflowContexts, childWorkflowSeq, childWorkflowContext); + this.setContextIfPresent( + runState?.signalExternalWorkflowContexts, + signalExternalSeq, + signalExternalContext + ); + this.setContextIfPresent( + runState?.cancelExternalWorkflowContexts, + cancelExternalSeq, + cancelExternalContext + ); + const scheduleActivityCodecs = this.codecsForContext(scheduleActivityContext); + const scheduleLocalActivityCodecs = this.codecsForContext(scheduleLocalActivityContext); + const childWorkflowCodecs = this.codecsForContext(childWorkflowContext); + const signalExternalCodecs = this.codecsForContext(signalExternalContext); + + return >{ + ...command, + scheduleActivity: command.scheduleActivity + ? { + ...command.scheduleActivity, + arguments: await encodeOptional( + scheduleActivityCodecs, + command.scheduleActivity?.arguments + ), + // don't encode headers + headers: noopEncodeMap(command.scheduleActivity?.headers), + } + : undefined, + upsertWorkflowSearchAttributes: command.upsertWorkflowSearchAttributes + ? { + ...command.upsertWorkflowSearchAttributes, + searchAttributes: noopEncodeMap(command.upsertWorkflowSearchAttributes.searchAttributes), + } + : undefined, + respondToQuery: command.respondToQuery + ? { + ...command.respondToQuery, + succeeded: { + ...command.respondToQuery.succeeded, + response: await encodeOptionalSingle( + workflowCodecs, + command.respondToQuery.succeeded?.response ), - memo: await encodeMap(this.codecs, command.continueAsNewWorkflowExecution.memo), - // don't encode headers - headers: noopEncodeMap(command.continueAsNewWorkflowExecution.headers), - // don't encode searchAttributes - searchAttributes: noopEncodeMap(command.continueAsNewWorkflowExecution.searchAttributes), - } - : undefined, - startChildWorkflowExecution: command.startChildWorkflowExecution - ? { - ...command.startChildWorkflowExecution, - input: await encodeOptional(this.codecs, command.startChildWorkflowExecution.input), - memo: await encodeMap(this.codecs, command.startChildWorkflowExecution.memo), - // don't encode headers - headers: noopEncodeMap(command.startChildWorkflowExecution.headers), - // don't encode searchAttributes - searchAttributes: noopEncodeMap(command.startChildWorkflowExecution.searchAttributes), - } - : undefined, - signalExternalWorkflowExecution: command.signalExternalWorkflowExecution - ? { - ...command.signalExternalWorkflowExecution, - args: await encodeOptional(this.codecs, command.signalExternalWorkflowExecution.args), - headers: noopEncodeMap(command.signalExternalWorkflowExecution.headers), - } - : undefined, - scheduleLocalActivity: command.scheduleLocalActivity - ? { - ...command.scheduleLocalActivity, - arguments: await encodeOptional(this.codecs, command.scheduleLocalActivity.arguments), - // don't encode headers - headers: noopEncodeMap(command.scheduleLocalActivity.headers), - } - : undefined, - scheduleNexusOperation: command.scheduleNexusOperation - ? { - ...command.scheduleNexusOperation, - input: await encodeOptionalSingle(this.codecs, command.scheduleNexusOperation.input), - } - : undefined, - modifyWorkflowProperties: command.modifyWorkflowProperties + }, + failed: await encodeOptionalFailure(workflowCodecs, command.respondToQuery.failed), + } + : undefined, + updateResponse: command.updateResponse + ? { + ...command.updateResponse, + rejected: await encodeOptionalFailure(workflowCodecs, command.updateResponse.rejected), + completed: await encodeOptionalSingle(workflowCodecs, command.updateResponse.completed), + } + : undefined, + completeWorkflowExecution: command.completeWorkflowExecution + ? { + ...command.completeWorkflowExecution, + result: await encodeOptionalSingle( + workflowCodecs, + command.completeWorkflowExecution.result + ), + } + : undefined, + failWorkflowExecution: command.failWorkflowExecution + ? { + ...command.failWorkflowExecution, + failure: await encodeOptionalFailure(workflowCodecs, command.failWorkflowExecution.failure), + } + : undefined, + continueAsNewWorkflowExecution: command.continueAsNewWorkflowExecution + ? { + ...command.continueAsNewWorkflowExecution, + arguments: await encodeOptional( + workflowCodecs, + command.continueAsNewWorkflowExecution.arguments + ), + memo: await encodeMap(workflowCodecs, command.continueAsNewWorkflowExecution.memo), + // don't encode headers + headers: noopEncodeMap(command.continueAsNewWorkflowExecution.headers), + // don't encode searchAttributes + searchAttributes: noopEncodeMap(command.continueAsNewWorkflowExecution.searchAttributes), + } + : undefined, + startChildWorkflowExecution: command.startChildWorkflowExecution + ? { + ...command.startChildWorkflowExecution, + input: await encodeOptional(childWorkflowCodecs, command.startChildWorkflowExecution.input), + memo: await encodeMap(childWorkflowCodecs, command.startChildWorkflowExecution.memo), + // don't encode headers + headers: noopEncodeMap(command.startChildWorkflowExecution.headers), + // don't encode searchAttributes + searchAttributes: noopEncodeMap(command.startChildWorkflowExecution.searchAttributes), + } + : undefined, + signalExternalWorkflowExecution: command.signalExternalWorkflowExecution + ? { + ...command.signalExternalWorkflowExecution, + args: await encodeOptional( + signalExternalCodecs, + command.signalExternalWorkflowExecution.args + ), + headers: noopEncodeMap(command.signalExternalWorkflowExecution.headers), + } + : undefined, + scheduleLocalActivity: command.scheduleLocalActivity + ? { + ...command.scheduleLocalActivity, + arguments: await encodeOptional( + scheduleLocalActivityCodecs, + command.scheduleLocalActivity.arguments + ), + // don't encode headers + headers: noopEncodeMap(command.scheduleLocalActivity.headers), + } + : undefined, + scheduleNexusOperation: command.scheduleNexusOperation + ? { + ...command.scheduleNexusOperation, + input: await encodeOptionalSingle(workflowCodecs, command.scheduleNexusOperation.input), + } + : undefined, + modifyWorkflowProperties: command.modifyWorkflowProperties + ? { + ...command.modifyWorkflowProperties, + upsertedMemo: { + ...command.modifyWorkflowProperties.upsertedMemo, + fields: await encodeMap( + workflowCodecs, + command.modifyWorkflowProperties.upsertedMemo?.fields + ), + }, + } + : undefined, + userMetadata: + command.userMetadata && (command.userMetadata.summary || command.userMetadata.details) ? { - ...command.modifyWorkflowProperties, - upsertedMemo: { - ...command.modifyWorkflowProperties.upsertedMemo, - fields: await encodeMap( - this.codecs, - command.modifyWorkflowProperties.upsertedMemo?.fields - ), - }, + summary: await encodeOptionalSingle(workflowCodecs, command.userMetadata.summary), + details: await encodeOptionalSingle(workflowCodecs, command.userMetadata.details), } : undefined, - userMetadata: - command.userMetadata && (command.userMetadata.summary || command.userMetadata.details) - ? { - summary: await encodeOptionalSingle(this.codecs, command.userMetadata.summary), - details: await encodeOptionalSingle(this.codecs, command.userMetadata.details), - } - : undefined, - } - ) ?? [] + }; + }) ?? [] ) : null, } diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index a76e9b844..d52529a14 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -15,6 +15,8 @@ import { WorkflowSignalAnnotatedType, WorkflowUpdateAnnotatedType, ProtoFailure, + WorkflowSerializationContext, + ActivitySerializationContext, ApplicationFailure, WorkflowUpdateType, WorkflowUpdateValidatorType, @@ -25,6 +27,10 @@ import { VersioningBehavior, WorkflowDefinitionOptions, } from '@temporalio/common'; +import { + withFailureConverterContext, + withPayloadConverterContext, +} from '@temporalio/common/lib/converter/serialization-context'; import { decodeSearchAttributes, decodeTypedSearchAttributes, @@ -173,6 +179,19 @@ export class Activator implements ActivationHandler { cancelWorkflow: new Map>(), }; + /** + * Context maps used to bridge command encoding and activation decoding. + * + * Populate on command emission and consume on resolve jobs. + */ + readonly serializationContexts = { + activity: new Map(), + childWorkflow: new Map(), + // signalWorkflow and cancelWorkflow maintain independent seq counters; keep separate context maps. + signalExternalWorkflow: new Map(), + cancelExternalWorkflow: new Map(), + }; + /** * Holds buffered Update calls until a handler is registered */ @@ -550,12 +569,18 @@ export class Activator implements ActivationHandler { public startWorkflow(activation: coresdk.workflow_activation.IInitializeWorkflow): void { const execute = composeInterceptors(this.interceptors.inbound, 'execute', this.startWorkflowNextHandler.bind(this)); + const payloadConverter = withPayloadConverterContext(this.payloadConverter, { + type: 'workflow', + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }); untrackPromise( executeWithLifecycleLogging(() => execute({ headers: activation.headers ?? {}, - args: arrayFromPayloads(this.payloadConverter, activation.arguments), + args: arrayFromPayloads(payloadConverter, activation.arguments), }) ).then(this.completeWorkflow.bind(this), this.handleWorkflowFailure.bind(this)) ); @@ -563,6 +588,14 @@ export class Activator implements ActivationHandler { public initializeWorkflow(activation: coresdk.workflow_activation.IInitializeWorkflow): void { const { continuedFailure, lastCompletionResult, memo, searchAttributes } = activation; + const workflowContext = { + type: 'workflow' as const, + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }; + const payloadConverter = withPayloadConverterContext(this.payloadConverter, workflowContext); + const failureConverter = withFailureConverterContext(this.failureConverter, workflowContext); // Most things related to initialization have already been handled in the constructor this.mutateWorkflowInfo((info) => ({ @@ -571,12 +604,10 @@ export class Activator implements ActivationHandler { searchAttributes: decodeSearchAttributes(searchAttributes?.indexedFields), typedSearchAttributes: decodeTypedSearchAttributes(searchAttributes?.indexedFields), - memo: mapFromPayloads(this.payloadConverter, memo?.fields), - lastResult: fromPayloadsAtIndex(this.payloadConverter, 0, lastCompletionResult?.payloads), + memo: mapFromPayloads(payloadConverter, memo?.fields), + lastResult: fromPayloadsAtIndex(payloadConverter, 0, lastCompletionResult?.payloads), lastFailure: - continuedFailure != null - ? this.failureConverter.failureToError(continuedFailure, this.payloadConverter) - : undefined, + continuedFailure != null ? failureConverter.failureToError(continuedFailure, payloadConverter) : undefined, })); if (this.workflowDefinitionOptionsGetter) { this.versioningBehavior = this.workflowDefinitionOptionsGetter().versioningBehavior; @@ -599,23 +630,32 @@ export class Activator implements ActivationHandler { if (!activation.result) { throw new TypeError('Got ResolveActivity activation with no result'); } - const { resolve, reject } = this.consumeCompletion('activity', getSeq(activation)); + const seq = getSeq(activation); + const { resolve, reject } = this.consumeCompletion('activity', seq); + const activityContext = this.serializationContexts.activity.get(seq); + this.serializationContexts.activity.delete(seq); + const payloadConverter = activityContext + ? withPayloadConverterContext(this.payloadConverter, activityContext) + : this.payloadConverter; + const failureConverter = activityContext + ? withFailureConverterContext(this.failureConverter, activityContext) + : this.failureConverter; if (activation.result.completed) { const completed = activation.result.completed; - const result = completed.result ? this.payloadConverter.fromPayload(completed.result) : undefined; + const result = completed.result ? payloadConverter.fromPayload(completed.result) : undefined; resolve(result); } else if (activation.result.failed) { const { failure } = activation.result.failed; if (failure == null) { throw new TypeError('Got failed result with no failure attribute'); } - reject(this.failureToError(failure)); + reject(failureConverter.failureToError(failure, payloadConverter)); } else if (activation.result.cancelled) { const { failure } = activation.result.cancelled; if (failure == null) { throw new TypeError('Got cancelled result with no failure attribute'); } - reject(this.failureToError(failure)); + reject(failureConverter.failureToError(failure, payloadConverter)); } else if (activation.result.backoff) { reject(new LocalActivityDoBackoff(activation.result.backoff)); } @@ -624,7 +664,11 @@ export class Activator implements ActivationHandler { public resolveChildWorkflowExecutionStart( activation: coresdk.workflow_activation.IResolveChildWorkflowExecutionStart ): void { - const { resolve, reject } = this.consumeCompletion('childWorkflowStart', getSeq(activation)); + const seq = getSeq(activation); + const { resolve, reject } = this.consumeCompletion('childWorkflowStart', seq); + // Keep context until ResolveChildWorkflowExecution to decode the child result with child workflow identity. + // For failed/cancelled start, no completion resolve follows, so consume context immediately. + const childContext = this.serializationContexts.childWorkflow.get(seq); if (activation.succeeded) { if (!activation.succeeded.runId) { throw new TypeError('Got ResolveChildWorkflowExecutionStart with no runId'); @@ -644,11 +688,19 @@ export class Activator implements ActivationHandler { activation.failed.workflowType ) ); + this.serializationContexts.childWorkflow.delete(seq); } else if (activation.cancelled) { if (!activation.cancelled.failure) { throw new TypeError('Got no failure in cancelled variant'); } - reject(this.failureToError(activation.cancelled.failure)); + const payloadConverter = childContext + ? withPayloadConverterContext(this.payloadConverter, childContext) + : this.payloadConverter; + const failureConverter = childContext + ? withFailureConverterContext(this.failureConverter, childContext) + : this.failureConverter; + reject(failureConverter.failureToError(activation.cancelled.failure, payloadConverter)); + this.serializationContexts.childWorkflow.delete(seq); } else { throw new TypeError('Got ResolveChildWorkflowExecutionStart with no status'); } @@ -658,23 +710,32 @@ export class Activator implements ActivationHandler { if (!activation.result) { throw new TypeError('Got ResolveChildWorkflowExecution activation with no result'); } - const { resolve, reject } = this.consumeCompletion('childWorkflowComplete', getSeq(activation)); + const seq = getSeq(activation); + const { resolve, reject } = this.consumeCompletion('childWorkflowComplete', seq); + const childContext = this.serializationContexts.childWorkflow.get(seq); + this.serializationContexts.childWorkflow.delete(seq); + const payloadConverter = childContext + ? withPayloadConverterContext(this.payloadConverter, childContext) + : this.payloadConverter; + const failureConverter = childContext + ? withFailureConverterContext(this.failureConverter, childContext) + : this.failureConverter; if (activation.result.completed) { const completed = activation.result.completed; - const result = completed.result ? this.payloadConverter.fromPayload(completed.result) : undefined; + const result = completed.result ? payloadConverter.fromPayload(completed.result) : undefined; resolve(result); } else if (activation.result.failed) { const { failure } = activation.result.failed; if (failure == null) { throw new TypeError('Got failed result with no failure attribute'); } - reject(this.failureToError(failure)); + reject(failureConverter.failureToError(failure, payloadConverter)); } else if (activation.result.cancelled) { const { failure } = activation.result.cancelled; if (failure == null) { throw new TypeError('Got cancelled result with no failure attribute'); } - reject(this.failureToError(failure)); + reject(failureConverter.failureToError(failure, payloadConverter)); } } @@ -702,7 +763,12 @@ export class Activator implements ActivationHandler { const seq = getSeq(activation); if (activation.result?.completed) { - const result = this.payloadConverter.fromPayload(activation.result.completed); + const result = withPayloadConverterContext(this.payloadConverter, { + type: 'workflow', + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }).fromPayload(activation.result.completed); // It is possible for ResolveNexusOperation to be received without a prior ResolveNexusOperationStart, // e.g. because the handler completed the Operation synchronously. @@ -775,9 +841,15 @@ export class Activator implements ActivationHandler { queryType === ENHANCED_STACK_TRACE_QUERY_NAME; const interceptors = isInternalQuery ? [] : this.interceptors.inbound; const execute = composeInterceptors(interceptors, 'handleQuery', this.queryWorkflowNextHandler.bind(this)); + const payloadConverter = withPayloadConverterContext(this.payloadConverter, { + type: 'workflow', + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }); execute({ queryName: queryType, - args: arrayFromPayloads(this.payloadConverter, activation.arguments), + args: arrayFromPayloads(payloadConverter, activation.arguments), queryId, headers: headers ?? {}, }).then( @@ -827,12 +899,20 @@ export class Activator implements ActivationHandler { return; } - const makeInput = (): UpdateInput => ({ - updateId, - args: arrayFromPayloads(this.payloadConverter, activation.input), - name, - headers: headers ?? {}, - }); + const makeInput = (): UpdateInput => { + const payloadConverter = withPayloadConverterContext(this.payloadConverter, { + type: 'workflow', + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }); + return { + updateId, + args: arrayFromPayloads(payloadConverter, activation.input), + name, + headers: headers ?? {}, + }; + }; // The implementation below is responsible for upholding, and constrained // by, the following contract: @@ -1003,8 +1083,14 @@ export class Activator implements ActivationHandler { const signalExecutionNum = this.signalHandlerExecutionSeq++; this.inProgressSignals.set(signalExecutionNum, { name: signalName, unfinishedPolicy }); const execute = composeInterceptors(interceptors, 'handleSignal', this.signalWorkflowNextHandler.bind(this)); + const payloadConverter = withPayloadConverterContext(this.payloadConverter, { + type: 'workflow', + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }); execute({ - args: arrayFromPayloads(this.payloadConverter, activation.input), + args: arrayFromPayloads(payloadConverter, activation.input), signalName, headers: headers ?? {}, }) @@ -1029,9 +1115,18 @@ export class Activator implements ActivationHandler { } public resolveSignalExternalWorkflow(activation: coresdk.workflow_activation.IResolveSignalExternalWorkflow): void { - const { resolve, reject } = this.consumeCompletion('signalWorkflow', getSeq(activation)); + const seq = getSeq(activation); + const { resolve, reject } = this.consumeCompletion('signalWorkflow', seq); + const signalContext = this.serializationContexts.signalExternalWorkflow.get(seq); + this.serializationContexts.signalExternalWorkflow.delete(seq); + const payloadConverter = signalContext + ? withPayloadConverterContext(this.payloadConverter, signalContext) + : this.payloadConverter; + const failureConverter = signalContext + ? withFailureConverterContext(this.failureConverter, signalContext) + : this.failureConverter; if (activation.failure) { - reject(this.failureToError(activation.failure)); + reject(failureConverter.failureToError(activation.failure, payloadConverter)); } else { resolve(undefined); } @@ -1040,9 +1135,18 @@ export class Activator implements ActivationHandler { public resolveRequestCancelExternalWorkflow( activation: coresdk.workflow_activation.IResolveRequestCancelExternalWorkflow ): void { - const { resolve, reject } = this.consumeCompletion('cancelWorkflow', getSeq(activation)); + const seq = getSeq(activation); + const { resolve, reject } = this.consumeCompletion('cancelWorkflow', seq); + const cancelContext = this.serializationContexts.cancelExternalWorkflow.get(seq); + this.serializationContexts.cancelExternalWorkflow.delete(seq); + const payloadConverter = cancelContext + ? withPayloadConverterContext(this.payloadConverter, cancelContext) + : this.payloadConverter; + const failureConverter = cancelContext + ? withFailureConverterContext(this.failureConverter, cancelContext) + : this.failureConverter; if (activation.failure) { - reject(this.failureToError(activation.failure)); + reject(failureConverter.failureToError(activation.failure, payloadConverter)); } else { resolve(undefined); } @@ -1204,8 +1308,14 @@ export class Activator implements ActivationHandler { } private completeQuery(queryId: string, result: unknown): void { + const payloadConverter = withPayloadConverterContext(this.payloadConverter, { + type: 'workflow', + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }); this.pushCommand({ - respondToQuery: { queryId, succeeded: { response: this.payloadConverter.toPayload(result) } }, + respondToQuery: { queryId, succeeded: { response: payloadConverter.toPayload(result) } }, }); } @@ -1223,8 +1333,14 @@ export class Activator implements ActivationHandler { } private completeUpdate(protocolInstanceId: string, result: unknown): void { + const payloadConverter = withPayloadConverterContext(this.payloadConverter, { + type: 'workflow', + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }); this.pushCommand({ - updateResponse: { protocolInstanceId, completed: this.payloadConverter.toPayload(result) }, + updateResponse: { protocolInstanceId, completed: payloadConverter.toPayload(result) }, }); } @@ -1262,10 +1378,16 @@ export class Activator implements ActivationHandler { } private completeWorkflow(result: unknown): void { + const payloadConverter = withPayloadConverterContext(this.payloadConverter, { + type: 'workflow', + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }); this.pushCommand( { completeWorkflowExecution: { - result: this.payloadConverter.toPayload(result), + result: payloadConverter.toPayload(result), }, }, true @@ -1273,11 +1395,27 @@ export class Activator implements ActivationHandler { } errorToFailure(err: unknown): ProtoFailure { - return this.failureConverter.errorToFailure(err, this.payloadConverter); + const workflowContext = { + type: 'workflow' as const, + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }; + const payloadConverter = withPayloadConverterContext(this.payloadConverter, workflowContext); + const failureConverter = withFailureConverterContext(this.failureConverter, workflowContext); + return failureConverter.errorToFailure(err, payloadConverter); } failureToError(failure: ProtoFailure): Error { - return this.failureConverter.failureToError(failure, this.payloadConverter); + const workflowContext = { + type: 'workflow' as const, + namespace: this.info.namespace, + workflowId: this.info.workflowId, + workflowType: this.info.workflowType, + }; + const payloadConverter = withPayloadConverterContext(this.payloadConverter, workflowContext); + const failureConverter = withFailureConverterContext(this.failureConverter, workflowContext); + return failureConverter.failureToError(failure, payloadConverter); } } diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 4492d73b7..d80e6cbaf 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -36,6 +36,11 @@ import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { temporal } from '@temporalio/proto'; import { deepMerge } from '@temporalio/common/lib/internal-workflow'; import { throwIfReservedName } from '@temporalio/common/lib/reserved'; +import { + type ActivitySerializationContext, + type WorkflowSerializationContext, + withPayloadConverterContext, +} from '@temporalio/common/lib/converter/serialization-context'; import { CancellationScope, registerSleepImplementation } from './cancellation-scope'; import { UpdateScope } from './update-scope'; import { @@ -93,6 +98,12 @@ export function addDefaultWorkflowOptions( */ function timerNextHandler({ seq, durationMs, options }: TimerInput) { const activator = getActivator(); + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, { + type: 'workflow', + namespace: activator.info.namespace, + workflowId: activator.info.workflowId, + workflowType: activator.info.workflowType, + }); return new Promise((resolve, reject) => { const scope = CancellationScope.current(); if (scope.consideredCancelled) { @@ -119,7 +130,7 @@ function timerNextHandler({ seq, durationMs, options }: TimerInput) { seq, startToFireTimeout: msToTs(durationMs), }, - userMetadata: userMetadataToPayload(activator.payloadConverter, options?.summary, undefined), + userMetadata: userMetadataToPayload(payloadConverter, options?.summary, undefined), }); activator.completions.timer.set(seq, { resolve, @@ -167,6 +178,17 @@ const validateLocalActivityOptions = validateActivityOptions; function scheduleActivityNextHandler({ options, args, headers, seq, activityType }: ActivityInput): Promise { const activator = getActivator(); validateActivityOptions(options); + const taskQueue = options.taskQueue || activator.info.taskQueue; + const activityId = options.activityId ?? `${seq}`; + const context: ActivitySerializationContext = { + type: 'activity', + namespace: activator.info.namespace, + activityId, + workflowId: activator.info.workflowId, + workflowType: activator.info.workflowType, + isLocal: false, + }; + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, context); return new Promise((resolve, reject) => { const scope = CancellationScope.current(); if (scope.consideredCancelled) { @@ -190,11 +212,11 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType activator.pushCommand({ scheduleActivity: { seq, - activityId: options.activityId ?? `${seq}`, + activityId, activityType, - arguments: toPayloads(activator.payloadConverter, ...args), + arguments: toPayloads(payloadConverter, ...args), retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, - taskQueue: options.taskQueue || activator.info.taskQueue, + taskQueue, heartbeatTimeout: msOptionalToTs(options.heartbeatTimeout), scheduleToCloseTimeout: msOptionalToTs(options.scheduleToCloseTimeout), startToCloseTimeout: msOptionalToTs(options.startToCloseTimeout), @@ -205,8 +227,10 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType versioningIntent: versioningIntentToProto(options.versioningIntent), // eslint-disable-line @typescript-eslint/no-deprecated priority: options.priority ? compilePriority(options.priority) : undefined, }, - userMetadata: userMetadataToPayload(activator.payloadConverter, options.summary, undefined), + userMetadata: userMetadataToPayload(payloadConverter, options.summary, undefined), }); + // Add at scheduling time; consumed and removed on resolveActivity in Activator. + activator.serializationContexts.activity.set(seq, context); activator.completions.activity.set(seq, { resolve, reject, @@ -227,6 +251,16 @@ async function scheduleLocalActivityNextHandler({ originalScheduleTime, }: LocalActivityInput): Promise { const activator = getActivator(); + const activityId = `${seq}`; + const context: ActivitySerializationContext = { + type: 'activity', + namespace: activator.info.namespace, + activityId, + workflowId: activator.info.workflowId, + workflowType: activator.info.workflowType, + isLocal: true, + }; + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, context); // Eagerly fail the local activity (which will in turn fail the workflow task. // Do not fail on replay where the local activities may not be registered on the replay worker. if (!activator.info.unsafe.isReplaying && !activator.registeredActivityNames.has(activityType)) { @@ -260,9 +294,9 @@ async function scheduleLocalActivityNextHandler({ attempt, originalScheduleTime, // Intentionally not exposing activityId as an option - activityId: `${seq}`, + activityId, activityType, - arguments: toPayloads(activator.payloadConverter, ...args), + arguments: toPayloads(payloadConverter, ...args), retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, scheduleToCloseTimeout: msOptionalToTs(options.scheduleToCloseTimeout), startToCloseTimeout: msOptionalToTs(options.startToCloseTimeout), @@ -271,8 +305,10 @@ async function scheduleLocalActivityNextHandler({ headers, cancellationType: encodeActivityCancellationType(options.cancellationType), }, - userMetadata: userMetadataToPayload(activator.payloadConverter, options.summary, undefined), + userMetadata: userMetadataToPayload(payloadConverter, options.summary, undefined), }); + // Add at scheduling time; consumed and removed on resolveActivity in Activator. + activator.serializationContexts.activity.set(seq, context); activator.completions.activity.set(seq, { resolve, reject, @@ -363,6 +399,13 @@ function startChildWorkflowExecutionNextHandler({ }: StartChildWorkflowExecutionInput): Promise<[Promise, Promise]> { const activator = getActivator(); const workflowId = options.workflowId ?? uuid4(); + const context: WorkflowSerializationContext = { + type: 'workflow', + namespace: activator.info.namespace, + workflowId, + workflowType, + }; + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, context); const startPromise = new Promise((resolve, reject) => { const scope = CancellationScope.current(); if (scope.consideredCancelled) { @@ -388,7 +431,7 @@ function startChildWorkflowExecutionNextHandler({ seq, workflowId, workflowType, - input: toPayloads(activator.payloadConverter, ...options.args), + input: toPayloads(payloadConverter, ...options.args), retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, taskQueue: options.taskQueue || activator.info.taskQueue, workflowExecutionTimeout: msOptionalToTs(options.workflowExecutionTimeout), @@ -404,12 +447,14 @@ function startChildWorkflowExecutionNextHandler({ options.searchAttributes || options.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated ? encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes) // eslint-disable-line @typescript-eslint/no-deprecated : undefined, - memo: options.memo && mapToPayloads(activator.payloadConverter, options.memo), + memo: options.memo && mapToPayloads(payloadConverter, options.memo), versioningIntent: versioningIntentToProto(options.versioningIntent), // eslint-disable-line @typescript-eslint/no-deprecated priority: options.priority ? compilePriority(options.priority) : undefined, }, - userMetadata: userMetadataToPayload(activator.payloadConverter, options?.staticSummary, options?.staticDetails), + userMetadata: userMetadataToPayload(payloadConverter, options?.staticSummary, options?.staticDetails), }); + // Add at scheduling time; consumed and removed on child resolve jobs in Activator. + activator.serializationContexts.childWorkflow.set(seq, context); activator.completions.childWorkflowStart.set(seq, { resolve, reject, @@ -437,6 +482,13 @@ function startChildWorkflowExecutionNextHandler({ function signalWorkflowNextHandler({ seq, signalName, args, target, headers }: SignalWorkflowInput) { const activator = getActivator(); + const targetWorkflowId = (target.type === 'external' ? target.workflowExecution.workflowId : target.childWorkflowId)!; + const context: WorkflowSerializationContext = { + type: 'workflow', + namespace: activator.info.namespace, + workflowId: targetWorkflowId, + }; + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, context); return new Promise((resolve, reject) => { const scope = CancellationScope.current(); if (scope.consideredCancelled) { @@ -457,7 +509,7 @@ function signalWorkflowNextHandler({ seq, signalName, args, target, headers }: S activator.pushCommand({ signalExternalWorkflowExecution: { seq, - args: toPayloads(activator.payloadConverter, ...args), + args: toPayloads(payloadConverter, ...args), headers, signalName, ...(target.type === 'external' @@ -473,6 +525,8 @@ function signalWorkflowNextHandler({ seq, signalName, args, target, headers }: S }, }); + // Add at scheduling time; consumed and removed on resolveSignalExternalWorkflow in Activator. + activator.serializationContexts.signalExternalWorkflow.set(seq, context); activator.completions.signalWorkflow.set(seq, { resolve, reject }); }); } @@ -729,6 +783,12 @@ export function getExternalWorkflowHandle(workflowId: string, runId?: string): E }, }, }); + // Add at scheduling time; consumed and removed on resolveRequestCancelExternalWorkflow in Activator. + activator.serializationContexts.cancelExternalWorkflow.set(seq, { + type: 'workflow', + namespace: activator.info.namespace, + workflowId, + }); activator.completions.cancelWorkflow.set(seq, { resolve, reject }); }); }, @@ -1009,14 +1069,20 @@ export function makeContinueAsNewFunc( }; return (...args: Parameters): Promise => { + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, { + type: 'workflow', + namespace: activator.info.namespace, + workflowId: activator.info.workflowId, + workflowType: activator.info.workflowType, + }); const fn = composeInterceptors(activator.interceptors.outbound, 'continueAsNew', async (input) => { const { headers, args, options } = input; throw new ContinueAsNew({ workflowType: options.workflowType, - arguments: toPayloads(activator.payloadConverter, ...args), + arguments: toPayloads(payloadConverter, ...args), headers, taskQueue: options.taskQueue, - memo: options.memo && mapToPayloads(activator.payloadConverter, options.memo), + memo: options.memo && mapToPayloads(payloadConverter, options.memo), searchAttributes: options.searchAttributes || options.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated ? encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes) // eslint-disable-line @typescript-eslint/no-deprecated @@ -1664,6 +1730,12 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes | Sear */ export function upsertMemo(memo: Record): void { const activator = assertInWorkflowContext('Workflow.upsertMemo(...) may only be used from a Workflow Execution.'); + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, { + type: 'workflow', + namespace: activator.info.namespace, + workflowId: activator.info.workflowId, + workflowType: activator.info.workflowType, + }); if (memo == null) { throw new Error('memo must be a non-null Record'); @@ -1673,7 +1745,7 @@ export function upsertMemo(memo: Record): void { modifyWorkflowProperties: { upsertedMemo: { fields: mapToPayloads( - activator.payloadConverter, + payloadConverter, // Convert null to undefined Object.fromEntries(Object.entries(memo).map(([k, v]) => [k, v ?? undefined])) ),