feat: add serialization context to data conversion interfaces#1950
feat: add serialization context to data conversion interfaces#1950
Conversation
Simplify the serialization context plumbing across the SDK: - AsyncCompletionClient: remove auto-derivation fallback and context constructor param; only withContext() sets context - Activity: collapse duplicate contextDataConverter param into the existing dataConverter param - Worker: remove ActivitySerializationContextInput indirection, derive context from ActivityInfo at call site - WorkflowCodecRunner: remove WeakMap cache, 8 wrapper methods, and dead ?? workflowContext fallbacks; inline codecsForContext() at call sites using boolean-flag pattern - Workflow internals: remove auto-derivation fallback in resolveActivity, remove unused workflowId parameter from errorToFailure/failureToError, remove incorrect self-workflow fallback in signal context - Workflow handlers: deduplicate context construction (create once, reuse) - Schedule helpers: simplify context derivation Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
a02fbf0 to
5d11fee
Compare
|
Sorry I don't know how to lessen the diff for Open to suggestions as the diff looks worse than it actually is |
Add integration tests verifying full serialization trace for every code path: workflow start, activity, child workflow, query, update, signal, external signal/cancel, schedule actions, and async completion. Uses a Tracer type that accumulates context tags at each serialization boundary, with t.deepEqual on full trace arrays to prevent false positives.
53340c9 to
638ce22
Compare
chris-olszewski
left a comment
There was a problem hiding this comment.
Serializing some initial comments
| await worker.runUntil(async () => { | ||
| const handle = await client.workflow.start(serializationContextWorkflow, { | ||
| workflowId, | ||
| taskQueue, | ||
| args: [makeTracer('start-input'), childWorkflowId, signalTargetWorkflowId, cancelTargetWorkflowId], | ||
| }); | ||
|
|
||
| const queryResult = await handle.query(serializationContextQuery, makeTracer('query-input')); | ||
| const updateResult = await handle.executeUpdate(serializationContextUpdate, { | ||
| args: [makeTracer('update-input')], | ||
| }); | ||
| await handle.signal(serializationContextFinishSignal, makeTracer('finish-signal-input')); | ||
|
|
||
| const result = await handle.result(); |
There was a problem hiding this comment.
Small suggestion, but I prefer keeping assertions outside of the worker function.
| await worker.runUntil(async () => { | |
| const handle = await client.workflow.start(serializationContextWorkflow, { | |
| workflowId, | |
| taskQueue, | |
| args: [makeTracer('start-input'), childWorkflowId, signalTargetWorkflowId, cancelTargetWorkflowId], | |
| }); | |
| const queryResult = await handle.query(serializationContextQuery, makeTracer('query-input')); | |
| const updateResult = await handle.executeUpdate(serializationContextUpdate, { | |
| args: [makeTracer('update-input')], | |
| }); | |
| await handle.signal(serializationContextFinishSignal, makeTracer('finish-signal-input')); | |
| const result = await handle.result(); | |
| const { result, queryResult, updateResult } = await worker.runUntil(async () => { | |
| const handle = await client.workflow.start(serializationContextWorkflow, { | |
| workflowId, | |
| taskQueue, | |
| args: [makeTracer('start-input'), childWorkflowId, signalTargetWorkflowId, cancelTargetWorkflowId], | |
| }); | |
| const queryResult = await handle.query(serializationContextQuery, makeTracer('query-input')); | |
| const updateResult = await handle.executeUpdate(serializationContextUpdate, { | |
| args: [makeTracer('update-input')], | |
| }); | |
| await handle.signal(serializationContextFinishSignal, makeTracer('finish-signal-input')); | |
| const result = await handle.result(); | |
| return { result, queryResult, updateResult }; | |
| }); |
| export function withSerializationContext( | ||
| converter: LoadedDataConverter, | ||
| context: SerializationContext | ||
| ): LoadedDataConverter { | ||
| const payloadConverter = withPayloadConverterContext(converter.payloadConverter, context); | ||
| const failureConverter = withFailureConverterContext(converter.failureConverter, context); | ||
| let codecsChanged = false; | ||
| const maybeBoundCodecs = converter.payloadCodecs.map((codec) => { | ||
| const maybeContextCodec = withPayloadCodecContext(codec, context); | ||
| if (maybeContextCodec !== codec) { | ||
| codecsChanged = true; | ||
| } | ||
| return maybeContextCodec; | ||
| }); | ||
| const payloadCodecs = codecsChanged ? maybeBoundCodecs : converter.payloadCodecs; | ||
|
|
||
| if ( | ||
| payloadConverter === converter.payloadConverter && | ||
| failureConverter === converter.failureConverter && | ||
| !codecsChanged | ||
| ) { | ||
| return converter; | ||
| } | ||
| return { payloadConverter, failureConverter, payloadCodecs }; | ||
| } |
There was a problem hiding this comment.
Given our use === here, we might want to inform users via doc comment about the implications of returning this from withContext in their implementations. An alternative would be to just check if withContext is defined on any of the converters or codecs and assume change if one is present.
There was a problem hiding this comment.
IIUC, this is only a memory optimization, right? If so, the fact that === might give false positive is not that much of a worry.
| const dataConverter = this.context | ||
| ? withSerializationContext(this.dataConverter, this.context) | ||
| : this.dataConverter; |
There was a problem hiding this comment.
Might be worth making this snippet a getter to use in complete/fail/reportCancellation/heartbeat.
packages/worker/src/worker.ts
Outdated
| this.options.namespace, | ||
| this.options.taskQueue | ||
| ); | ||
| contextDataConverter = withSerializationContext(this.options.loadedDataConverter, { |
There was a problem hiding this comment.
Do we need to be concerned about not reaching this line and using the data converter without context in the catch?
There was a problem hiding this comment.
The current behavior should be correct — if we fail, it's because either:
- IllegalStateError — we don't have meaningful context to apply
- extractActivityInfo threw — we don't have valid info to build context from
In both cases, context-free encoding is the only option since the information needed to construct the context doesn't exist yet
packages/workflow/src/workflow.ts
Outdated
| import { deepMerge } from '@temporalio/common/lib/internal-workflow'; | ||
| import { throwIfReservedName } from '@temporalio/common/lib/reserved'; | ||
| import { | ||
| ActivitySerializationContext, |
There was a problem hiding this comment.
| ActivitySerializationContext, | |
| type ActivitySerializationContext, |
| /** | ||
| * Context for workflow-level serialization operations. | ||
| */ | ||
| export interface WorkflowSerializationContext { |
There was a problem hiding this comment.
We have workflowType? in Activity context, but not here. Of course, that information is not always available, but when it is, shouldn't we also include it here?
Or put another way, how can that information be useful to serializers on an activity, but not needed on a workflow?
| /** | ||
| * Context for workflow-level serialization operations. | ||
| */ | ||
| export interface WorkflowSerializationContext { |
There was a problem hiding this comment.
We should consider adding some form of discriminant on serialization contexts. Maybe a type: 'workflow'.
Two reasons:
- As it is now, the only way to determine whether the current serialization belongs to a workflow or an activity, and therefore which fields can be accessed, is to first do
"isLocal" in context(becauseisLocalis the only field which is guaranteed to be set in one context but not he other). - There will likely be other types of serialization context in the future, which may further complicate proper use.
| /** Parent workflow type when this activity is associated with a workflow. */ | ||
| workflowType?: string; | ||
| /** Whether the activity is a local activity started from a workflow. */ | ||
| isLocal: boolean; |
There was a problem hiding this comment.
Thought: with that information on hand, that means we (i.e. the sdk, not the user), could now skip codecs on local activities's input args, as those are never transferred through the network/persisted to history anyway. That optimization was not possible until now.
| * Return a loaded data converter where all components are context-bound when supported. | ||
| */ | ||
| // ts-prune-ignore-next (imported via lib/converter/serialization-context) | ||
| export function withSerializationContext( |
There was a problem hiding this comment.
Can you please confirm that this function is not intended to be ever be used in workflow context?
There was a problem hiding this comment.
It's only used in worker.ts at the moment, if that's what you're asking.
It calls withPayloadCodecContext so necessarily I don't think it can be in workflow context (or is that what you're implying?)
There was a problem hiding this comment.
Yeah, that's what I means. I think we should explicitly state that assumption in the typedoc. That's an internal API only, and should be obvious to TS SDK maintainers, but can be easily missed by our colleagues that are less familiar with implications of the TS' main thread vs workerthreads design.
To clarify my concern: I think calling this function from within the workflow sandbox would actually work without any immediate issue, but could result in inclusion of unnecessary code in the workflow bundle, thus increasing the memory footprint. That would be an easily missed bug, hence worth a warning.
| return appendTraceStep(decoded, `from:${contextTag(this.context)}`) as T; | ||
| } | ||
|
|
||
| withContext(context: SerializationContext): PayloadConverter { |
There was a problem hiding this comment.
Kind of disappointing that users must systematically write this function (which will basically always be the same code if they are context aware) and that they must make toPayload and fromPayload (or similar in other types of serializers) deal with the possibility of being called without a context, which we know should never happen if they support context and using a recent sdk.
mjameswh
left a comment
There was a problem hiding this comment.
Please don't merge yet. I want to discuss DX a bit more before committing to this API.
packages/worker/src/worker.ts
Outdated
| const context: ActivitySerializationContext = { | ||
| namespace: info.workflowNamespace, | ||
| activityId: info.activityId, | ||
| ...(info.workflowExecution.workflowId ? { workflowId: info.workflowExecution.workflowId } : {}), |
There was a problem hiding this comment.
nit: Why the intermediate object+spread operator in this case (and workflowType below)? I don't think there's any reason to differentiate the "property set to undefined" vs "property not set at all" cases here, so you can take a shorter form.
packages/worker/src/worker.ts
Outdated
| const context: ActivitySerializationContext = { | ||
| namespace: info.workflowNamespace, | ||
| activityId: info.activityId, | ||
| ...(info.workflowExecution.workflowId ? { workflowId: info.workflowExecution.workflowId } : {}), |
|
I'll continue review another day. Please hold. |
- Add discriminant field (type: 'workflow' | 'activity') to context types for clean union narrowing without 'isLocal' in context checks - Add optional workflowType to WorkflowSerializationContext, populated where available - Simplify conditional spread patterns in worker.ts to direct assignment - Extract contextDataConverter getter in AsyncCompletionClient - Fix type-only import in workflow.ts - Move test assertions outside worker.runUntil - Add JSDoc on withSerializationContext (worker-only, === optimization) - Replace unit tests with integration coverage: remove 10 implementation- coupled unit tests, keep forgetRun leak guard, add local activity integration test Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Addressed PR feedback and reduced the test code (removed unit tests, not enough signal for them to be worth keeping) |
Summary
Closes: #1661
Add serialization context to the data conversion interfaces (
PayloadConverter,FailureConverter,PayloadCodec), enabling converters to vary behavior based on which workflow or activity owns the data being serialized. Brings the TypeScript SDK to parity with the Python SDK's serialization context support.How it works
Converters can optionally implement
withContext(context: SerializationContext)to receive a context-bound copy. If not implemented, the converter is used as-is.Context is threaded at every serialization boundary:
Seq-based context tracking:
Both the workflow activator (
internals.ts) and theWorkflowCodecRunnermaintain parallel maps from command sequence number → serialization context. When a command is issued (e.g.scheduleActivity), the context is stored keyed by seq. When the corresponding resolve job arrives (e.g.resolveActivity), the stored context is retrieved, used for deserialization, and deleted. This is necessary because scheduling and resolution happen in separate activation cycles.What to pay attention to
PayloadConverter/FailureConverter(workflow isolate side), while the codec runner tracks contexts forPayloadCodec(worker side). They follow the same lifecycle but operate independently.AsyncCompletionClient.withContext()— the only way to bind context. By-ID and by-token paths without it use the bare converter.Test plan
test-serialization-context.ts— Integration tests using aTracertype that accumulates context tags at each serialization hop. Full trace arrays are asserted witht.deepEqualto catch false positives. Covers: workflow start, activity, child workflow, query, update, signal, external signal/cancel, schedule actions, async completion (by-id, bound-token, unbound-token).test-workflow-codec-runner.ts— 9 unit tests verifying seq map contents and codec context binding for: activity, local activity, child workflow round-trips, workflow-level commands, signal/cancel independence, context-free fallback, forgetRun.🤖 Generated with Claude Code