Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logic to periodically check whether the workers are running and restart them if necessary. #331

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import io.temporal.client.WorkflowClient
import io.temporal.client.WorkflowOptions
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.serviceclient.WorkflowServiceStubsOptions
import io.temporal.worker.Worker
import io.temporal.worker.WorkerFactory
import mu.KotlinLogging
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit


/**
Expand All @@ -39,10 +42,24 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) {

private val healthCheckSystem = HealthCheckTemporalServer(temporalConfig)

private var factory: WorkerFactory? = null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest using the `lateinit` modifier on `factory` to avoid nullability and the double-bang (`!!`) operators in the code below.
https://kotlinlang.org/docs/properties.html#late-initialized-properties-and-variables


// Workers created by this engine, keyed by the task queue name they were created for.
private val workers = mutableMapOf<String, Worker>()

// Single-threaded scheduled executor that runs the periodic worker monitor.
private val scheduler = Executors.newSingleThreadScheduledExecutor()

// On construction: build the Temporal service stubs, client and worker factory,
// then schedule the periodic worker monitor.
init {
initializeTemporalClient()
startWorkerMonitor()
}

/**
 * Creates the Temporal service stubs, the workflow client and the worker factory.
 *
 * The three objects are built into locals first and published to the class fields only
 * once all of them exist, so a failure part-way through never leaves the engine with a
 * new `service` paired with a stale (or missing) `client` / `factory`.
 *
 * Failures are logged, not rethrown; callers must therefore be prepared for the fields
 * to remain unset after this call.
 */
private fun initializeTemporalClient() {
    runCatching {
        val newService = WorkflowServiceStubs.newServiceStubs(serviceOptions)
        val newClient = WorkflowClient.newInstance(newService)
        val newFactory = WorkerFactory.newInstance(newClient)

        service = newService
        client = newClient
        factory = newFactory
    }.onFailure { ex ->
        // Pass the throwable as well so the stack trace is not lost.
        logger.error("Failed to initialize Temporal client: ${ex.message}", ex)
    }
}

Expand Down Expand Up @@ -70,14 +87,23 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) {
CronUtils.checkValid(cronSchedule)

client ?: throw IllegalArgumentException("Workflow client is not established")
factory ?: throw IllegalArgumentException("Worker factory is not initialized")

val factory = WorkerFactory.newInstance(client)
val worker = factory?.newWorker(taskName)
if (worker != null) {
worker.registerWorkflowImplementationTypes(workflowImpl)
worker.registerActivitiesImplementations(activitiesImpl)
workers[taskName] = worker
logger.info("Workflow and Activity registered for task queue: $taskName")
} else {
throw IllegalStateException("Failed to create a worker for task queue: $taskName")
}

val worker = factory.newWorker(taskName)
worker.registerWorkflowImplementationTypes(workflowImpl)
worker.registerActivitiesImplementations(activitiesImpl)
logger.info("Workflow and Activity successfully registered")
factory.start()
if (!factory!!.isStarted) {
factory!!.start()
logger.info("Worker factory started")
}

val workflowOptions = WorkflowOptions.newBuilder()
.setTaskQueue(taskName)
Expand Down Expand Up @@ -131,6 +157,48 @@ class WorkflowEngine(private val temporalConfig: TemporalConfig) {
return getWorkflows(filterOnlyRunning = false)
}

/**
 * Periodically checks whether the worker factory is running and restarts the workers if it is not.
 *
 * The check first runs 10 seconds after scheduling and then every 30 seconds. It is a
 * no-op while no workers have been registered. Each tick performs the check once;
 * a failed restart is retried on the next tick rather than immediately.
 */
private fun startWorkerMonitor() {
    scheduler.scheduleAtFixedRate({
        try {
            // `factory?.isStarted != true` covers both "no factory yet" and "factory not started"
            // with a single read of the mutable field, avoiding a null-check followed by `!!`.
            if (workers.isNotEmpty() && factory?.isStarted != true) {
                logger.warn("Worker factory is not running. Restarting workers...")
                restartWorkers()
            }
        } catch (e: Exception) {
            // Must not escape: an exception thrown from a scheduleAtFixedRate task
            // suppresses all subsequent executions of the task.
            logger.error("Error in worker monitoring: ${e.message}", e)
        }
    }, 10, 30, TimeUnit.SECONDS)
}

/**
 * Restarts all registered workers.
 *
 * Re-runs [initializeTemporalClient], asks the factory for a new worker for every task
 * queue name recorded in [workers], and then starts the factory.
 *
 * NOTE(review): the workers created here do not get any workflow or activity
 * implementations registered, and the entries in [workers] are not replaced with the
 * new instances — confirm whether re-registration is required after a restart.
 */
private fun restartWorkers() {
// Failures inside initializeTemporalClient() are logged there and not rethrown,
// so `factory` may still be null when it is dereferenced below.
initializeTemporalClient()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't the temporal client already initialized in the init block?

workers.forEach { (taskName, _) ->
// Only the task queue name is used; the Worker returned by newWorker() is discarded.
factory!!.newWorker(taskName)
logger.info("Restarting worker for task queue: $taskName")
}

// `!!` throws NullPointerException here if no factory could be created.
factory!!.start()
logger.info("Worker factory restarted")
}

/**
 * Shuts down the worker monitor, the worker factory and the Temporal service stubs.
 *
 * The monitor is stopped first so that no new monitoring tick is started while the
 * factory and service are being torn down. The factory and service are skipped if they
 * were never created.
 */
fun shutdown() {
    logger.info("Shutting down Temporal workers...")
    // Stop scheduling monitor ticks before touching the factory, otherwise a tick could
    // run against a factory/service that is in the middle of shutting down.
    scheduler.shutdown()
    factory?.shutdown()
    service?.shutdown()
}

/**
* Retrieve the workflows, either all or just the ones running.
*
Expand Down