Skip to content

Commit

Permalink
Split ping client prepare error and ping error for more properly hand…
Browse files Browse the repository at this point in the history
…ling the failure reasons. (#12)
  • Loading branch information
r12f authored Jul 14, 2021
1 parent c98aaef commit 34b8cb2
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 64 deletions.
80 changes: 44 additions & 36 deletions src/ping_clients/ping_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::time::Duration;
pub struct PingClientPingResultDetails {
pub actual_local_addr: Option<SockAddr>,
pub round_trip_time: Duration,
pub inner_error: Option<io::Error>,
pub prepare_error: Option<io::Error>,
pub ping_error: Option<io::Error>,
}
pub type PingClientPingResult =
std::result::Result<PingClientPingResultDetails, PingClientPingResultDetails>;
Expand All @@ -15,22 +16,14 @@ impl PingClientPingResultDetails {
pub fn new(
actual_local_addr: Option<SockAddr>,
round_trip_time: Duration,
inner_error: Option<io::Error>,
prepare_error: Option<io::Error>,
ping_error: Option<io::Error>,
) -> PingClientPingResultDetails {
PingClientPingResultDetails {
actual_local_addr,
round_trip_time,
inner_error,
}
}
}

impl From<io::Error> for PingClientPingResultDetails {
fn from(e: io::Error) -> PingClientPingResultDetails {
PingClientPingResultDetails {
actual_local_addr: None,
round_trip_time: Duration::from_secs(0),
inner_error: Some(e),
prepare_error,
ping_error,
}
}
}
Expand All @@ -47,10 +40,10 @@ mod tests {
use super::*;
use crate::{ping_clients::ping_client_factory, PingClientConfig};
use futures_intrusive::sync::ManualResetEvent;
use std::sync::Arc;
use std::{io, net::SocketAddr};
use tide::prelude::*;
use tide::Request;
use std::sync::Arc;
use tokio::runtime::Runtime;

struct ExpectedPingClientTestResults {
Expand Down Expand Up @@ -124,7 +117,12 @@ mod tests {
let source = SockAddr::from("0.0.0.0:0".parse::<SocketAddr>().unwrap());
let target = SockAddr::from("127.0.0.1:3389".parse::<SocketAddr>().unwrap());
let result = ping_client.ping(&source, &target);
ping_client_result_should_be_expected(&result, None, expected_results.timeout_min_time);
ping_client_result_should_be_expected(
&result,
None,
None,
expected_results.timeout_min_time,
);
}

fn ping_client_should_fail_when_pinging_non_existing_host(
Expand All @@ -136,6 +134,7 @@ mod tests {
let result = ping_client.ping(&source, &target);
ping_client_result_should_be_expected(
&result,
None,
Some(expected_results.ping_non_existing_host_error),
expected_results.timeout_min_time,
);
Expand All @@ -150,6 +149,7 @@ mod tests {
let result = ping_client.ping(&source, &target);
ping_client_result_should_be_expected(
&result,
None,
Some(expected_results.ping_non_existing_port_error),
expected_results.timeout_min_time,
);
Expand All @@ -165,6 +165,7 @@ mod tests {
ping_client_result_should_be_expected(
&result,
Some(expected_results.binding_invalid_source_ip_error),
None,
expected_results.timeout_min_time,
);
}
Expand All @@ -179,35 +180,42 @@ mod tests {
ping_client_result_should_be_expected(
&result,
Some(expected_results.binding_unavailable_source_port_error),
None,
expected_results.timeout_min_time,
);
}

fn ping_client_result_should_be_expected(
result: &PingClientPingResult,
expected_error: Option<io::ErrorKind>,
expected_prepare_error: Option<io::ErrorKind>,
expected_ping_error: Option<io::ErrorKind>,
timeout_min_time: Duration,
) {
match expected_error {
None => {
assert!(result.is_ok());
}

Some(error) => {
assert!(result.is_err());
assert!(result.as_ref().err().is_some());

let actual_error_details = result.as_ref().err().unwrap();
assert!(actual_error_details.inner_error.is_some());

let actual_error_kind = actual_error_details.inner_error.as_ref().unwrap().kind();
assert_eq!(actual_error_kind, error);

if error == io::ErrorKind::TimedOut {
assert!(actual_error_details.round_trip_time > timeout_min_time);
} else {
assert_eq!(0, actual_error_details.round_trip_time.as_micros());
}
if expected_prepare_error.is_none() && expected_ping_error.is_none() {
assert!(result.is_ok());
} else {
assert!(result.is_err());
assert!(result.as_ref().err().is_some());

let actual_error_details = result.as_ref().err().unwrap();

let actual_prepare_error = match &actual_error_details.prepare_error {
Some(e) => Some(e.kind()),
None => None,
};
assert_eq!(expected_prepare_error, actual_prepare_error);

let actual_ping_error = match &actual_error_details.ping_error {
Some(e) => Some(e.kind()),
None => None,
};
assert_eq!(expected_ping_error, actual_ping_error);

if actual_ping_error.is_some() && actual_ping_error.unwrap() == io::ErrorKind::TimedOut
{
assert!(actual_error_details.round_trip_time > timeout_min_time);
} else {
assert_eq!(0, actual_error_details.round_trip_time.as_micros());
}
}
}
Expand Down
45 changes: 30 additions & 15 deletions src/ping_clients/ping_client_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,48 @@ impl PingClientTcp {
pub fn new(config: &PingClientConfig) -> PingClientTcp {
return PingClientTcp { config: config.clone() };
}
}

impl PingClient for PingClientTcp {
fn protocol(&self) -> Protocol { Protocol::TCP }

fn ping(&self, source: &SockAddr, target: &SockAddr) -> PingClientPingResult {
fn ping_target(&self, source: &SockAddr, target: &SockAddr) -> PingClientPingResult {
let socket_domain = Domain::from(target.family() as i32);
let socket = Socket::new(socket_domain, Type::STREAM, None)?;
socket.bind(&source)?;
socket.set_linger(Some(Duration::from_secs(0)))?;
if let Some(ttl) = self.config.time_to_live {
socket.set_ttl(ttl)?;
}
let socket = match self.prepare_socket_for_ping(socket_domain, source) {
Ok(socket) => socket,
Err(e) => return Err(PingClientPingResultDetails::new(None, Duration::from_millis(0), Some(e), None)),
};

let start_time = Instant::now();
let connect_result = socket.connect_timeout(&target, self.config.wait_timeout);
let rtt = Instant::now().duration_since(start_time);
match connect_result {
Err(e) if e.kind() == io::ErrorKind::TimedOut => return Err(PingClientPingResultDetails::new(None, rtt, Some(e))),
Err(e) => return Err(PingClientPingResultDetails::new(None, Duration::from_millis(0), Some(e))),
Err(e) if e.kind() == io::ErrorKind::TimedOut => return Err(PingClientPingResultDetails::new(None, rtt, None, Some(e))),
Err(e) => return Err(PingClientPingResultDetails::new(None, Duration::from_millis(0), None, Some(e))),
Ok(()) => (),
}

// If getting local address failed, we ignore it.
// The worse case we can get is to output a 0.0.0.0 as source IP, which is not critical to what we are trying to do.
let local_addr = socket.local_addr();
return match local_addr {
Ok(addr) => Ok(PingClientPingResultDetails::new(Some(addr), rtt, None)),
Err(_) => Ok(PingClientPingResultDetails::new(None, rtt, None)),
Ok(addr) => Ok(PingClientPingResultDetails::new(Some(addr), rtt, None, None)),
Err(_) => Ok(PingClientPingResultDetails::new(None, rtt, None, None)),
};
}

fn prepare_socket_for_ping(&self, socket_domain: Domain, source: &SockAddr) -> io::Result<Socket> {
let socket = Socket::new(socket_domain, Type::STREAM, None)?;
socket.bind(&source)?;
socket.set_linger(Some(Duration::from_secs(0)))?;
if let Some(ttl) = self.config.time_to_live {
socket.set_ttl(ttl)?;
}

return Ok(socket);
}
}

impl PingClient for PingClientTcp {
fn protocol(&self) -> Protocol { Protocol::TCP }

fn ping(&self, source: &SockAddr, target: &SockAddr) -> PingClientPingResult {
return self.ping_target(source, target);
}
}
31 changes: 18 additions & 13 deletions src/ping_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{ping_client_factory, PingClient, PingPortPicker, PingResult, PingWor
use chrono::{offset::Utc, DateTime};
use futures_intrusive::sync::ManualResetEvent;
use socket2::SockAddr;
use std::{io, net::SocketAddr, sync::Arc, sync::Mutex};
use std::{net::SocketAddr, sync::Arc, sync::Mutex};
use tokio::{sync::mpsc, task, task::JoinHandle};

pub struct PingWorker {
Expand Down Expand Up @@ -61,9 +61,7 @@ impl PingWorker {
.expect("Failed getting port picker lock")
.next();
match source_port {
Some(source_port) => {
self.run_single_ping(source_port).await
}
Some(source_port) => self.run_single_ping(source_port).await,
None => {
tracing::debug!("Ping finished, stopping worker; worker_id={}", self.id);
return;
Expand Down Expand Up @@ -107,6 +105,7 @@ impl PingWorker {
local_addr = Some(SocketAddr::new(self.config.source_ip, src_port));
}

let is_prepare_error = ping_result.prepare_error.is_some();
let result = PingResult::new(
ping_time,
self.id,
Expand All @@ -115,20 +114,26 @@ impl PingWorker {
local_addr.unwrap(),
self.is_warmup_worker,
ping_result.round_trip_time,
ping_result.inner_error,
if ping_result.ping_error.is_some() {
ping_result.ping_error
} else {
ping_result.prepare_error
},
);

// Failed due to unable to bind source port, the source port might be taken, and we should ignore this error and continue.
if let Some(e) = result.error() {
if e.kind() == io::ErrorKind::AddrInUse || e.kind() == io::ErrorKind::PermissionDenied {
if is_prepare_error {
if let Some(e) = result.error() {
let warmup_sign = if result.is_warmup() { " (warmup)" } else { "" };

println!(
"Unable to perform ping to {} {} from {}{}, because local port is unavailable: Error = {}",
result.protocol_string(),
result.target(),
result.source(),
warmup_sign,
e);
"Unable to perform ping to {} {} from {}{}, because failing to prepare local socket: Error = {}",
result.protocol_string(),
result.target(),
result.source(),
warmup_sign,
e);

return;
}
}
Expand Down

0 comments on commit 34b8cb2

Please sign in to comment.