Skip to content

Commit 2a137b0

Browse files
committed
Eliminate worker trait & move worker config into core
1 parent 326089e commit 2a137b0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+1449
-1724
lines changed

crates/common/src/errors.rs

Lines changed: 0 additions & 85 deletions
This file was deleted.

crates/common/src/lib.rs

Lines changed: 0 additions & 214 deletions
Original file line numberDiff line numberDiff line change
@@ -12,228 +12,14 @@ mod activity_definition;
1212
pub mod data_converters;
1313
#[cfg(feature = "envconfig")]
1414
pub mod envconfig;
15-
pub mod errors;
1615
#[doc(hidden)]
1716
pub mod fsm_trait;
1817
pub mod protos;
1918
pub mod telemetry;
2019
pub mod worker;
2120

22-
use crate::{
23-
errors::{
24-
CompleteActivityError, CompleteNexusError, CompleteWfError, PollError,
25-
WorkerValidationError,
26-
},
27-
protos::coresdk::{
28-
ActivityHeartbeat, ActivityTaskCompletion,
29-
activity_task::ActivityTask,
30-
nexus::{NexusTask, NexusTaskCompletion},
31-
workflow_activation::WorkflowActivation,
32-
workflow_completion::WorkflowActivationCompletion,
33-
},
34-
worker::WorkerConfig,
35-
};
36-
use std::sync::Arc;
37-
use uuid::Uuid;
38-
3921
pub use activity_definition::ActivityDefinition;
4022

