Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions packages/client/src/async-completion-client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { status as grpcStatus } from '@grpc/grpc-js';
import { ensureTemporalFailure } from '@temporalio/common';
import { ensureTemporalFailure, SerializationContext } from '@temporalio/common';
import { withSerializationContext } from '@temporalio/common/lib/converter/serialization-context';
import { encodeErrorToFailure, encodeToPayloads } from '@temporalio/common/lib/internal-non-workflow';
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow';
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';
Expand Down Expand Up @@ -81,6 +82,7 @@ export interface FullActivityId {
*/
export class AsyncCompletionClient extends BaseClient {
public readonly options: LoadedAsyncCompletionClientOptions;
private context?: SerializationContext;

constructor(options?: AsyncCompletionClientOptions) {
super(options);
Expand All @@ -101,6 +103,20 @@ export class AsyncCompletionClient extends BaseClient {
return this.connection.workflowService;
}

/**
* Return a client instance with all serialization operations bound to `context`.
*/
withContext(context: SerializationContext): AsyncCompletionClient {
const client = new AsyncCompletionClient({
connection: this.connection,
dataConverter: this.dataConverter,
identity: this.options.identity,
namespace: this.options.namespace,
});
client.context = context;
return client;
}

/**
* Transforms grpc errors into well defined TS errors.
*/
Expand All @@ -127,7 +143,10 @@ export class AsyncCompletionClient extends BaseClient {
async complete(fullActivityId: FullActivityId, result: unknown): Promise<void>;

async complete(taskTokenOrFullActivityId: Uint8Array | FullActivityId, result: unknown): Promise<void> {
const payloads = await encodeToPayloads(this.dataConverter, result);
const dataConverter = this.context
? withSerializationContext(this.dataConverter, this.context)
: this.dataConverter;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth making this snippet a getter to use in complete/fail/reportCancellation/heartbeat.

const payloads = await encodeToPayloads(dataConverter, result);
try {
if (taskTokenOrFullActivityId instanceof Uint8Array) {
await this.workflowService.respondActivityTaskCompleted({
Expand Down Expand Up @@ -159,7 +178,10 @@ export class AsyncCompletionClient extends BaseClient {
async fail(fullActivityId: FullActivityId, err: unknown): Promise<void>;

async fail(taskTokenOrFullActivityId: Uint8Array | FullActivityId, err: unknown): Promise<void> {
const failure = await encodeErrorToFailure(this.dataConverter, ensureTemporalFailure(err));
const dataConverter = this.context
? withSerializationContext(this.dataConverter, this.context)
: this.dataConverter;
const failure = await encodeErrorToFailure(dataConverter, ensureTemporalFailure(err));
try {
if (taskTokenOrFullActivityId instanceof Uint8Array) {
await this.workflowService.respondActivityTaskFailed({
Expand Down Expand Up @@ -191,7 +213,10 @@ export class AsyncCompletionClient extends BaseClient {
reportCancellation(fullActivityId: FullActivityId, details?: unknown): Promise<void>;

async reportCancellation(taskTokenOrFullActivityId: Uint8Array | FullActivityId, details?: unknown): Promise<void> {
const payloads = await encodeToPayloads(this.dataConverter, details);
const dataConverter = this.context
? withSerializationContext(this.dataConverter, this.context)
: this.dataConverter;
const payloads = await encodeToPayloads(dataConverter, details);
try {
if (taskTokenOrFullActivityId instanceof Uint8Array) {
await this.workflowService.respondActivityTaskCanceled({
Expand Down Expand Up @@ -223,7 +248,10 @@ export class AsyncCompletionClient extends BaseClient {
heartbeat(fullActivityId: FullActivityId, details?: unknown): Promise<void>;

async heartbeat(taskTokenOrFullActivityId: Uint8Array | FullActivityId, details?: unknown): Promise<void> {
const payloads = await encodeToPayloads(this.dataConverter, details);
const dataConverter = this.context
? withSerializationContext(this.dataConverter, this.context)
: this.dataConverter;
const payloads = await encodeToPayloads(dataConverter, details);
let cancelRequested = false;
let paused = false;
let reset = false;
Expand Down
10 changes: 7 additions & 3 deletions packages/client/src/schedule-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export class ScheduleClient extends BaseClient {
scheduleId: opts.scheduleId,
schedule: {
spec: encodeScheduleSpec(opts.spec),
action: await encodeScheduleAction(this.dataConverter, opts.action, headers),
action: await encodeScheduleAction(this.dataConverter, this.options.namespace, opts.action, headers),
policies: encodeSchedulePolicies(opts.policies),
state: encodeScheduleState(opts.state),
},
Expand Down Expand Up @@ -298,7 +298,7 @@ export class ScheduleClient extends BaseClient {
scheduleId,
schedule: {
spec: encodeScheduleSpec(opts.spec),
action: await encodeScheduleAction(this.dataConverter, opts.action, header),
action: await encodeScheduleAction(this.dataConverter, this.options.namespace, opts.action, header),
policies: encodeSchedulePolicies(opts.policies),
state: encodeScheduleState(opts.state),
},
Expand Down Expand Up @@ -432,7 +432,11 @@ export class ScheduleClient extends BaseClient {
return {
scheduleId,
spec: decodeScheduleSpec(raw.schedule.spec),
action: await decodeScheduleAction(this.client.dataConverter, raw.schedule.action),
action: await decodeScheduleAction(
this.client.dataConverter,
this.client.options.namespace,
raw.schedule.action
),
memo: await decodeMapFromPayloads(this.client.dataConverter, raw.memo?.fields),
searchAttributes: decodeSearchAttributes(raw.searchAttributes?.indexedFields),
typedSearchAttributes: decodeTypedSearchAttributes(raw.searchAttributes?.indexedFields),
Expand Down
25 changes: 18 additions & 7 deletions packages/client/src/schedule-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import Long from 'long';
import {
WorkflowSerializationContext,
compilePriority,
compileRetryPolicy,
decodePriority,
decompileRetryPolicy,
extractWorkflowType,
LoadedDataConverter,
} from '@temporalio/common';
import { withSerializationContext } from '@temporalio/common/lib/converter/serialization-context';
import { encodeUserMetadata, decodeUserMetadata } from '@temporalio/common/lib/internal-non-workflow/codec-helpers';
import {
encodeUnifiedSearchAttributes,
Expand Down Expand Up @@ -247,16 +249,19 @@ export function encodeScheduleSpec(spec: ScheduleSpec): temporal.api.schedule.v1

export async function encodeScheduleAction(
dataConverter: LoadedDataConverter,
namespace: string,
action: CompiledScheduleAction,
headers: Headers
): Promise<temporal.api.schedule.v1.IScheduleAction> {
const context: WorkflowSerializationContext = { namespace, workflowId: action.workflowId };
const contextDataConverter = withSerializationContext(dataConverter, context);
return {
startWorkflow: {
workflowId: action.workflowId,
workflowType: {
name: action.workflowType,
},
input: { payloads: await encodeToPayloads(dataConverter, ...action.args) },
input: { payloads: await encodeToPayloads(contextDataConverter, ...action.args) },
taskQueue: {
kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL,
name: action.taskQueue,
Expand All @@ -265,15 +270,15 @@ export async function encodeScheduleAction(
workflowRunTimeout: msOptionalToTs(action.workflowRunTimeout),
workflowTaskTimeout: msOptionalToTs(action.workflowTaskTimeout),
retryPolicy: action.retry ? compileRetryPolicy(action.retry) : undefined,
memo: action.memo ? { fields: await encodeMapToPayloads(dataConverter, action.memo) } : undefined,
memo: action.memo ? { fields: await encodeMapToPayloads(contextDataConverter, action.memo) } : undefined,
searchAttributes:
action.searchAttributes || action.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated
? {
indexedFields: encodeUnifiedSearchAttributes(action.searchAttributes, action.typedSearchAttributes), // eslint-disable-line @typescript-eslint/no-deprecated
}
: undefined,
header: { fields: headers },
userMetadata: await encodeUserMetadata(dataConverter, action.staticSummary, action.staticDetails),
userMetadata: await encodeUserMetadata(contextDataConverter, action.staticSummary, action.staticDetails),
priority: action.priority ? compilePriority(action.priority) : undefined,
},
};
Expand Down Expand Up @@ -321,20 +326,26 @@ export function decodeScheduleSpec(pb: temporal.api.schedule.v1.IScheduleSpec):

export async function decodeScheduleAction(
dataConverter: LoadedDataConverter,
namespace: string,
pb: temporal.api.schedule.v1.IScheduleAction
): Promise<ScheduleDescriptionAction> {
if (pb.startWorkflow) {
const { staticSummary, staticDetails } = await decodeUserMetadata(dataConverter, pb.startWorkflow?.userMetadata);
const workflowId = pb.startWorkflow.workflowId!;
const contextDataConverter = withSerializationContext(dataConverter, { namespace, workflowId });
const { staticSummary, staticDetails } = await decodeUserMetadata(
contextDataConverter,
pb.startWorkflow?.userMetadata
);
return {
type: 'startWorkflow',

workflowId: pb.startWorkflow.workflowId!,
workflowId,

workflowType: pb.startWorkflow.workflowType!.name!,

taskQueue: pb.startWorkflow.taskQueue!.name!,
args: await decodeArrayFromPayloads(dataConverter, pb.startWorkflow.input?.payloads),
memo: await decodeMapFromPayloads(dataConverter, pb.startWorkflow.memo?.fields),
args: await decodeArrayFromPayloads(contextDataConverter, pb.startWorkflow.input?.payloads),
memo: await decodeMapFromPayloads(contextDataConverter, pb.startWorkflow.memo?.fields),
retry: decompileRetryPolicy(pb.startWorkflow.retryPolicy),
searchAttributes: decodeSearchAttributes(pb.startWorkflow.searchAttributes?.indexedFields),
typedSearchAttributes: decodeTypedSearchAttributes(pb.startWorkflow.searchAttributes?.indexedFields),
Expand Down
Loading
Loading