Skip to content

Commit

Permalink
fix: error handling and grace shutdown during udf failures (#2406)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Feb 20, 2025
1 parent 01cc33d commit ae6c2a6
Show file tree
Hide file tree
Showing 27 changed files with 977 additions and 657 deletions.
6 changes: 3 additions & 3 deletions rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ mod tests {
let settings = Settings::load().unwrap();
let mvtx_cfg = match settings.custom_resource_type {
CustomResourceType::MonoVertex(cfg) => cfg,
_ => panic!("Invalid configuration type"),
CustomResourceType::Pipeline(_) => panic!("Invalid configuration type"),
};

assert_eq!(
Expand Down Expand Up @@ -307,7 +307,7 @@ mod tests {
let settings = Settings::load().unwrap();
let mvtx_cfg = match settings.custom_resource_type {
CustomResourceType::MonoVertex(cfg) => cfg,
_ => panic!("Invalid configuration type"),
CustomResourceType::Pipeline(_) => panic!("Invalid configuration type"),
};

assert_eq!(
Expand Down Expand Up @@ -386,7 +386,7 @@ mod tests {
let settings = Settings::load().unwrap();
let mvtx_config = match settings.custom_resource_type {
CustomResourceType::MonoVertex(cfg) => cfg,
_ => panic!("Invalid configuration type"),
CustomResourceType::Pipeline(_) => panic!("Invalid configuration type"),
};

assert_eq!(
Expand Down
19 changes: 2 additions & 17 deletions rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl PipelineConfig {
.spec
.watermark
.clone()
.map_or(true, |w| w.disabled.unwrap_or(true))
.map_or(true, |w| !w.disabled.unwrap_or(false))
{
Self::create_watermark_config(
vertex_obj.spec.watermark.clone(),
Expand Down Expand Up @@ -733,22 +733,7 @@ mod tests {
transformer_config: None,
}),
metrics_config: Default::default(),
watermark_config: Some(WatermarkConfig::Source(SourceWatermarkConfig {
max_delay: Default::default(),
source_bucket_config: BucketConfig {
vertex: "in",
partitions: 1,
ot_bucket: "default-simple-pipeline-in_SOURCE_OT",
hb_bucket: "default-simple-pipeline-in_SOURCE_PROCESSORS",
},
to_vertex_bucket_config: vec![BucketConfig {
vertex: "out",
partitions: 1,
ot_bucket: "default-simple-pipeline-in-out_OT",
hb_bucket: "default-simple-pipeline-in-out_PROCESSORS",
}],
idle_config: None,
})),
watermark_config: None,
..Default::default()
};

Expand Down
8 changes: 1 addition & 7 deletions rust/numaflow-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum Error {
Connection(String),

#[error("gRPC Error - {0}")]
Grpc(String),
Grpc(tonic::Status),

#[error("Config Error - {0}")]
Config(String),
Expand Down Expand Up @@ -59,9 +59,3 @@ pub enum Error {
#[error("Watermark Error - {0}")]
Watermark(String),
}

impl From<tonic::Status> for Error {
fn from(status: tonic::Status) -> Self {
Error::Grpc(status.to_string())
}
}
14 changes: 10 additions & 4 deletions rust/numaflow-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,11 @@ pub async fn run() -> Result<()> {
info!("Starting monovertex forwarder with config: {:#?}", config);
// Run the forwarder with cancellation token.
if let Err(e) = monovertex::start_forwarder(cln_token, &config).await {
error!("Application error running monovertex: {:?}", e);

if let Error::Grpc(e) = e {
error!(error=?e, "Monovertex failed because of UDF failure")
} else {
error!(?e, "Error running monovertex");
}
// abort the signal handler task since we have an error and we are shutting down
if !shutdown_handle.is_finished() {
shutdown_handle.abort();
Expand All @@ -90,8 +93,11 @@ pub async fn run() -> Result<()> {
CustomResourceType::Pipeline(config) => {
info!("Starting pipeline forwarder with config: {:#?}", config);
if let Err(e) = pipeline::start_forwarder(cln_token, config).await {
error!("Application error running pipeline: {:?}", e);

if let Error::Grpc(e) = e {
error!(error=?e, "Pipeline failed because of UDF failure")
} else {
error!(?e, "Error running pipeline");
}
// abort the signal handler task since we have an error and we are shutting down
if !shutdown_handle.is_finished() {
shutdown_handle.abort();
Expand Down
Loading

0 comments on commit ae6c2a6

Please sign in to comment.