41-
// TODO [rust-sdk-branch]: Move to core
42-
/// This trait is the primary way by which language specific SDKs interact with the core SDK.
43-
/// It represents one worker, which has a (potentially shared) client for connecting to the service
44-
/// and is bound to a specific task queue.
45-
#[async_trait::async_trait]
46-
pub trait Worker: Send + Sync {
47-
/// Validate that the worker can properly connect to server, plus any other validation that
48-
/// needs to be done asynchronously. Lang SDKs should call this function once before calling
49-
/// any others.
50-
async fn validate(&self) -> Result<(), WorkerValidationError>;
51-
52-
/// Ask the worker for some work, returning a [WorkflowActivation]. It is then the language
53-
/// SDK's responsibility to call the appropriate workflow code with the provided inputs. Blocks
54-
/// indefinitely until such work is available or [Worker::shutdown] is called.
55-
///
56-
/// It is important to understand that all activations must be responded to. There can only
57-
/// be one outstanding activation for a particular run of a workflow at any time. If an
58-
/// activation is not responded to, it will cause that workflow to become stuck forever.
59-
///
60-
/// See [WorkflowActivation] for more details on the expected behavior of lang w.r.t activation
61-
/// & job processing.
62-
///
63-
/// Do not call poll concurrently. It handles polling the server concurrently internally.
64-
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollError>;
65-
66-
/// Ask the worker for some work, returning an [ActivityTask]. It is then the language SDK's
67-
/// responsibility to call the appropriate activity code with the provided inputs. Blocks
68-
/// indefinitely until such work is available or [Worker::shutdown] is called.
69-
///
70-
/// Do not call poll concurrently. It handles polling the server concurrently internally.
71-
async fn poll_activity_task(&self) -> Result<ActivityTask, PollError>;
72-
73-
/// Ask the worker for some nexus related work. It is then the language SDK's
74-
/// responsibility to call the appropriate nexus operation handler code with the provided
75-
/// inputs. Blocks indefinitely until such work is available or [Worker::shutdown] is called.
76-
///
77-
/// All tasks must be responded to for shutdown to complete.
78-
///
79-
/// Do not call poll concurrently. It handles polling the server concurrently internally.
80-
async fn poll_nexus_task(&self) -> Result<NexusTask, PollError>;
81-
82-
/// Tell the worker that a workflow activation has completed. May (and should) be freely called
83-
/// concurrently. The future may take some time to resolve, as fetching more events might be
84-
/// necessary for completion to... complete - thus SDK implementers should make sure they do
85-
/// not serialize completions.
86-
async fn complete_workflow_activation(
87-
&self,
88-
completion: WorkflowActivationCompletion,
89-
) -> Result<(), CompleteWfError>;
90-
91-
/// Tell the worker that an activity has finished executing. May (and should) be freely called
92-
/// concurrently.
93-
async fn complete_activity_task(
94-
&self,
95-
completion: ActivityTaskCompletion,
96-
) -> Result<(), CompleteActivityError>;
97-
98-
/// Tell the worker that a nexus task has completed. May (and should) be freely called
99-
/// concurrently.
100-
async fn complete_nexus_task(
101-
&self,
102-
completion: NexusTaskCompletion,
103-
) -> Result<(), CompleteNexusError>;
104-
105-
/// Notify the Temporal service that an activity is still alive. Long running activities that
106-
/// take longer than `activity_heartbeat_timeout` to finish must call this function in order to
107-
/// report progress, otherwise the activity will timeout and a new attempt will be scheduled.
108-
///
109-
/// The first heartbeat request will be sent immediately, subsequent rapid calls to this
110-
/// function will result in heartbeat requests being aggregated and the last one received during
111-
/// the aggregation period will be sent to the server, where that period is defined as half the
112-
/// heartbeat timeout.
113-
///
114-
/// Unlike Java/Go SDKs we do not return cancellation status as part of heartbeat response and
115-
/// instead send it as a separate activity task to the lang, decoupling heartbeat and
116-
/// cancellation processing.
117-
///
118-
/// For now activity still need to send heartbeats if they want to receive cancellation
119-
/// requests. In the future we will change this and will dispatch cancellations more
120-
/// proactively. Note that this function does not block on the server call and returns
121-
/// immediately. Underlying validation errors are swallowed and logged, this has been agreed to
122-
/// be optimal behavior for the user as we don't want to break activity execution due to badly
123-
/// configured heartbeat options.
124-
fn record_activity_heartbeat(&self, details: ActivityHeartbeat);
125-
126-
/// Request that a workflow be evicted by its run id. This will generate a workflow activation
127-
/// with the eviction job inside it to be eventually returned by
128-
/// [Worker::poll_workflow_activation]. If the workflow had any existing outstanding activations,
129-
/// such activations are invalidated and subsequent completions of them will do nothing and log
130-
/// a warning.
131-
fn request_workflow_eviction(&self, run_id: &str);
132-
133-
/// Return this worker's config
134-
fn get_config(&self) -> &WorkerConfig;
135-
136-
/// Initiate shutdown. See [Worker::shutdown], this is just a sync version that starts the
137-
/// process. You can then wait on `shutdown` or [Worker::finalize_shutdown].
138-
fn initiate_shutdown(&self);
139-
140-
/// Initiates async shutdown procedure, eventually ceases all polling of the server and shuts
141-
/// down this worker. [Worker::poll_workflow_activation] and [Worker::poll_activity_task] should
142-
/// be called until both return a `ShutDown` error to ensure that all outstanding work is
143-
/// complete. This means that the lang sdk will need to call
144-
/// [Worker::complete_workflow_activation] and [Worker::complete_activity_task] for those
145-
/// workflows & activities until they are done. At that point, the lang SDK can end the process,
146-
/// or drop the [Worker] instance via [Worker::finalize_shutdown], which will close the
147-
/// connection and free resources. If you have set [WorkerConfig::task_types] to exclude
148-
/// [worker::WorkerTaskTypes::activity_only()], you may skip calling [Worker::poll_activity_task].
149-
///
150-
/// Lang implementations should use [Worker::initiate_shutdown] followed by
151-
/// [Worker::finalize_shutdown].
152-
async fn shutdown(&self);
153-
154-
/// Completes shutdown and frees all resources. You should avoid simply dropping workers, as
155-
/// this does not allow async tasks to report any panics that may have occurred cleanly.
156-
///
157-
/// This should be called only after [Worker::shutdown] has resolved and/or both polling
158-
/// functions have returned `ShutDown` errors.
159-
async fn finalize_shutdown(self);
160-
161-
/// Unique identifier for this worker instance.
162-
/// This must be stable across the worker's lifetime and unique per instance.
163-
fn worker_instance_key(&self) -> Uuid;
164-
}
165-
166-
#[async_trait::async_trait]
167-
impl<W> Worker for Arc<W>
168-
where
169-
W: Worker + ?Sized,
170-
{
171-
async fn validate(&self) -> Result<(), WorkerValidationError> {
172-
(**self).validate().await
173-
}
174-
175-
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollError> {
176-
(**self).poll_workflow_activation().await
177-
}
178-
179-
async fn poll_activity_task(&self) -> Result<ActivityTask, PollError> {
180-
(**self).poll_activity_task().await
181-
}
182-
183-
async fn poll_nexus_task(&self) -> Result<NexusTask, PollError> {
184-
(**self).poll_nexus_task().await
185-
}
186-
187-
async fn complete_workflow_activation(
188-
&self,
189-
completion: WorkflowActivationCompletion,
190-
) -> Result<(), CompleteWfError> {
191-
(**self).complete_workflow_activation(completion).await
192-
}
193-
194-
async fn complete_activity_task(
195-
&self,
196-
completion: ActivityTaskCompletion,
197-
) -> Result<(), CompleteActivityError> {
198-
(**self).complete_activity_task(completion).await
199-
}
200-
201-
async fn complete_nexus_task(
202-
&self,
203-
completion: NexusTaskCompletion,
204-
) -> Result<(), CompleteNexusError> {
205-
(**self).complete_nexus_task(completion).await
206-
}
207-
208-
fn record_activity_heartbeat(&self, details: ActivityHeartbeat) {
209-
(**self).record_activity_heartbeat(details)
210-
}
211-
212-
fn request_workflow_eviction(&self, run_id: &str) {
213-
(**self).request_workflow_eviction(run_id)
214-
}
215-
216-
fn get_config(&self) -> &WorkerConfig {
217-
(**self).get_config()
218-
}
219-
220-
fn initiate_shutdown(&self) {
221-
(**self).initiate_shutdown()
222-
}
223-
224-
async fn shutdown(&self) {
225-
(**self).shutdown().await
226-
}
227-
228-
async fn finalize_shutdown(self) {
229-
panic!("Can't finalize shutdown on Arc'd worker")
230-
}
231-
232-
fn worker_instance_key(&self) -> Uuid {
233-
(**self).worker_instance_key()
234-
}
235-
}
236-
23723
macro_rules! dbg_panic {
23824
($($arg:tt)*) => {
23925
use tracing::error;

crates/common/src/protos/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub mod coresdk {
7070

7171
#[allow(clippy::module_inception)]
7272
pub mod activity_task {
73-
use crate::protos::{coresdk::ActivityTaskCompletion, task_token::fmt_tt};
73+
use crate::protos::{coresdk::ActivityTaskCompletion, task_token::format_task_token};
7474
use std::fmt::{Display, Formatter};
7575
tonic::include_proto!("coresdk.activity_task");
7676

@@ -119,7 +119,7 @@ pub mod coresdk {
119119
write!(
120120
f,
121121
"ActivityTaskCompletion(token: {}",
122-
fmt_tt(&self.task_token),
122+
format_task_token(&self.task_token),
123123
)?;
124124
if let Some(r) = self.result.as_ref().and_then(|r| r.status.as_ref()) {
125125
write!(f, ", {r}")?;

crates/common/src/protos/task_token.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,13 @@ impl TaskToken {
3636

3737
impl Display for TaskToken {
3838
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
39-
f.write_str(&fmt_tt(&self.0))
39+
f.write_str(&format_task_token(&self.0))
4040
}
4141
}
4242

4343
impl Debug for TaskToken {
4444
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45-
f.write_str(&format!("TaskToken({})", fmt_tt(&self.0)))
45+
f.write_str(&format!("TaskToken({})", format_task_token(&self.0)))
4646
}
4747
}
4848

@@ -52,6 +52,6 @@ impl Borrow<[u8]> for TaskToken {
5252
}
5353
}
5454

55-
pub(crate) fn fmt_tt(tt: &[u8]) -> String {
55+
pub(crate) fn format_task_token(tt: &[u8]) -> String {
5656
BASE64_STANDARD.encode(tt)
5757
}

0 commit comments

Comments
 (0)