Skip to content

Commit 2dab40d

Browse files
committed
add test, removed proxyActivities overloads instead added withSummaries field to add a summary to an activity
1 parent 7b3aa1c commit 2dab40d

File tree

2 files changed

+206
-70
lines changed

2 files changed

+206
-70
lines changed

packages/test/src/test-integration-workflows.ts

+78-2
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@ import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
88
import { TestWorkflowEnvironment } from '@temporalio/testing';
99
import { CancelReason } from '@temporalio/worker/lib/activity';
1010
import * as workflow from '@temporalio/workflow';
11-
import { defineQuery, defineSignal } from '@temporalio/workflow';
11+
import { defineQuery, defineSignal, setHandler } from '@temporalio/workflow';
1212
import { SdkFlags } from '@temporalio/workflow/lib/flags';
13-
import { ActivityCancellationType, ApplicationFailure, WorkflowExecutionAlreadyStartedError } from '@temporalio/common';
13+
import { ActivityCancellationType, ApplicationFailure, JsonPayloadConverter, WorkflowExecutionAlreadyStartedError } from '@temporalio/common';
1414
import { signalSchedulingWorkflow } from './activities/helpers';
1515
import { activityStartedSignal } from './workflows/definitions';
1616
import * as workflows from './workflows';
1717
import { Context, helpers, makeTestFunction } from './helpers-integration';
1818
import { overrideSdkInternalFlag } from './mock-internal-flags';
1919
import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS } from './helpers';
20+
import { temporal } from '@temporalio/proto';
2021

2122
const test = makeTestFunction({
2223
workflowsPath: __filename,
@@ -1303,3 +1304,78 @@ test('Count workflow executions', async (t) => {
13031304
],
13041305
});
13051306
});
1307+
1308+
export async function userMetadataWorkflow(): Promise<string> {
1309+
let done = false;
1310+
const signalDef = defineSignal('done')
1311+
setHandler(signalDef, () => { done = true })
1312+
1313+
// That workflow should call an activity (with summary)
1314+
const { activityWithSummary } = workflow
1315+
.proxyActivities({ scheduleToCloseTimeout: '10s' })
1316+
.withSummaries({
1317+
activityWithSummary: 'activity summary'
1318+
})
1319+
await activityWithSummary()
1320+
// Should have a timer (with summary)
1321+
await workflow.sleep(5, "timer summary")
1322+
// Set current details
1323+
workflow.setCurrentDetails('current wf details');
1324+
// Unblock on var -> query current details (or return)
1325+
await workflow.condition(() => done);
1326+
return workflow.getCurrentDetails();
1327+
}
1328+
1329+
test('User metadata', async (t) => {
1330+
const { createWorker, startWorkflow } = helpers(t);
1331+
const worker = await createWorker({
1332+
activities: {
1333+
async activityWithSummary() {}
1334+
}
1335+
});
1336+
1337+
await worker.runUntil(async () => {
1338+
// Start a workflow with static details
1339+
const handle = await startWorkflow(userMetadataWorkflow, {
1340+
staticSummary: "wf static summary",
1341+
staticDetails: "wf static details"
1342+
});
1343+
// Describe workflow -> static summary, static details
1344+
const desc = await handle.describe();
1345+
t.true(desc.staticSummary === 'wf static summary');
1346+
t.true(desc.staticDetails === 'wf static details');
1347+
1348+
await handle.signal('done');
1349+
const res = await handle.result();
1350+
t.true(res === 'current wf details');
1351+
1352+
// Get history events for timer and activity summaries.
1353+
const resp = await t.context.env.client.workflowService.getWorkflowExecutionHistory(
1354+
{
1355+
namespace: t.context.env.client.options.namespace,
1356+
execution: {
1357+
workflowId: handle.workflowId,
1358+
runId: handle.firstExecutionRunId
1359+
},
1360+
}
1361+
);
1362+
const jsonConverter = new JsonPayloadConverter();
1363+
for (const event of resp.history?.events ?? []) {
1364+
if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) {
1365+
t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'wf static summary');
1366+
t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.details ?? {}), 'wf static details');
1367+
} else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED) {
1368+
t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'activity summary');
1369+
} else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_TIMER_STARTED) {
1370+
t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'timer summary');
1371+
}
1372+
}
1373+
1374+
// Run metadata query -> get current details
1375+
const wfMetadata = await handle.query('__temporal_workflow_metadata') as temporal.api.sdk.v1.IWorkflowMetadata;
1376+
t.deepEqual(wfMetadata.definition?.signalDefinitions?.length, 1);
1377+
t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].name, 'done');
1378+
t.deepEqual(wfMetadata.definition?.queryDefinitions?.length, 3); // default queries
1379+
t.deepEqual(wfMetadata.currentDetails, 'current wf details');
1380+
});
1381+
});

