Skip to content

Commit

Permalink
Add exit on fail support for simple networking testing in script. (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
r12f authored Aug 9, 2021
1 parent 303a922 commit 4387546
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 18 deletions.
15 changes: 14 additions & 1 deletion src/bin/rnp/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ fn main() {
let rnp_core_config = opts.to_rnp_core_config();

let rt = Runtime::new().unwrap();
rt.block_on(async {
let result = rt.block_on(async {
let stop_event = Arc::new(ManualResetEvent::new(false));
let rnp_exit_failure_reason = rnp_core_config.result_processor_config.exit_failure_reason.clone();
let mut rp = RnpCore::new(rnp_core_config, stop_event.clone());

ctrlc::set_handler(move || {
Expand All @@ -33,5 +34,17 @@ fn main() {

rp.start_running_normal_pings();
rp.join().await;

if let Some(rnp_exit_failure_reason) = rnp_exit_failure_reason {
if rnp_exit_failure_reason.lock().unwrap().is_some() {
return Err("Ping failed!".to_string());
}
}
return Ok(());
});

// In order to have better control over the console output, we don't return the result from main function directly.
if let Err(_) = result {
std::process::exit(1);
}
}
16 changes: 16 additions & 0 deletions src/bin/rnp/rnp_cli_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use rnp::{
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use structopt::StructOpt;

Expand Down Expand Up @@ -70,6 +71,9 @@ pub struct RnpCliCommonOptions {

#[structopt(short = "p", long = "parallel", default_value = "1", help = "Count of pings running in parallel.")]
pub parallel_ping_count: u32,

#[structopt(long, help = "Exit as soon as a ping failed and return a non-zero error code.")]
pub exit_on_fail: bool,
}

#[derive(Debug, StructOpt, PartialEq)]
Expand Down Expand Up @@ -246,6 +250,8 @@ impl RnpCliOptions {
},
result_processor_config: PingResultProcessorConfig {
common_config: PingResultProcessorCommonConfig { quiet_level: self.output_options.quiet_level },
exit_on_fail: self.common_options.exit_on_fail,
exit_failure_reason: if self.common_options.exit_on_fail { Some(Arc::new(Mutex::new(None))) } else { None },
csv_log_path: self.output_options.csv_log_path.clone(),
json_log_path: self.output_options.json_log_path.clone(),
text_log_path: self.output_options.text_log_path.clone(),
Expand Down Expand Up @@ -355,6 +361,7 @@ mod tests {
time_to_live: None,
check_disconnect: false,
parallel_ping_count: 1,
exit_on_fail: false,
},
quic_options: RnpCliQuicPingOptions {
server_name: None,
Expand Down Expand Up @@ -393,6 +400,7 @@ mod tests {
time_to_live: None,
check_disconnect: true,
parallel_ping_count: 10,
exit_on_fail: false,
},
quic_options: RnpCliQuicPingOptions {
server_name: None,
Expand Down Expand Up @@ -461,6 +469,7 @@ mod tests {
time_to_live: Some(128),
check_disconnect: true,
parallel_ping_count: 10,
exit_on_fail: true,
},
quic_options: RnpCliQuicPingOptions {
server_name: Some(String::from("localhost")),
Expand Down Expand Up @@ -500,6 +509,7 @@ mod tests {
"--check-disconnect",
"--parallel",
"10",
"--exit-on-fail",
"--server-name",
"localhost",
"--log-tls-key",
Expand Down Expand Up @@ -548,6 +558,8 @@ mod tests {
},
result_processor_config: PingResultProcessorConfig {
common_config: PingResultProcessorCommonConfig { quiet_level: RNP_QUIET_LEVEL_NONE },
exit_on_fail: false,
exit_failure_reason: None,
csv_log_path: None,
json_log_path: None,
text_log_path: None,
Expand All @@ -572,6 +584,7 @@ mod tests {
time_to_live: Some(128),
check_disconnect: false,
parallel_ping_count: 1,
exit_on_fail: false,
},
quic_options: RnpCliQuicPingOptions {
server_name: None,
Expand Down Expand Up @@ -617,6 +630,8 @@ mod tests {
},
result_processor_config: PingResultProcessorConfig {
common_config: PingResultProcessorCommonConfig { quiet_level: RNP_QUIET_LEVEL_NO_PING_RESULT },
exit_on_fail: true,
exit_failure_reason: Some(Arc::new(Mutex::new(None))),
csv_log_path: Some(PathBuf::from("log.csv")),
json_log_path: Some(PathBuf::from("log.json")),
text_log_path: Some(PathBuf::from("log.txt")),
Expand All @@ -641,6 +656,7 @@ mod tests {
time_to_live: Some(128),
check_disconnect: true,
parallel_ping_count: 1,
exit_on_fail: true,
},
quic_options: RnpCliQuicPingOptions {
server_name: Some(String::from("localhost")),
Expand Down
14 changes: 10 additions & 4 deletions src/ping_result_processing_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@ use crate::{ping_result_processors::ping_result_processor_factory, PingResult, P
use futures_intrusive::sync::ManualResetEvent;
use std::sync::Arc;
use tokio::{sync::mpsc, task, task::JoinHandle};
use contracts::requires;

pub struct PingResultProcessingWorker {
stop_event: Arc<ManualResetEvent>,

receiver: mpsc::Receiver<PingResult>,
processors: Vec<Box<dyn PingResultProcessor + Send + Sync>>,
}

impl PingResultProcessingWorker {
#[requires(config.exit_on_fail -> config.exit_failure_reason.is_some())]
pub fn run(
config: Arc<PingResultProcessorConfig>,
extra_ping_result_processors: Vec<Box<dyn PingResultProcessor + Send + Sync>>,
stop_event: Arc<ManualResetEvent>,
ping_stop_event: Arc<ManualResetEvent>,
receiver: mpsc::Receiver<PingResult>,
) -> JoinHandle<()> {
let join_handle = task::spawn(async move {
let processors = ping_result_processor_factory::new(&config, extra_ping_result_processors);

let mut worker = PingResultProcessingWorker { stop_event, receiver, processors };

let processors = ping_result_processor_factory::new(&config, extra_ping_result_processors, ping_stop_event);
let mut worker = PingResultProcessingWorker {
stop_event,
receiver,
processors,
};
worker.run_worker().await;
});

Expand Down
42 changes: 36 additions & 6 deletions src/ping_result_processors/ping_result_processor_console_logger.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use crate::*;
use std::io::{stdout, Write};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tracing;
use futures_intrusive::sync::ManualResetEvent;

pub struct PingResultProcessorConsoleLogger {
common_config: Arc<PingResultProcessorCommonConfig>,
last_console_flush_time: Option<Instant>,

ping_stop_event: Arc<ManualResetEvent>,
exit_on_fail: bool,
exit_failure_reason: Option<Arc<Mutex<Option<PingResultDto>>>>,

protocol: Option<String>,
target: Option<SocketAddr>,
ping_count: u32,
Expand All @@ -23,10 +28,18 @@ pub struct PingResultProcessorConsoleLogger {

impl PingResultProcessorConsoleLogger {
#[tracing::instrument(name = "Creating ping result console logger", level = "debug")]
pub fn new(common_config: Arc<PingResultProcessorCommonConfig>) -> PingResultProcessorConsoleLogger {
pub fn new(
common_config: Arc<PingResultProcessorCommonConfig>,
ping_stop_event: Arc<ManualResetEvent>,
exit_on_fail: bool,
exit_failure_reason: Option<Arc<Mutex<Option<PingResultDto>>>>,
) -> PingResultProcessorConsoleLogger {
return PingResultProcessorConsoleLogger {
common_config,
last_console_flush_time: None,
ping_stop_event,
exit_on_fail,
exit_failure_reason,
protocol: None,
target: None,
ping_count: 0,
Expand Down Expand Up @@ -135,18 +148,35 @@ impl PingResultProcessor for PingResultProcessorConsoleLogger {
}

self.output_result_to_console(ping_result);
}

fn rundown(&mut self) {
if self.has_quiet_level(RNP_QUIET_LEVEL_NO_PING_SUMMARY) {
return;
if self.exit_on_fail {
// We ignore the preparation error here, because it is not really network issue.
if !ping_result.is_succeeded() && !ping_result.is_preparation_error() {
tracing::debug!("Ping failure received! Save result as exit reason and signal all ping workers to exit: Result = {:?}", ping_result);
*self.exit_failure_reason.as_ref().unwrap().lock().unwrap() = Some(ping_result.create_dto());
self.ping_stop_event.set();
}
}
}

fn rundown(&mut self) {
if self.config().quiet_level == RNP_QUIET_LEVEL_NO_PING_RESULT || self.config().quiet_level == RNP_QUIET_LEVEL_NO_PING_SUMMARY {
self.output_ping_count_update_to_console(true);
println!();
}

// We delay the log output here, because during rundown, we will flush the pending log to console once.
// If we don't do it, the output will be chaotic.
if !self.has_quiet_level(RNP_QUIET_LEVEL_NO_OUTPUT) {
if self.exit_on_fail && self.exit_failure_reason.as_ref().unwrap().lock().unwrap().is_some() {
println!("Ping failure received! Exiting...");
}
}

if self.has_quiet_level(RNP_QUIET_LEVEL_NO_PING_SUMMARY) {
return;
}

// Didn't received any result, skip output statistics.
if self.target.is_none() {
return;
Expand Down
19 changes: 16 additions & 3 deletions src/ping_result_processors/ping_result_processor_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,23 @@ use crate::ping_result_processors::ping_result_processor_result_scatter_logger::
use crate::ping_result_processors::ping_result_processor_text_logger::PingResultProcessorTextLogger;
use crate::{PingResultProcessor, PingResultProcessorConfig};
use std::sync::Arc;
use futures_intrusive::sync::ManualResetEvent;

pub fn new(
config: &PingResultProcessorConfig,
mut extra_ping_result_processors: Vec<Box<dyn PingResultProcessor + Send + Sync>>,
ping_stop_event: Arc<ManualResetEvent>,
) -> Vec<Box<dyn PingResultProcessor + Send + Sync>> {
let common_config = Arc::new(config.common_config.clone());
let mut processors = Vec::new();

// We always create the console logger for keeping our user informed.
let console_logger: Box<dyn PingResultProcessor + Send + Sync> = Box::new(PingResultProcessorConsoleLogger::new(common_config.clone()));
let console_logger: Box<dyn PingResultProcessor + Send + Sync> = Box::new(PingResultProcessorConsoleLogger::new(
common_config.clone(),
ping_stop_event.clone(),
config.exit_on_fail,
config.exit_failure_reason.clone(),
));
processors.push(console_logger);

if let Some(csv_log_path) = &config.csv_log_path {
Expand Down Expand Up @@ -65,11 +72,15 @@ mod tests {
use crate::ping_result_processors::ping_result_processor_factory::new;
use crate::*;
use std::path::PathBuf;
use futures_intrusive::sync::ManualResetEvent;
use std::sync::Arc;

#[test]
fn create_ping_result_processor_should_work_with_empty_config() {
let config = PingResultProcessorConfig {
common_config: PingResultProcessorCommonConfig { quiet_level: RNP_QUIET_LEVEL_NONE },
exit_on_fail: false,
exit_failure_reason: None,
csv_log_path: None,
json_log_path: None,
text_log_path: None,
Expand All @@ -78,14 +89,16 @@ mod tests {
latency_buckets: None,
};

let ping_clients = new(&config, vec![]);
let ping_clients = new(&config, vec![], Arc::new(ManualResetEvent::new(false)));
assert_eq!(1, ping_clients.len());
}

#[test]
fn create_ping_result_processor_should_work_with_valid_config() {
let config = PingResultProcessorConfig {
common_config: PingResultProcessorCommonConfig { quiet_level: RNP_QUIET_LEVEL_NO_PING_RESULT },
exit_on_fail: false,
exit_failure_reason: None,
csv_log_path: Some(PathBuf::from("log.csv")),
json_log_path: Some(PathBuf::from("log.json")),
text_log_path: Some(PathBuf::from("log.txt")),
Expand All @@ -94,7 +107,7 @@ mod tests {
latency_buckets: Some(vec![0.1, 0.5, 1.0, 10.0]),
};

let ping_clients = new(&config, vec![]);
let ping_clients = new(&config, vec![], Arc::new(ManualResetEvent::new(false)));
assert_eq!(7, ping_clients.len());
}
}
13 changes: 11 additions & 2 deletions src/rnp_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ impl RnpCore {
/// common_config: PingResultProcessorCommonConfig {
/// quiet_level: RNP_QUIET_LEVEL_NONE,
/// },
/// exit_on_fail: false,
/// exit_failure_reason: None,
/// csv_log_path: None,
/// json_log_path: None,
/// text_log_path: None,
Expand Down Expand Up @@ -93,6 +95,7 @@ impl RnpCore {
extra_ping_result_processors,
config.worker_scheduler_config.parallel_ping_count,
ping_result_processor_stop_event.clone(),
stop_event.clone(),
);

let rnp_core = RnpCore {
Expand All @@ -115,15 +118,21 @@ impl RnpCore {
extra_ping_result_processors: Vec<Box<dyn PingResultProcessor + Send + Sync>>,
parallel_ping_count: u32,
stop_event: Arc<ManualResetEvent>,
ping_stop_event: Arc<ManualResetEvent>,
) -> (mpsc::Sender<PingResult>, JoinHandle<()>) {
let mut ping_result_channel_size = parallel_ping_count * 2;
if ping_result_channel_size < 128 {
ping_result_channel_size = 128;
}

let (ping_result_sender, ping_result_receiver) = mpsc::channel(ping_result_channel_size as usize);
let ping_result_processor_join_handle =
PingResultProcessingWorker::run(Arc::new(result_processor_config), extra_ping_result_processors, stop_event, ping_result_receiver);
let ping_result_processor_join_handle = PingResultProcessingWorker::run(
Arc::new(result_processor_config),
extra_ping_result_processors,
stop_event,
ping_stop_event,
ping_result_receiver,
);

return (ping_result_sender, ping_result_processor_join_handle);
}
Expand Down
Loading

0 comments on commit 4387546

Please sign in to comment.