diff --git a/src/client/handler.rs b/src/client/handler.rs index b444209..107101e 100644 --- a/src/client/handler.rs +++ b/src/client/handler.rs @@ -9,24 +9,66 @@ use crate::client::producer::produce; use crate::config::{AppConfig, ClientConfig}; pub fn read_probes_from_csv(buf_reader: R) -> Result> { - let probes = Vec::new(); + let mut probes = Vec::new(); let mut rdr = ReaderBuilder::new() .has_headers(false) .trim(csv::Trim::All) .from_reader(buf_reader); - rdr.deserialize().enumerate().try_fold( - probes, - |mut acc, (i, result): (usize, Result)| { - acc.push(result.map_err(|e: csv::Error| { - anyhow::anyhow!(e).context(format!( - "Failed to deserialize probe from CSV at line {}", + for (i, result) in rdr.records().enumerate() { + let record = result.map_err(|e| { + anyhow::anyhow!(e).context(format!("Failed to read CSV record at line {}", i + 1)) + })?; + + // Parse CSV fields manually and construct Probe + // Assuming CSV format: dst_addr,src_port,dst_port,ttl,protocol + if record.len() < 5 { + return Err(anyhow::anyhow!( + "Invalid CSV format at line {}: expected 5 fields, got {}", + i + 1, + record.len() + )); + } + + let dst_addr = record[0].parse().map_err(|e| { + anyhow::anyhow!("Failed to parse dst_addr at line {}: {}", i + 1, e) + })?; + + let src_port = record[1].parse().map_err(|e| { + anyhow::anyhow!("Failed to parse src_port at line {}: {}", i + 1, e) + })?; + + let dst_port = record[2].parse().map_err(|e| { + anyhow::anyhow!("Failed to parse dst_port at line {}: {}", i + 1, e) + })?; + + let ttl = record[3].parse().map_err(|e| { + anyhow::anyhow!("Failed to parse ttl at line {}: {}", i + 1, e) + })?; + + let protocol = match record[4].to_lowercase().as_str() { + "udp" => caracat::models::L4::UDP, + "icmp" => caracat::models::L4::ICMP, + "icmpv6" => caracat::models::L4::ICMPv6, + other => { + return Err(anyhow::anyhow!( + "Invalid protocol '{}' at line {}", + other, i + 1 )) - })?); - Ok(acc) - }, - ) + } + }; + + probes.push(Probe { + dst_addr, + src_port, + dst_port, + ttl, + protocol, + }); + } + + Ok(probes) } pub async fn handle(config: &AppConfig, client_config: ClientConfig) -> Result<()> {