Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 53 additions & 11 deletions src/client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,66 @@ use crate::client::producer::produce;
use crate::config::{AppConfig, ClientConfig};

pub fn read_probes_from_csv<R: BufRead>(buf_reader: R) -> Result<Vec<Probe>> {
let probes = Vec::new();
let mut probes = Vec::new();
let mut rdr = ReaderBuilder::new()
.has_headers(false)
.trim(csv::Trim::All)
.from_reader(buf_reader);

rdr.deserialize().enumerate().try_fold(
probes,
|mut acc, (i, result): (usize, Result<Probe, _>)| {
acc.push(result.map_err(|e: csv::Error| {
anyhow::anyhow!(e).context(format!(
"Failed to deserialize probe from CSV at line {}",
for (i, result) in rdr.records().enumerate() {
let record = result.map_err(|e| {
anyhow::anyhow!(e).context(format!("Failed to read CSV record at line {}", i + 1))
})?;

// Parse CSV fields manually and construct Probe
// Assuming CSV format: dst_addr,src_port,dst_port,ttl,protocol
if record.len() < 5 {
return Err(anyhow::anyhow!(
"Invalid CSV format at line {}: expected 5 fields, got {}",
i + 1,
record.len()
));
}

let dst_addr = record[0].parse().map_err(|e| {
anyhow::anyhow!("Failed to parse dst_addr at line {}: {}", i + 1, e)
})?;

let src_port = record[1].parse().map_err(|e| {
anyhow::anyhow!("Failed to parse src_port at line {}: {}", i + 1, e)
})?;

let dst_port = record[2].parse().map_err(|e| {
anyhow::anyhow!("Failed to parse dst_port at line {}: {}", i + 1, e)
})?;

let ttl = record[3].parse().map_err(|e| {
anyhow::anyhow!("Failed to parse ttl at line {}: {}", i + 1, e)
})?;

let protocol = match record[4].to_lowercase().as_str() {
"udp" => caracat::models::L4::UDP,
"icmp" => caracat::models::L4::ICMP,
"icmpv6" => caracat::models::L4::ICMPv6,
other => {
return Err(anyhow::anyhow!(
"Invalid protocol '{}' at line {}",
other,
i + 1
))
})?);
Ok(acc)
},
)
}
};

probes.push(Probe {
dst_addr,
src_port,
dst_port,
ttl,
protocol,
});
}

Ok(probes)
}

pub async fn handle(config: &AppConfig, client_config: ClientConfig) -> Result<()> {
Expand Down