diff --git a/activities-cancellation-heartbeating/package.json b/activities-cancellation-heartbeating/package.json index c57d4857..cdeb1bc9 100644 --- a/activities-cancellation-heartbeating/package.json +++ b/activities-cancellation-heartbeating/package.json @@ -28,13 +28,16 @@ "@temporalio/workflow": "^1.11.6" }, "devDependencies": { + "@temporalio/testing": "^1.11.7", "@tsconfig/node18": "^18.2.4", + "@types/mocha": "^10.0.10", "@types/node": "^22.9.1", "@typescript-eslint/eslint-plugin": "^8.18.0", "@typescript-eslint/parser": "^8.18.0", "eslint": "^8.57.1", "eslint-config-prettier": "^9.1.0", "eslint-plugin-deprecation": "^3.0.0", + "mocha": "^11.1.0", "nodemon": "^3.1.7", "prettier": "^3.4.2", "ts-node": "^10.9.2", diff --git a/activities-cancellation-heartbeating/src/activities-test.ts b/activities-cancellation-heartbeating/src/activities-test.ts new file mode 100644 index 00000000..11db2ebe --- /dev/null +++ b/activities-cancellation-heartbeating/src/activities-test.ts @@ -0,0 +1,52 @@ +import { MockActivityEnvironment } from '@temporalio/testing'; +import * as assert from 'node:assert'; +import { CancelledFailure } from '@temporalio/workflow'; +import { ActivityExecutionDetails, myLongRunningActivity } from './activities'; + +describe('LongRunningActivity Test', function() { + + describe('when background heartbeating', function() { + let testEnv: MockActivityEnvironment + beforeEach(async function() { + testEnv = new MockActivityEnvironment({ + startToCloseTimeoutMs: 2000, + heartbeatTimeoutMs: 200, + heartbeatDetails: 0, + }) + }) + it('should sent details back', async function() { + const actual: ActivityExecutionDetails = await testEnv.run(myLongRunningActivity) + assert.equal(actual.heartbeatsReported, 18) + }) + }) + describe('when background heartbeating received cancellation notice', function() { + let testEnv: MockActivityEnvironment + beforeEach(async function() { + testEnv = new MockActivityEnvironment({ + startToCloseTimeoutMs: 2000, + heartbeatTimeoutMs: 200, + heartbeatDetails: 0, + }) + }) + it('should sent details back', async function() { + + const cancelPromise= async (): Promise => { + return new Promise(resolve => { + setTimeout(() => {}, 200) + testEnv.cancel('verify CancelledFailure bubbles up') + resolve() + }) + } + const runPromise= async (): Promise => { + return await testEnv.run(myLongRunningActivity) + } + + const actual = await Promise.allSettled([cancelPromise(), runPromise()]) + assert.ok(actual[1]) + assert.equal("fulfilled", actual[1].status) + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + assert.ok(actual[1].value.err instanceof CancelledFailure) + }) + }) +}) \ No newline at end of file diff --git a/activities-cancellation-heartbeating/src/activities.ts b/activities-cancellation-heartbeating/src/activities.ts index 956b11b7..4161ee5c 100644 --- a/activities-cancellation-heartbeating/src/activities.ts +++ b/activities-cancellation-heartbeating/src/activities.ts @@ -1,5 +1,13 @@ // @@@SNIPSTART typescript-activity-fake-progress -import { activityInfo, log, sleep, CancelledFailure, heartbeat } from '@temporalio/activity'; +import { + activityInfo, + log, + sleep, + CancelledFailure, + heartbeat, + Context, + ApplicationFailure +} from '@temporalio/activity'; export async function fakeProgress(sleepIntervalMs = 1000): Promise { try { @@ -22,3 +30,90 @@ export async function fakeProgress(sleepIntervalMs = 1000): Promise { } } // @@@SNIPEND + +// @@@SNIPSTART typescript-activity-long-running +export interface ActivityExecutionDetails { + heartbeatsReported: number + mainOperationResult: string | undefined + err: Error | undefined +} +export async function myLongRunningActivity(): Promise { + const ctx = Context.current() + const details: ActivityExecutionDetails = { + heartbeatsReported: ctx.info.heartbeatDetails || 0, + mainOperationResult: undefined, + err: undefined + } + const logger = ctx.log + const heartbeatTimeoutMs = ctx.info.heartbeatTimeoutMs + if(!heartbeatTimeoutMs) { + throw ApplicationFailure.nonRetryable("heartbeat is required", "ERR_MISSING_HEARTBEAT_TIMEOUT") + } + const heartbeatInterval = heartbeatTimeoutMs / 2 + + // mainOperation is the "real" work we are doing in the Activity + async function mainOperation(): Promise { + const successMessage = 'operation successful' + // use startToClose as basis for overall ops timeouts + const timeout = ctx.info.startToCloseTimeoutMs - 100 + + return new Promise((resolve) => { + logger.debug('simulating operation for (ms)', {timeout}) + // this simulates some lengthy operation like a report generation or API call + // we avoid using `sleep` so that the operation won't receive a CancelledFailure directly + setTimeout(() => { + // capture the operation result + details.mainOperationResult = successMessage + resolve(successMessage) + }, timeout) + }) + } + // doHeartbeat creates the regular looped heartbeat we need + async function doHeartbeat():Promise { + // noinspection InfiniteLoopJS + logger.debug('heartbeating every (ms)',{heartbeatInterval}) + return new Promise((resolve, reject) => { + return (async function periodic() { + while(!details.err && !details.mainOperationResult) { + try { + // this will return a CancelledFailure if the server replies as such + // since we arent catching it it will bubble up to the main operation + await sleep(heartbeatInterval) + // you can pass in details to the heartbeat if you like to preserve "where" you are + heartbeat(++details.heartbeatsReported) + } catch (err) { + // demonstrate how to test for cancellation + if(err instanceof CancelledFailure) { + logger.error('cancelling heartbeat due to cancellation', {err}) + } + logger.error('heartbeat received failure', {err}) + reject(err) + // exit loop + throw err + } + } + })() + }) + } + // _race_ the heartbeat and mainOperation so that any failure from either mainOperation or heartbeat to arrive + // will resolve the Promise collection. This is important for the CancelledFailure to be handled appropriately. + // Cancellation of the process inside the mainOperation is outside the scope of this sample, but + // you might need to abort processes explicitly upon Cancellation from Workflow. + // For example, with https://developer.mozilla.org/en-US/docs/Web/API/AbortController + try { + const result: string | void = await Promise.race([doHeartbeat(), mainOperation()]) + logger.debug('received result', {result}) + } catch (err) { + logger.error('Activity received error', {err}) + details.err = err as Error + if(err instanceof CancelledFailure) { + // we could rethrow the error here or ignore it (as we have done here) + // throw it. log it. sorted. :) + } + + } + return details +} + + +// @@@SNIPEND \ No newline at end of file