diff --git a/pstatus-notifications-workflow-ktor/src/main/kotlin/Application.kt b/pstatus-notifications-workflow-ktor/src/main/kotlin/Application.kt index f601a72b..9de10838 100644 --- a/pstatus-notifications-workflow-ktor/src/main/kotlin/Application.kt +++ b/pstatus-notifications-workflow-ktor/src/main/kotlin/Application.kt @@ -26,7 +26,7 @@ fun KoinApplication.loadKoinModules(environment: ApplicationEnvironment): KoinAp val serviceTarget = environment.config.tryGetString("temporal.service_target") ?: "localhost:7233" val namespace = environment.config.tryGetString("temporal.namespace") ?: "default" val temporalConfig = TemporalConfig(serviceTarget, namespace) - single { + single(createdAtStart = true) { WorkflowEngine(temporalConfig) } } diff --git a/pstatus-notifications-workflow-ktor/src/main/kotlin/model/WorkflowStatus.kt b/pstatus-notifications-workflow-ktor/src/main/kotlin/model/WorkflowStatus.kt index 84f52ee9..91fa5f84 100644 --- a/pstatus-notifications-workflow-ktor/src/main/kotlin/model/WorkflowStatus.kt +++ b/pstatus-notifications-workflow-ktor/src/main/kotlin/model/WorkflowStatus.kt @@ -8,17 +8,23 @@ import java.time.OffsetDateTime * * @property workflowId String * @property taskName String + * @property taskQueue String * @property description String * @property status String + * @property workerAttached Boolean * @property schedule CronSchedule + * @property workflowImplClassName String? * @constructor */ data class WorkflowStatus( val workflowId: String, val taskName: String, + val taskQueue: String, val description: String, + val workerAttached: Boolean, val status: String, - val schedule: CronSchedule + val schedule: CronSchedule, + val workflowImplClassName: String? ) /** diff --git a/pstatus-notifications-workflow-ktor/src/main/kotlin/service/WorkflowStatusService.kt b/pstatus-notifications-workflow-ktor/src/main/kotlin/service/WorkflowStatusService.kt index ce9257e3..abfd6007 100644 --- a/pstatus-notifications-workflow-ktor/src/main/kotlin/service/WorkflowStatusService.kt +++ b/pstatus-notifications-workflow-ktor/src/main/kotlin/service/WorkflowStatusService.kt @@ -1,6 +1,5 @@ package gov.cdc.ocio.processingnotifications.service -import gov.cdc.ocio.processingnotifications.model.WorkflowStatus import gov.cdc.ocio.processingnotifications.temporal.WorkflowEngine import org.koin.core.component.KoinComponent import org.koin.core.component.inject @@ -13,9 +12,18 @@ class WorkflowStatusService : KoinComponent { /** * Get and return all the Temporal workflows. */ - fun getAllWorkflows(): List { + fun getAllWorkflows(): List> { try { - return workflowEngine.getAllWorkflows() + return workflowEngine.getAllWorkflows().map { + // Only provide a subset of the workflow status to callers as the other data is superfluous. + mapOf( + "workflowId" to it.workflowId, + "taskName" to it.taskName, + "description" to it.description, + "status" to it.status, + "schedule" to it.schedule + ) + } } catch (e: Exception) { throw Exception("Error occurred while checking for workflows: ${e.message}") } diff --git a/pstatus-notifications-workflow-ktor/src/main/kotlin/temporal/WorkflowEngine.kt b/pstatus-notifications-workflow-ktor/src/main/kotlin/temporal/WorkflowEngine.kt index 038ed9b9..0bffb4a4 100644 --- a/pstatus-notifications-workflow-ktor/src/main/kotlin/temporal/WorkflowEngine.kt +++ b/pstatus-notifications-workflow-ktor/src/main/kotlin/temporal/WorkflowEngine.kt @@ -1,5 +1,6 @@ package gov.cdc.ocio.processingnotifications.temporal +import gov.cdc.ocio.processingnotifications.activity.NotificationActivitiesImpl import gov.cdc.ocio.processingnotifications.config.TemporalConfig import gov.cdc.ocio.processingnotifications.model.CronSchedule import gov.cdc.ocio.processingnotifications.model.WorkflowStatus @@ -8,6 +9,9 @@ import io.temporal.api.enums.v1.WorkflowExecutionStatus import io.temporal.api.workflow.v1.WorkflowExecutionInfo import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsRequest +import io.temporal.api.workflowservice.v1.DescribeTaskQueueRequest +import io.temporal.api.taskqueue.v1.TaskQueue +import io.temporal.api.enums.v1.TaskQueueType import io.temporal.client.WorkflowClient import io.temporal.client.WorkflowClientOptions import io.temporal.client.WorkflowOptions @@ -18,15 +22,30 @@ import mu.KotlinLogging import java.time.Instant import java.time.OffsetDateTime import java.time.ZoneOffset +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit /** - * Workflow engine class which creates a grpC client instance of the temporal server - * using which it registers the workflow and the activity implementation - * Also,using the workflow options the client creates a new workflow stub - * Note : CRON expression is used to set the schedule + * Workflow engine class which creates a grpC client instance of the temporal server + * using which it registers the workflow and the activity implementation + * Also,using the workflow options the client creates a new workflow stub + * Note: CRON expression is used to set the schedule + * + * @property temporalConfig TemporalConfig + * @property logger KLogger + * @property serviceOptions (WorkflowServiceStubsOptions..WorkflowServiceStubsOptions?) + * @property service WorkflowServiceStubs + * @property client WorkflowClient + * @property factory WorkerFactory + * @property workers MutableMap + * @property scheduler [@EnhancedForWarnings(ScheduledExecutorService)] (ScheduledExecutorService..ScheduledExecutorService?) + * @property healthCheckSystem HealthCheckTemporalServer + * @constructor */ -class WorkflowEngine(private val temporalConfig: TemporalConfig) { +class WorkflowEngine( + private val temporalConfig: TemporalConfig +) { private val logger = KotlinLogging.logger {} @@ -34,19 +53,32 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) { .setTarget(temporalConfig.serviceTarget) .build() - private var service: WorkflowServiceStubs? = null + private val clientOptions = WorkflowClientOptions.newBuilder() + .setNamespace(temporalConfig.namespace) + .build() + + private lateinit var service: WorkflowServiceStubs + + private lateinit var client: WorkflowClient + + private lateinit var factory: WorkerFactory - private var client: WorkflowClient? = null + private val scheduler = Executors.newSingleThreadScheduledExecutor() private val healthCheckSystem = HealthCheckTemporalServer(temporalConfig) init { + initializeTemporalClient() + startWorkerMonitor() + } + + private fun initializeTemporalClient() { runCatching { service = WorkflowServiceStubs.newServiceStubs(serviceOptions) - - client = WorkflowClient.newInstance(service, WorkflowClientOptions.newBuilder() - .setNamespace(temporalConfig.namespace) - .build()) + client = WorkflowClient.newInstance(service, clientOptions) + factory = WorkerFactory.newInstance(client) + }.onFailure { ex -> + logger.error("Failed to initialize Temporal client: ${ex.message}") } } @@ -54,18 +86,18 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) { * Sets up a temporal workflow. * * @param description String - * @param taskName String + * @param taskQueue String * @param cronSchedule String * @param workflowImpl Class * @param activitiesImpl T2 * @param workflowImplInterface Class * @return T3? - * @throws IllegalArgumentException + * @throws IllegalStateException */ - @Throws(IllegalArgumentException::class) + @Throws(IllegalStateException::class) fun setupWorkflow( description: String, - taskName: String, + taskQueue: String, cronSchedule: String, workflowImpl: Class, activitiesImpl: T2, @@ -73,23 +105,31 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) { ): T3 { CronUtils.checkValid(cronSchedule) - client ?: throw IllegalArgumentException("Workflow client is not established") + val worker = factory.newWorker(taskQueue) + worker?.let { + it.registerWorkflowImplementationTypes(workflowImpl) + it.registerActivitiesImplementations(activitiesImpl) + logger.info("Workflow and Activity registered for task queue: $taskQueue") + } ?: error("Failed to create a worker for task queue: $taskQueue") - val factory = WorkerFactory.newInstance(client) - - val worker = factory.newWorker(taskName) - worker.registerWorkflowImplementationTypes(workflowImpl) - worker.registerActivitiesImplementations(activitiesImpl) logger.info("Workflow and Activity successfully registered") - factory.start() + if (!factory.isStarted) { + factory.start() + logger.info("Worker factory started") + } val workflowOptions = WorkflowOptions.newBuilder() - .setTaskQueue(taskName) - .setMemo(mapOf("description" to description)) + .setTaskQueue(taskQueue) + .setMemo( + mapOf( + "description" to description, + "workflowImplClassName" to workflowImpl.name + ) + ) .setCronSchedule(cronSchedule) // Cron schedule: 15 5 * * 1-5 - Every week day at 5:15a .build() - val workflow = client!!.newWorkflowStub( + val workflow = client.newWorkflowStub( workflowImplInterface, workflowOptions ) @@ -105,7 +145,7 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) { fun cancelWorkflow(workflowId: String) { try { // Retrieve the workflow by its ID - val workflow = client?.newUntypedWorkflowStub(workflowId) + val workflow = client.newUntypedWorkflowStub(workflowId) // Cancel the workflow workflow?.cancel() @@ -135,6 +175,46 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) { return getWorkflows(filterOnlyRunning = false) } + /** + * Periodically checks if the workers are running, and restarts them if necessary. + */ + private fun startWorkerMonitor() { + scheduler.scheduleAtFixedRate({ + runCatching { + checkWorkersAttached() + }.onFailure { + logger.error("Error in worker monitoring: ${it.message}") + } + }, 10, 30, TimeUnit.SECONDS) + } + + /** + * Restart all registered workers. + */ + private fun checkWorkersAttached() { + val runningWorkflows = getRunningWorkflows() + runningWorkflows.forEach { workflow -> + if (!workflow.workerAttached) { + val taskQueue = workflow.taskQueue + logger.warn("Restarting worker for task queue: $taskQueue") + val workflowImplClassName = workflow.workflowImplClassName + workflowImplClassName ?: error("Unknown workflow implementation, can't restart worker") + val workflowImpl = Class.forName(workflowImplClassName) + restartWorker(taskQueue, workflowImpl) + } + } + } + + /** + * Shutdown workers gracefully. + */ + fun shutdown() { + logger.info("Shutting down Temporal workers...") + factory.shutdown() + service.shutdown() + scheduler.shutdown() + } + /** * Retrieve the workflows, either all or just the ones running. * @@ -143,7 +223,7 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) { */ private fun getWorkflows(filterOnlyRunning: Boolean): List { val query = when (filterOnlyRunning) { - true -> "ExecutionStatus='RUNNING'" // Filter for running workflows + true -> "ExecutionStatus = 'Running'" // Filter for running workflows false -> "" } val request = ListWorkflowExecutionsRequest.newBuilder() @@ -152,7 +232,7 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) { .build() // Fetch workflows - val response = service?.blockingStub()?.listWorkflowExecutions(request) + val response = service.blockingStub()?.listWorkflowExecutions(request) val results = response?.executionsList?.map { executionInfo -> // Log workflow executions @@ -164,10 +244,14 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) { } val nextExecution = runCatching { CronUtils.nextExecution(cronScheduleRaw) }.getOrNull() val taskName = executionInfo.type.name + val taskQueue = executionInfo.taskQueue val ts = executionInfo.executionTime val lastRun = OffsetDateTime.ofInstant(Instant.ofEpochSecond(ts.seconds, ts.nanos.toLong()), ZoneOffset.UTC) val descPayload = runCatching { executionInfo.memo.getFieldsOrThrow("description") } val description = descPayload.getOrNull()?.data?.toStringUtf8()?.replace("\"", "") ?: "unknown" + val workflowImplClassNamePayload = runCatching { executionInfo.memo.getFieldsOrThrow("workflowImplClassName") } + val workflowImplClassName = workflowImplClassNamePayload.getOrNull()?.data?.toStringUtf8()?.replace("\"", "") + val workerAttached = workerHasPoller(executionInfo.taskQueue) val cronSchedule = CronSchedule( cron = cronScheduleRaw, @@ -180,15 +264,69 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) { WorkflowStatus( executionInfo.execution.workflowId, taskName, + taskQueue, description, + workerAttached, executionInfo.status.name, - cronSchedule + cronSchedule, + workflowImplClassName ) } return results ?: listOf() } + /** + * Determines if the task queue provided has an active poller. If it doesn't, it means there is no worker + * attached. + * + * @param taskQueue String + * @return Boolean - Returns true if a worker is attached to this queue, false otherwise. + */ + private fun workerHasPoller(taskQueue: String): Boolean { + val describeTaskQueueResponse = service.blockingStub() + .describeTaskQueue( + DescribeTaskQueueRequest.newBuilder() + .setNamespace(temporalConfig.namespace) + .setTaskQueue( + TaskQueue.newBuilder().setName(taskQueue).build() + ) + .setTaskQueueType(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW) + .build() + ) + + val pollers = describeTaskQueueResponse.pollersList + logger.info("Number of active workers: ${pollers.size}") + + if (pollers.isEmpty()) { + logger.info("No workers are currently polling this task queue!") + } + return pollers.isNotEmpty() + } + + /** + * Restart a worker with the provided task queue and workflow implementation class. + * + * @param taskQueue String + * @param workflowImpl Class<*> + */ + private fun restartWorker(taskQueue: String, workflowImpl: Class<*>) { + // Create a Worker Factory + val factory = WorkerFactory.newInstance(client) + + // Register a worker on the same task queue + val worker = factory.newWorker(taskQueue) + + // Register workflow and activities + worker.registerWorkflowImplementationTypes(workflowImpl) + worker.registerActivitiesImplementations(NotificationActivitiesImpl()) + + // Start the worker + factory.start() + + logger.info("Worker started and polling for tasks...") + } + /** * Get the raw cron schedule string for a given workflow execution info. * @@ -200,15 +338,17 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) { return null val req = GetWorkflowExecutionHistoryRequest.newBuilder() - .setNamespace(client?.options?.namespace) + .setNamespace(client.options?.namespace) .setExecution(wfExecInfo.execution) .build() - val res = service?.blockingStub()?.getWorkflowExecutionHistory(req) - - val firstHistoryEvent = res?.history?.eventsList?.get(0) - - return firstHistoryEvent?.workflowExecutionStartedEventAttributes?.cronSchedule + runCatching { + service.blockingStub()?.getWorkflowExecutionHistory(req) + }.onSuccess { + val firstHistoryEvent = it?.history?.eventsList?.get(0) + return firstHistoryEvent?.workflowExecutionStartedEventAttributes?.cronSchedule + } + return null } fun doHealthCheck() = healthCheckSystem.doHealthCheck()