121 changes: 98 additions & 23 deletions crates/client/src/worker/mod.rs
@@ -213,28 +213,48 @@ impl ClientWorkerSetImpl {
Ok(())
}

fn unregister(
&mut self,
worker_instance_key: Uuid,
) -> Result<Arc<dyn ClientWorker + Send + Sync>, anyhow::Error> {
let worker = self
.all_workers
.remove(&worker_instance_key)
.ok_or_else(|| {
anyhow::anyhow!("Worker with worker_instance_key {worker_instance_key} not found")
})?;
/// Slot provider should be unregistered at the beginning of worker shutdown, in order to disable
/// eager workflow start.
fn unregister_slot_provider(&mut self, worker_instance_key: Uuid) -> Result<(), anyhow::Error> {
let worker = self.all_workers.get(&worker_instance_key).ok_or_else(|| {
anyhow::anyhow!("Worker not in all_workers during slot provider unregister")
})?;

let slot_key = SlotKey::new(
worker.namespace().to_string(),
worker.task_queue().to_string(),
);

if let Some(slot_vec) = self.slot_providers.get_mut(&slot_key) {
slot_vec.retain(|info| info.worker_id != worker_instance_key);
if slot_vec.is_empty() {
self.slot_providers.remove(&slot_key);
}
}
Ok(())
}

fn finalize_unregister(
&mut self,
worker_instance_key: Uuid,
) -> Result<Arc<dyn ClientWorker + Send + Sync>, anyhow::Error> {
if let Some(worker) = self.all_workers.get(&worker_instance_key)
&& let Some(slot_vec) = self.slot_providers.get(&SlotKey::new(
worker.namespace().to_string(),
worker.task_queue().to_string(),
))
&& slot_vec
.iter()
.any(|info| info.worker_id == worker_instance_key)
{
return Err(anyhow::anyhow!(
"Worker still in slot_providers during finalize"
));
}

let worker = self
.all_workers
.remove(&worker_instance_key)
.ok_or_else(|| anyhow::anyhow!("Worker not found in all_workers"))?;

if let Some(w) = self.shared_worker.get_mut(worker.namespace()) {
let (callback, is_empty) = w.unregister_callback(worker.worker_instance_key());
@@ -323,12 +343,24 @@ impl ClientWorkerSet {
.register(worker, skip_client_worker_set_check)
}

/// Unregisters a local worker, typically when that worker starts shutdown.
pub fn unregister_worker(
/// Disables Eager Workflow Start for this worker. This must be called before
/// `finalize_unregister`, otherwise `finalize_unregister` will return an err.
pub fn unregister_slot_provider(&self, worker_instance_key: Uuid) -> Result<(), anyhow::Error> {
self.worker_manager
.write()
.unregister_slot_provider(worker_instance_key)
}

/// Finalizes unregistering of worker from client. This must be called at the end of worker
/// shutdown in order to finalize shutdown for worker heartbeat properly. Must call after
/// `unregister_slot_provider`, otherwise an err will be returned.
pub fn finalize_unregister(
&self,
worker_instance_key: Uuid,
) -> Result<Arc<dyn ClientWorker + Send + Sync>, anyhow::Error> {
self.worker_manager.write().unregister(worker_instance_key)
self.worker_manager
.write()
.finalize_unregister(worker_instance_key)
}

/// Returns the worker grouping key, which is unique for each worker.
@@ -666,9 +698,12 @@ mod tests {
assert_eq!(3, manager.num_providers());

let count = worker_keys.iter().fold(0, |count, key| {
manager.unregister_worker(*key).unwrap();
manager.unregister_slot_provider(*key).unwrap();
manager.finalize_unregister(*key).unwrap();
// expect error since worker is already unregistered
let result = manager.unregister_worker(*key);
let result = manager.unregister_slot_provider(*key);
assert!(result.is_err());
let result = manager.finalize_unregister(*key);
assert!(result.is_err());
count + 1
});
@@ -896,7 +931,10 @@ mod tests {
assert_eq!(providers[1].build_id, Some("build-1".to_string()));
}

manager.unregister_worker(worker2_instance_key).unwrap();
manager
.unregister_slot_provider(worker2_instance_key)
.unwrap();
manager.finalize_unregister(worker2_instance_key).unwrap();

{
let impl_ref = manager.worker_manager.read();
@@ -1016,7 +1054,10 @@ mod tests {
drop(impl_ref);

// Unregister first worker
manager.unregister_worker(worker_instance_key1).unwrap();
manager
.unregister_slot_provider(worker_instance_key1)
.unwrap();
manager.finalize_unregister(worker_instance_key1).unwrap();

// After unregistering first worker: 1 slot provider, 1 heartbeat worker, shared worker still exists
assert_eq!(1, manager.num_providers());
@@ -1036,7 +1077,10 @@ mod tests {
drop(impl_ref);

// Unregister second worker
manager.unregister_worker(worker_instance_key2).unwrap();
manager
.unregister_slot_provider(worker_instance_key2)
.unwrap();
manager.finalize_unregister(worker_instance_key2).unwrap();

// After unregistering last worker: 0 slot providers, 0 heartbeat workers, shared worker is removed
assert_eq!(0, manager.num_providers());
@@ -1447,8 +1491,11 @@ mod tests {
);

manager
.unregister_worker(wf_worker_key)
.expect("should unregister workflow worker");
.unregister_slot_provider(wf_worker_key)
.expect("should unregister slot provider for workflow worker");
manager
.finalize_unregister(wf_worker_key)
.expect("should finalize unregister for workflow worker");

// Activity worker should still be registered
assert_eq!(1, manager.num_providers());
@@ -1460,9 +1507,37 @@ mod tests {
);

manager
.unregister_worker(act_worker_key)
.expect("should unregister activity worker");
.unregister_slot_provider(act_worker_key)
.expect("should unregister slot provider for activity worker");
manager
.finalize_unregister(act_worker_key)
.expect("should finalize unregister for activity worker");

assert_eq!(0, manager.num_providers());
}

#[test]
fn worker_unregister_order() {
let manager = ClientWorkerSet::new();
let worker = new_mock_provider_with_heartbeat(
"namespace1".to_string(),
"queue1".to_string(),
true,
None,
);
let worker_instance_key = worker.worker_instance_key();
manager.register_worker(Arc::new(worker), false).unwrap();

let res = manager.finalize_unregister(worker_instance_key);
assert!(res.is_err());
let err_string = res.err().map(|e| e.to_string()).unwrap();
assert!(err_string.contains("Worker still in slot_providers during finalize"));

// previous incorrect call to finalize_unregister should not cause any state leaks when
// properly removed later
manager
.unregister_slot_provider(worker_instance_key)
.unwrap();
manager.finalize_unregister(worker_instance_key).unwrap();
}
}
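
Read together, the new doc comments describe a two-phase teardown: unregister_slot_provider disables eager workflow start at the beginning of shutdown, and finalize_unregister removes the worker (and its heartbeat callback) only once the rest of shutdown has completed, in that order. A minimal usage sketch of that contract follows; shut_down_worker is a hypothetical helper written for illustration, not an API in the crate.

use temporalio_client::worker::ClientWorkerSet;
use uuid::Uuid;

// Illustrative helper (not part of the crate) showing the required call order.
fn shut_down_worker(workers: &ClientWorkerSet, key: Uuid) -> Result<(), anyhow::Error> {
    // Phase 1: stop handing out eager-workflow-start slots for this worker.
    workers.unregister_slot_provider(key)?;

    // ... the worker drains its tasks and sends the shutdown_worker RPC here ...

    // Phase 2: only after the rest of shutdown, remove the worker (and its
    // heartbeat callback) from the client. Calling this before phase 1 errors.
    let _worker = workers.finalize_unregister(key)?;
    Ok(())
}
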
20 changes: 8 additions & 12 deletions crates/sdk-core/src/core_tests/workers.rs
@@ -333,19 +333,15 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool)
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
mock.expect_identity()
.returning(|| "test-identity".to_string());
if use_cache {
if api_success {
mock.expect_shutdown_worker()
.times(1)
.returning(|_, _| Ok(ShutdownWorkerResponse {}));
} else {
// worker.shutdown() should succeed even if shutdown_worker fails
mock.expect_shutdown_worker()
.times(1)
.returning(|_, _| Err(tonic::Status::unavailable("fake shutdown error")));
}
if api_success {
mock.expect_shutdown_worker()
.times(1)
.returning(|_, _| Ok(ShutdownWorkerResponse {}));
} else {
mock.expect_shutdown_worker().times(0);
// worker.shutdown() should succeed even if shutdown_worker fails
mock.expect_shutdown_worker()
.times(1)
.returning(|_, _| Err(tonic::Status::unavailable("fake shutdown error")));
}

let t = canned_histories::single_timer("1");
6 changes: 2 additions & 4 deletions crates/sdk-core/src/pollers/poll_buffer.rs
@@ -960,13 +960,11 @@ mod tests {
// longer.
assert!(
elapsed >= Duration::from_millis(200),
"Should wait at least the interrupt period. Elapsed: {:?}",
elapsed
"Should wait at least the interrupt period. Elapsed: {elapsed:?}",
);
assert!(
elapsed < Duration::from_secs(1),
"Should not wait too long. Elapsed: {:?}",
elapsed
"Should not wait too long. Elapsed: {elapsed:?}",
);

// Clean up
4 changes: 1 addition & 3 deletions crates/sdk-core/src/worker/client.rs
@@ -27,8 +27,7 @@ use temporalio_common::{
},
deployment,
enums::v1::{
TaskQueueKind, VersioningBehavior, WorkerStatus, WorkerVersioningMode,
WorkflowTaskFailedCause,
TaskQueueKind, VersioningBehavior, WorkerVersioningMode, WorkflowTaskFailedCause,
},
failure::v1::Failure,
nexus,
@@ -696,7 +695,6 @@ impl WorkerClient for WorkerClientBag {
) -> Result<ShutdownWorkerResponse> {
let mut final_heartbeat = final_heartbeat;
if let Some(w) = final_heartbeat.as_mut() {
w.status = WorkerStatus::Shutdown.into();
self.set_heartbeat_client_fields(w);
}
let mut request = ShutdownWorkerRequest {
4 changes: 3 additions & 1 deletion crates/sdk-core/src/worker/client/mocks.rs
@@ -1,5 +1,5 @@
use super::*;
use futures_util::Future;
use futures_util::{Future, FutureExt};
use std::sync::{Arc, LazyLock};
use temporalio_client::worker::ClientWorkerSet;

@@ -53,6 +53,8 @@ pub(crate) fn mock_manual_worker_client() -> MockManualWorkerClient {
r.expect_workers()
.returning(|| DEFAULT_WORKERS_REGISTRY.clone());
r.expect_is_mock().returning(|| true);
r.expect_shutdown_worker()
.returning(|_, _| async { Ok(ShutdownWorkerResponse {}) }.boxed());
r.expect_sdk_name_and_version()
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
r.expect_identity()
67 changes: 39 additions & 28 deletions crates/sdk-core/src/worker/mod.rs
@@ -260,15 +260,12 @@ impl WorkerTrait for Worker {
);
}
self.shutdown_token.cancel();
{
*self.status.write() = WorkerStatus::ShuttingDown;
}
// First, unregister worker from the client
// First, disable Eager Workflow Start
if !self.client_worker_registrator.shared_namespace_worker {
let _res = self
.client
.workers()
.unregister_worker(self.worker_instance_key);
.unregister_slot_provider(self.worker_instance_key);
}

Review comment (Bug: Shutdown status not set on initiation):
initiate_shutdown no longer updates self.status to WorkerStatus::ShuttingDown. Callers that use initiate_shutdown to begin shutdown (before awaiting shutdown/finalize_shutdown) will keep sending heartbeats with Running, delaying/obscuring shutdown signaling and breaking server-side “seen ShuttingDown then no heartbeat” detection.

Contributor Author reply:
We want ShuttingDown state to be set when we send the worker_shutdown RPC call.

// Push a BumpStream message to the workflow activation queue. This ensures that
@@ -350,10 +347,13 @@ impl Worker {
CT: Into<AnyClient>,
{
// Unregister worker from current client, register in new client at the end
self.client
.workers()
.unregister_slot_provider(self.worker_instance_key)?;
let client_worker = self
.client
.workers()
.unregister_worker(self.worker_instance_key)?;
.finalize_unregister(self.worker_instance_key)?;

let new_worker_client = super::init_worker_client(
self.config.namespace.clone(),
@@ -737,31 +737,35 @@ impl Worker {
/// completed
async fn shutdown(&self) {
self.initiate_shutdown();
if let Some(workflows) = &self.workflows
&& let Some(name) = workflows.get_sticky_queue_name()
{
let heartbeat = self
.client_worker_registrator
.heartbeat_manager
.as_ref()
.map(|hm| hm.heartbeat_callback.clone()());

// This is a best effort call and we can still shutdown the worker if it fails
match self.client.shutdown_worker(name, heartbeat).await {
Err(err)
if !matches!(
err.code(),
tonic::Code::Unimplemented | tonic::Code::Unavailable
) =>
{
warn!(
"shutdown_worker rpc errored during worker shutdown: {:?}",
err
);
}
_ => {}
*self.status.write() = WorkerStatus::ShuttingDown;
}
let heartbeat = self
.client_worker_registrator
.heartbeat_manager
.as_ref()
.map(|hm| hm.heartbeat_callback.clone()());
let sticky_name = self
.workflows
.as_ref()
.and_then(|wf| wf.get_sticky_queue_name())
.unwrap_or_default();
// This is a best effort call and we can still shutdown the worker if it fails
match self.client.shutdown_worker(sticky_name, heartbeat).await {
Err(err)
if !matches!(

Review comment (Bug: Empty sticky queue sent on shutdown):
shutdown() now always calls shutdown_worker and uses unwrap_or_default() for sticky_name, which becomes an empty string when no sticky queue is used (e.g., max_cached_workflows == 0 or workflow polling disabled). If the server treats an empty sticky_task_queue as invalid when implemented, this can cause noisy warnings and failed shutdown signaling.

Contributor Author reply:
This is intentional, we want to start always sending shutdown_worker, not just on sticky queue.

err.code(),
tonic::Code::Unimplemented | tonic::Code::Unavailable
) =>
{
warn!(
"shutdown_worker rpc errored during worker shutdown: {:?}",
err
);
}
_ => {}
}

// We need to wait for all local activities to finish so no more workflow task heartbeats
// will be generated
if let Some(la_mgr) = &self.local_act_mgr {
Expand Down Expand Up @@ -797,6 +801,13 @@ impl Worker {
if let Some(b) = self.at_task_mgr {
b.shutdown().await;
}
// Only after worker is fully shutdown do we remove the heartbeat callback
// from SharedNamespaceWorker, allowing for accurate worker shutdown
// from Server POV
let _res = self
.client
.workers()
.finalize_unregister(self.worker_instance_key);
}

pub(crate) fn shutdown_token(&self) -> CancellationToken {
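
The two review threads above turn on when ShuttingDown is recorded and why shutdown_worker is now always sent. As a reading aid, here is a condensed outline of the shutdown ordering the diff ends up with; shutdown_outline is a hypothetical wrapper and the field accesses are simplified, but each call mirrors one visible in the hunks above.

// Hypothetical, simplified outline of Worker::shutdown() after this change.
async fn shutdown_outline(worker: &Worker) {
    // 1. Cancel the shutdown token and unregister the slot provider (disables eager start).
    worker.initiate_shutdown();
    // 2. Flip status just before the RPC, per the author's reply above.
    *worker.status.write() = WorkerStatus::ShuttingDown;
    // 3. Always send shutdown_worker, attaching a final heartbeat if one exists; the
    //    sticky queue name may be empty, and Unimplemented/Unavailable are treated as benign.
    let heartbeat = worker
        .client_worker_registrator
        .heartbeat_manager
        .as_ref()
        .map(|hm| hm.heartbeat_callback.clone()());
    let sticky = worker
        .workflows
        .as_ref()
        .and_then(|wf| wf.get_sticky_queue_name())
        .unwrap_or_default();
    if let Err(err) = worker.client.shutdown_worker(sticky, heartbeat).await
        && !matches!(err.code(), tonic::Code::Unimplemented | tonic::Code::Unavailable)
    {
        warn!("shutdown_worker rpc errored during worker shutdown: {err:?}");
    }
    // 4. Drain local activities, pollers, and task managers (elided here).
    // 5. Remove the heartbeat callback last, so the server can observe a ShuttingDown
    //    heartbeat followed by silence rather than a worker that vanished mid-run.
    let _ = worker
        .client
        .workers()
        .finalize_unregister(worker.worker_instance_key);
}
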
@@ -463,7 +463,7 @@ fn after_shutdown_checks(
);
assert_eq!(heartbeat.sdk_name, "temporal-core");
assert_eq!(heartbeat.sdk_version, "0.1.0");
assert_eq!(heartbeat.status, WorkerStatus::Shutdown as i32);
assert_eq!(heartbeat.status, WorkerStatus::ShuttingDown as i32);

assert_eq!(start_time.load().unwrap(), heartbeat.start_time.unwrap());
assert_ne!(