packages/workflow/src/workflow.ts

+128-68
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import {
4040
SignalWorkflowInput,
4141
StartChildWorkflowExecutionInput,
4242
TimerInput,
43+
UserMetadata,
44+
WorkflowCommandOptions,
4345
} from './interceptors';
4446
import {
4547
ChildWorkflowCancellationType,
@@ -82,6 +84,27 @@ export function addDefaultWorkflowOptions<T extends Workflow>(
8284
};
8385
}
8486

87+
function addUserMetadata(userMetadata?: UserMetadata): temporal.api.sdk.v1.IUserMetadata | undefined {
88+
if (userMetadata == null) {
89+
return undefined
90+
}
91+
92+
const jsonConverter = new JsonPayloadConverter();
93+
return {
94+
summary: jsonConverter.toPayload(userMetadata.summary),
95+
details: jsonConverter.toPayload(userMetadata.details),
96+
}
97+
}
98+
99+
function addWorkflowCommandOptions(cmdOpts?: WorkflowCommandOptions): object {
100+
if (cmdOpts == null) {
101+
return {}
102+
}
103+
return {
104+
userMetadata: addUserMetadata(cmdOpts.userMetadata)
105+
}
106+
}
107+
85108
/**
86109
* Push a startTimer command into state accumulator and register completion
87110
*/
@@ -113,9 +136,7 @@ function timerNextHandler(input: TimerInput) {
113136
seq: input.seq,
114137
startToFireTimeout: msToTs(input.durationMs),
115138
},
116-
userMetadata: {
117-
summary: new JsonPayloadConverter().toPayload(input.cmdOpts?.userMetadata?.summary)
118-
}
139+
...addWorkflowCommandOptions(input.cmdOpts)
119140
});
120141
activator.completions.timer.set(input.seq, {
121142
resolve,
@@ -199,9 +220,7 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType
199220
doNotEagerlyExecute: !(options.allowEagerDispatch ?? true),
200221
versioningIntent: versioningIntentToProto(options.versioningIntent),
201222
},
202-
userMetadata: {
203-
summary: new JsonPayloadConverter().toPayload(cmdOpts?.userMetadata?.summary)
204-
}
223+
...addWorkflowCommandOptions(cmdOpts)
205224
});
206225
activator.completions.activity.set(seq, {
207226
resolve,
@@ -268,9 +287,7 @@ async function scheduleLocalActivityNextHandler({
268287
headers,
269288
cancellationType: encodeActivityCancellationType(options.cancellationType),
270289
},
271-
userMetadata: {
272-
summary: new JsonPayloadConverter().toPayload(cmdOpts?.userMetadata?.summary)
273-
}
290+
...addWorkflowCommandOptions(cmdOpts)
274291
});
275292
activator.completions.activity.set(seq, {
276293
resolve,
@@ -514,51 +531,100 @@ export type ActivityInterfaceFor<T> = {
514531
[K in keyof T]: T[K] extends ActivityFunction ? T[K] : typeof NotAnActivityMethod;
515532
};
516533

534+
/**
535+
* Extends ActivityInterfaceFor to include the withSummaries method
536+
*/
537+
export type ActivityInterfaceWithSummaries<A> = ActivityInterfaceFor<A> & {
538+
/**
539+
* Provide descriptive summaries for activities
540+
* @param summaries Record mapping activity names to their summary descriptions
541+
* @returns A new proxy with the provided summaries
542+
*/
543+
withSummaries(summaries: Record<string, string>): ActivityInterfaceFor<A>;
544+
};
545+
517546
/**
518547
* Configure Activity functions with given {@link ActivityOptions}.
519548
*
520549
* This method may be called multiple times to setup Activities with different options.
521550
*
522551
* @return a {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy | Proxy} for
523552
* which each attribute is a callable Activity function
524-
*/
525-
export function proxyActivities<A = UntypedActivities>(options: ActivityOptions): ActivityInterfaceFor<A>;
526-
527-
/**
528-
* Configure Activity functions with given {@link ActivityOptions} and a summary.
529553
*
530-
* @param options Activity options
531-
* @param summary A description of the activity's purpose, useful for debugging and monitoring
532-
* @return a {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy | Proxy} for
533-
* which each attribute is a callable Activity function with the provided summary
554+
* @example
555+
* ```ts
556+
* import { proxyActivities } from '@temporalio/workflow';
557+
* import * as activities from '../activities';
558+
*
559+
* // Setup Activities from module exports
560+
* const { httpGet, otherActivity } = proxyActivities<typeof activities>({
561+
* startToCloseTimeout: '30 minutes',
562+
* });
563+
*
564+
* // Setup Activities with summaries for better observability
565+
* const {
566+
* httpGet,
567+
* processData,
568+
* saveResults
569+
* } = proxyActivities<typeof activities>({
570+
* startToCloseTimeout: '10m',
571+
* }).withSummaries({
572+
* httpGet: 'Fetches data from external API',
573+
* processData: 'Processes the fetched data',
574+
* saveResults: 'Saves processed results to database'
575+
* });
576+
*
577+
* // Setup Activities from an explicit interface (e.g. when defined by another SDK)
578+
* interface JavaActivities {
579+
* httpGetFromJava(url: string): Promise<string>
580+
* someOtherJavaActivity(arg1: number, arg2: string): Promise<string>;
581+
* }
582+
*
583+
* const {
584+
* httpGetFromJava,
585+
* someOtherJavaActivity
586+
* } = proxyActivities<JavaActivities>({
587+
* taskQueue: 'java-worker-taskQueue',
588+
* startToCloseTimeout: '5m',
589+
* });
590+
*
591+
* export function execute(): Promise<void> {
592+
* const response = await httpGet("http://example.com");
593+
* // ...
594+
* }
595+
* ```
534596
*/
535-
export function proxyActivities<A = UntypedActivities>(
536-
options: ActivityOptions,
537-
summary: string
538-
): ActivityInterfaceFor<A>;
539-
540597
export function proxyActivities<A = UntypedActivities>(
541598
options: ActivityOptions,
542-
summary?: string
543-
): ActivityInterfaceFor<A> {
599+
): ActivityInterfaceWithSummaries<A> {
544600
if (options === undefined) {
545601
throw new TypeError('options must be defined');
546602
}
547603
// Validate as early as possible for immediate user feedback
548604
validateActivityOptions(options);
549-
return new Proxy(
550-
{},
551-
{
552-
get(_, activityType) {
553-
if (typeof activityType !== 'string') {
554-
throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`);
605+
606+
function createActivityProxy(summaries: Record<string, string> = {}): ActivityInterfaceWithSummaries<A> {
607+
return new Proxy({} as ActivityInterfaceWithSummaries<A>, {
608+
get(_, prop) {
609+
if (prop === 'withSummaries') {
610+
return function withSummaries(newSummaries: Record<string, string>): ActivityInterfaceFor<A> {
611+
return createActivityProxy(newSummaries);
612+
};
555613
}
614+
615+
if (typeof prop !== 'string') {
616+
throw new TypeError(`Only strings are supported for Activity types, got: ${String(prop)}`);
617+
}
618+
556619
return function activityProxyFunction(...args: unknown[]): Promise<unknown> {
557-
return scheduleActivity(activityType, args, options, summary);
620+
const summary = summaries[prop];
621+
return scheduleActivity(prop, args, options, summary);
558622
};
559623
},
560-
}
561-
) as any;
624+
});
625+
}
626+
627+
return createActivityProxy();
562628
}
563629

564630
/**
@@ -571,43 +637,37 @@ export function proxyActivities<A = UntypedActivities>(
571637
*
572638
* @see {@link proxyActivities} for examples
573639
*/
574-
export function proxyLocalActivities<A = UntypedActivities>(options: LocalActivityOptions): ActivityInterfaceFor<A>;
575-
576-
/**
577-
* Configure Local Activity functions with given {@link LocalActivityOptions} and a summary.
578-
*
579-
* @param options Local activity options
580-
* @param summary A description of the activity's purpose, useful for debugging and monitoring
581-
* @return a {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy | Proxy}
582-
* for which each attribute is a callable Activity function with the provided summary
583-
*/
584-
export function proxyLocalActivities<A = UntypedActivities>(
585-
options: LocalActivityOptions,
586-
summary: string
587-
): ActivityInterfaceFor<A>;
588-
589640
export function proxyLocalActivities<A = UntypedActivities>(
590-
options: LocalActivityOptions,
591-
summary?: string
592-
): ActivityInterfaceFor<A> {
641+
options: LocalActivityOptions
642+
): ActivityInterfaceWithSummaries<A> {
593643
if (options === undefined) {
594644
throw new TypeError('options must be defined');
595645
}
596646
// Validate as early as possible for immediate user feedback
597647
validateLocalActivityOptions(options);
598-
return new Proxy(
599-
{},
600-
{
601-
get(_, activityType) {
602-
if (typeof activityType !== 'string') {
603-
throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`);
648+
649+
function createLocalActivityProxy(summaries: Record<string, string> = {}): ActivityInterfaceWithSummaries<A> {
650+
return new Proxy({} as ActivityInterfaceWithSummaries<A>, {
651+
get(_, prop) {
652+
if (prop === 'withSummaries') {
653+
return function withSummaries(newSummaries: Record<string, string>): ActivityInterfaceFor<A> {
654+
return createLocalActivityProxy(newSummaries);
655+
};
656+
}
657+
658+
if (typeof prop !== 'string') {
659+
throw new TypeError(`Only strings are supported for Activity types, got: ${String(prop)}`);
604660
}
605-
return function localActivityProxyFunction(...args: unknown[]) {
606-
return scheduleLocalActivity(activityType, args, options, summary);
661+
662+
return function localActivityProxyFunction(...args: unknown[]): Promise<unknown> {
663+
const summary = summaries[prop];
664+
return scheduleLocalActivity(prop, args, options, summary);
607665
};
608666
},
609-
}
610-
) as any;
667+
});
668+
}
669+
670+
return createLocalActivityProxy();
611671
}
612672

613673
// TODO: deprecate this patch after "enough" time has passed
@@ -977,13 +1037,13 @@ export function makeContinueAsNewFunc<F extends Workflow>(
9771037
* @example
9781038
*
9791039
* ```ts
980-
*import { continueAsNew } from '@temporalio/workflow';
981-
import { SearchAttributeType } from '@temporalio/common';
1040+
* import { continueAsNew } from '@temporalio/workflow';
1041+
* import { SearchAttributeType } from '@temporalio/common';
9821042
*
983-
*export async function myWorkflow(n: number): Promise<void> {
984-
* // ... Workflow logic
985-
* await continueAsNew<typeof myWorkflow>(n + 1);
986-
*}
1043+
* export async function myWorkflow(n: number): Promise<void> {
1044+
* // ... Workflow logic
1045+
* await continueAsNew<typeof myWorkflow>(n + 1);
1046+
* }
9871047
* ```
9881048
*/
9891049
export function continueAsNew<F extends Workflow>(...args: Parameters<F>): Promise<never> {

0 commit comments

Comments
 (0)