From 5dd603f8ce731f4208298f492f3e1b162cf50932 Mon Sep 17 00:00:00 2001 From: Mike Nichols Date: Wed, 19 Mar 2025 14:54:10 -0400 Subject: [PATCH 1/2] add background heartbeating example --- .../package.json | 3 + .../src/activities-test.ts | 61 ++++++++++++ .../src/activities.ts | 98 ++++++++++++++++++- 3 files changed, 161 insertions(+), 1 deletion(-) create mode 100644 activities-cancellation-heartbeating/src/activities-test.ts diff --git a/activities-cancellation-heartbeating/package.json b/activities-cancellation-heartbeating/package.json index c57d4857..cdeb1bc9 100644 --- a/activities-cancellation-heartbeating/package.json +++ b/activities-cancellation-heartbeating/package.json @@ -28,13 +28,16 @@ "@temporalio/workflow": "^1.11.6" }, "devDependencies": { + "@temporalio/testing": "^1.11.7", "@tsconfig/node18": "^18.2.4", + "@types/mocha": "^10.0.10", "@types/node": "^22.9.1", "@typescript-eslint/eslint-plugin": "^8.18.0", "@typescript-eslint/parser": "^8.18.0", "eslint": "^8.57.1", "eslint-config-prettier": "^9.1.0", "eslint-plugin-deprecation": "^3.0.0", + "mocha": "^11.1.0", "nodemon": "^3.1.7", "prettier": "^3.4.2", "ts-node": "^10.9.2", diff --git a/activities-cancellation-heartbeating/src/activities-test.ts b/activities-cancellation-heartbeating/src/activities-test.ts new file mode 100644 index 00000000..6a1cb6ed --- /dev/null +++ b/activities-cancellation-heartbeating/src/activities-test.ts @@ -0,0 +1,61 @@ +import { MockActivityEnvironment } from '@temporalio/testing'; +import * as assert from 'node:assert'; +import { ApplicationFailure, Context, heartbeat, sleep } from '@temporalio/activity'; +import { CancelledFailure } from '@temporalio/workflow'; +import * as console from 'node:console'; +import { ActivityExecutionDetails, myLongRunningActivity } from './activities'; + +describe('MyWorkflowActivities', function() { + + describe('when background heartbeating', function() { + let testEnv: MockActivityEnvironment + beforeEach(async function() { + testEnv = new MockActivityEnvironment({ + startToCloseTimeoutMs: 2000, + heartbeatTimeoutMs: 200, + heartbeatDetails: 0, + }) + }) + it('should sent details back', async function() { + let actual: ActivityExecutionDetails = { + heartbeatsReported: 0, + mainOperationResult: undefined, + err: undefined, + } + + actual = await testEnv.run(myLongRunningActivity) + assert.equal(actual.heartbeatsReported, 18) + }) + }) + describe('when background heartbeating received cancellation notice', function() { + let testEnv: MockActivityEnvironment + beforeEach(async function() { + testEnv = new MockActivityEnvironment({ + startToCloseTimeoutMs: 2000, + heartbeatTimeoutMs: 200, + heartbeatDetails: 0, + }) + }) + it('should sent details back', async function() { + + const cancelPromise= async (): Promise => { + return new Promise(resolve => { + setTimeout(() => {}, 200) + testEnv.cancel('verify CancelledFailure bubbles up') + resolve() + }) + } + const runPromise= async (): Promise => { + return await testEnv.run(myLongRunningActivity) + } + interface fulfilled { + value: ActivityExecutionDetails + } + const actual = await Promise.allSettled([cancelPromise(), runPromise()]) + assert.ok(actual[1]) + assert.equal("fulfilled", actual[1].status) + // @ts-ignore + assert.ok(actual[1].value.err instanceof CancelledFailure) + }) + }) +}) \ No newline at end of file diff --git a/activities-cancellation-heartbeating/src/activities.ts b/activities-cancellation-heartbeating/src/activities.ts index 956b11b7..74ed29e2 100644 --- a/activities-cancellation-heartbeating/src/activities.ts +++ b/activities-cancellation-heartbeating/src/activities.ts @@ -1,5 +1,14 @@ // @@@SNIPSTART typescript-activity-fake-progress -import { activityInfo, log, sleep, CancelledFailure, heartbeat } from '@temporalio/activity'; +import { + activityInfo, + log, + sleep, + CancelledFailure, + heartbeat, + Context, + ApplicationFailure +} from '@temporalio/activity'; +import console from 'node:console'; export async function fakeProgress(sleepIntervalMs = 1000): Promise { try { @@ -22,3 +31,90 @@ export async function fakeProgress(sleepIntervalMs = 1000): Promise { } } // @@@SNIPEND + +// @@@SNIPSTART typescript-activity-long-running +export interface ActivityExecutionDetails { + heartbeatsReported: number + mainOperationResult: string | undefined + err: Error | undefined +} +export async function myLongRunningActivity(): Promise { + const ctx = Context.current() + const details: ActivityExecutionDetails = { + heartbeatsReported: ctx.info.heartbeatDetails || 0, + mainOperationResult: undefined, + err: undefined + } + const logger = ctx.log + const heartbeatTimeoutMs = ctx.info.heartbeatTimeoutMs + if(!heartbeatTimeoutMs) { + throw ApplicationFailure.nonRetryable("heartbeat is required", "ERR_MISSING_HEARTBEAT_TIMEOUT") + } + const heartbeatInterval = heartbeatTimeoutMs / 2 + + // mainOperation is the "real" work we are doing in the Activity + async function mainOperation(): Promise { + const successMessage = 'operation successful' + // use startToClose as basis for overall ops timeouts + const timeout = ctx.info.startToCloseTimeoutMs - 100 + + return new Promise((resolve, reject) => { + logger.debug('simulating operation for (ms)', {timeout}) + // this simulates some lengthy operation like a report generation or API call + // we avoid using `sleep` so that the operation won't receive a CancelledFailure directly + setTimeout(() => { + // capture the operation result + details.mainOperationResult = successMessage + resolve(successMessage) + }, timeout) + }) + } + // doHeartbeat creates the regular looped heartbeat we need + async function doHeartbeat():Promise { + // noinspection InfiniteLoopJS + logger.debug('heartbeating every (ms)',{heartbeatInterval}) + return new Promise((resolve, reject) => { + return (async function periodic() { + while(!details.err && !details.mainOperationResult) { + try { + // this will return a CancelledFailure if the server replies as such + // since we arent catching it it will bubble up to the main operation + await sleep(heartbeatInterval) + // you can pass in details to the heartbeat if you like to preserve "where" you are + heartbeat(++details.heartbeatsReported) + } catch (err) { + // demonstrate how to test for cancellation + if(err instanceof CancelledFailure) { + logger.error('cancelling heartbeat due to cancellation', {err}) + } + logger.error('heartbeat received failure', {err}) + reject(err) + // exit loop + throw err + } + } + })() + }) + } + // _race_ the heartbeat and mainOperation so that any failure from either mainOperation or heartbeat to arrive + // will resolve the Promise collection. This is important for the CancelledFailure to be handled appropriately. + // Cancellation of the process inside the mainOperation is outside the scope of this sample, but + // you might need to abort processes explicitly upon Cancellation from Workflow. + // For example, with https://developer.mozilla.org/en-US/docs/Web/API/AbortController + try { + const result: string | void = await Promise.race([doHeartbeat(), mainOperation()]) + logger.debug('received result', {result}) + } catch (err) { + logger.error('Activity received error', {err}) + details.err = err as Error + if(err instanceof CancelledFailure) { + // we could rethrow the error here or ignore it (as we have done here) + // throw it. log it. sorted. :) + } + + } + return details +} + + +// @@@SNIPEND \ No newline at end of file From 210188ad3b3ccd0826cb1793092b9da5a60fbddd Mon Sep 17 00:00:00 2001 From: Mike Nichols Date: Wed, 19 Mar 2025 15:13:50 -0400 Subject: [PATCH 2/2] clean up files --- .../src/activities-test.ts | 17 ++++------------- .../src/activities.ts | 3 +-- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/activities-cancellation-heartbeating/src/activities-test.ts b/activities-cancellation-heartbeating/src/activities-test.ts index 6a1cb6ed..11db2ebe 100644 --- a/activities-cancellation-heartbeating/src/activities-test.ts +++ b/activities-cancellation-heartbeating/src/activities-test.ts @@ -1,11 +1,9 @@ import { MockActivityEnvironment } from '@temporalio/testing'; import * as assert from 'node:assert'; -import { ApplicationFailure, Context, heartbeat, sleep } from '@temporalio/activity'; import { CancelledFailure } from '@temporalio/workflow'; -import * as console from 'node:console'; import { ActivityExecutionDetails, myLongRunningActivity } from './activities'; -describe('MyWorkflowActivities', function() { +describe('LongRunningActivity Test', function() { describe('when background heartbeating', function() { let testEnv: MockActivityEnvironment @@ -17,13 +15,7 @@ describe('MyWorkflowActivities', function() { }) }) it('should sent details back', async function() { - let actual: ActivityExecutionDetails = { - heartbeatsReported: 0, - mainOperationResult: undefined, - err: undefined, - } - - actual = await testEnv.run(myLongRunningActivity) + const actual: ActivityExecutionDetails = await testEnv.run(myLongRunningActivity) assert.equal(actual.heartbeatsReported, 18) }) }) @@ -48,12 +40,11 @@ describe('MyWorkflowActivities', function() { const runPromise= async (): Promise => { return await testEnv.run(myLongRunningActivity) } - interface fulfilled { - value: ActivityExecutionDetails - } + const actual = await Promise.allSettled([cancelPromise(), runPromise()]) assert.ok(actual[1]) assert.equal("fulfilled", actual[1].status) + // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore assert.ok(actual[1].value.err instanceof CancelledFailure) }) diff --git a/activities-cancellation-heartbeating/src/activities.ts b/activities-cancellation-heartbeating/src/activities.ts index 74ed29e2..4161ee5c 100644 --- a/activities-cancellation-heartbeating/src/activities.ts +++ b/activities-cancellation-heartbeating/src/activities.ts @@ -8,7 +8,6 @@ import { Context, ApplicationFailure } from '@temporalio/activity'; -import console from 'node:console'; export async function fakeProgress(sleepIntervalMs = 1000): Promise { try { @@ -58,7 +57,7 @@ export async function myLongRunningActivity(): Promise // use startToClose as basis for overall ops timeouts const timeout = ctx.info.startToCloseTimeoutMs - 100 - return new Promise((resolve, reject) => { + return new Promise((resolve) => { logger.debug('simulating operation for (ms)', {timeout}) // this simulates some lengthy operation like a report generation or API call // we avoid using `sleep` so that the operation won't receive a CancelledFailure directly