diff --git a/Cargo.lock b/Cargo.lock index c3025b7623..cd6d5f160b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -909,6 +909,7 @@ dependencies = [ "serde", "serde_json", "sysinfo", + "tempfile", "thiserror 1.0.69", "tokio", "tracing", diff --git a/ant-logging/Cargo.toml b/ant-logging/Cargo.toml index 02ba0d990a..1d16d794ad 100644 --- a/ant-logging/Cargo.toml +++ b/ant-logging/Cargo.toml @@ -21,7 +21,7 @@ serde = { version = "1.0.133", features = ["derive", "rc"] } serde_json = { version = "1.0" } sysinfo = { version = "0.30.8", default-features = false, optional = true } thiserror = "1.0.23" -tokio = { version = "1.43.1", optional = true } +tokio = { version = "1.43.1", features = ["rt", "rt-multi-thread", "macros", "time"], optional = true } tracing = { version = "~0.1.26" } tracing-appender = "~0.2.0" tracing-core = "0.1.30" @@ -31,6 +31,8 @@ tracing-subscriber = { version = "0.3.16", features = ["json"] } [dev-dependencies] color-eyre = "0.6.3" tracing-test = "0.2.4" +tokio = { version = "1.43.1", features = ["rt", "macros", "rt-multi-thread", "time"] } +tempfile = "3.8" [features] otlp = [ diff --git a/ant-logging/src/layers.rs b/ant-logging/src/layers.rs index 50f3366c5e..d3454da8ae 100644 --- a/ant-logging/src/layers.rs +++ b/ant-logging/src/layers.rs @@ -28,9 +28,9 @@ use tracing_subscriber::{ Layer, Registry, }; -const MAX_LOG_SIZE: usize = 20 * 1024 * 1024; -const MAX_UNCOMPRESSED_LOG_FILES: usize = 10; -const MAX_LOG_FILES: usize = 1000; +pub(crate) const MAX_LOG_SIZE: usize = 20 * 1024 * 1024; +pub(crate) const MAX_UNCOMPRESSED_LOG_FILES: usize = 10; +pub(crate) const MAX_LOG_FILES: usize = 1000; // Everything is logged by default const ALL_ANT_LOGS: &str = "all"; // Trace at nodes, clients, debug at networking layer @@ -59,7 +59,7 @@ impl ReloadHandle { #[derive(Default)] /// Tracing log formatter setup for easier span viewing -pub(crate) struct LogFormatter; +pub struct LogFormatter; impl FormatEvent for LogFormatter where @@ -239,7 +239,7 @@ impl TracingLayers { /// `export ANT_LOG = libp2p=DEBUG, tokio=INFO, all, sn_client=ERROR` /// Custom keywords will take less precedence if the same target has been manually specified in the CSV. /// `sn_client=ERROR` in the above example will be used instead of the TRACE level set by "all" keyword. -fn get_logging_targets(logging_env_value: &str) -> Result> { +pub(crate) fn get_logging_targets(logging_env_value: &str) -> Result> { let mut targets = BTreeMap::new(); let mut contains_keyword_all_sn_logs = false; let mut contains_keyword_verbose_sn_logs = false; diff --git a/ant-logging/src/lib.rs b/ant-logging/src/lib.rs index dfc85743f2..2d8c95fbb9 100644 --- a/ant-logging/src/lib.rs +++ b/ant-logging/src/lib.rs @@ -11,6 +11,7 @@ mod error; mod layers; #[cfg(feature = "process-metrics")] pub mod metrics; +mod multi_node; use crate::error::Result; use layers::TracingLayers; @@ -18,12 +19,20 @@ use serde::{Deserialize, Serialize}; use std::path::PathBuf; use tracing::info; use tracing_core::dispatcher::DefaultGuard; -use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt}; +use tracing_subscriber::{ + prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, Layer, +}; pub use error::Error; pub use layers::ReloadHandle; pub use tracing_appender::non_blocking::WorkerGuard; +// Export for testing - hidden from docs but available for integration tests +#[doc(hidden)] +pub use layers::LogFormatter; +#[doc(hidden)] +pub use multi_node::NodeSpecificFormatter; + // re-exporting the tracing crate's Level as it is used in our public API pub use tracing_core::Level; @@ -188,6 +197,127 @@ impl LogBuilder { Ok((reload_handle, layers.log_appender_guard)) } + /// Initialize multi-node logging with per-node log files. + /// Each node gets its own log directory and rotation. + /// + /// # Arguments + /// * `node_count` - Number of nodes to create logging for + /// + /// # Returns + /// * `ReloadHandle` - Handle to modify log levels + /// * `Vec` - Guards for all node appenders + pub fn initialize_with_multi_node_logging( + self, + node_count: usize, + ) -> Result<(ReloadHandle, Vec)> { + use crate::appender; + use crate::layers::TracingLayers; + use crate::multi_node::NodeRoutingLayer; + + if node_count == 1 { + // Fall back to existing single-node implementation + let (handle, guard) = self.initialize()?; + return Ok(( + handle, + vec![guard.unwrap_or_else(|| { + // Create a dummy guard if none exists + let (_, guard) = tracing_appender::non_blocking(std::io::sink()); + guard + })], + )); + } + + // Multi-node logging requires file output + let base_log_dir = match &self.output_dest { + LogOutputDest::Path(path) => path.clone(), + _ => { + return Err(Error::LoggingConfiguration( + "Multi-node logging requires file output".to_string(), + )) + } + }; + + // Get logging targets + let targets = match std::env::var("ANT_LOG") { + Ok(sn_log_val) => { + if self.print_updates_to_stdout { + println!("Using ANT_LOG={sn_log_val}"); + } + crate::layers::get_logging_targets(&sn_log_val)? + } + Err(_) => self.default_logging_targets.clone(), + }; + + // Create NodeRoutingLayer and set up per-node appenders + let mut routing_layer = NodeRoutingLayer::new(targets.clone()); + let mut guards = Vec::new(); + + for i in 1..=node_count { + let node_name = format!("node_{i}"); + + let node_log_dir = base_log_dir.join(&node_name); + std::fs::create_dir_all(&node_log_dir)?; + + if self.print_updates_to_stdout { + println!("Logging for {node_name} to directory: {node_log_dir:?}"); + } + + let (appender, guard) = appender::file_rotater( + &node_log_dir, + crate::layers::MAX_LOG_SIZE, + self.max_log_files + .unwrap_or(crate::layers::MAX_UNCOMPRESSED_LOG_FILES), + self.max_archived_log_files + .map(|max_archived| { + max_archived + + self + .max_log_files + .unwrap_or(crate::layers::MAX_UNCOMPRESSED_LOG_FILES) + }) + .unwrap_or(crate::layers::MAX_LOG_FILES), + ); + + routing_layer.add_node_writer(node_name, appender); + guards.push(guard); + } + + // Create reload handle for log level changes + let targets_filter: Box< + dyn tracing_subscriber::layer::Filter + Send + Sync, + > = Box::new(tracing_subscriber::filter::Targets::new().with_targets(targets)); + let (filter, reload_handle) = tracing_subscriber::reload::Layer::new(targets_filter); + let reload_handle = ReloadHandle(reload_handle); + + // Apply the filter to the routing layer + let filtered_routing_layer = routing_layer.with_filter(filter); + + let mut layers = TracingLayers::default(); + layers.layers.push(Box::new(filtered_routing_layer)); + + #[cfg(feature = "otlp")] + { + match std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") { + Ok(_) => layers.otlp_layer(self.default_logging_targets)?, + Err(_) => println!( + "The OTLP feature is enabled but the OTEL_EXPORTER_OTLP_ENDPOINT variable is not \ + set, so traces will not be submitted." + ), + } + } + + if tracing_subscriber::registry() + .with(layers.layers) + .try_init() + .is_err() + { + return Err(Error::LoggingConfiguration( + "Global subscriber already initialized".to_string(), + )); + } + + Ok((reload_handle, guards)) + } + /// Logs to the data_dir with per-test log files. Should be called from a single threaded tokio/non-tokio context. /// Each test gets its own log file based on the test name. /// diff --git a/ant-logging/src/multi_node.rs b/ant-logging/src/multi_node.rs new file mode 100644 index 0000000000..3eb3f06557 --- /dev/null +++ b/ant-logging/src/multi_node.rs @@ -0,0 +1,230 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +//! Multi-node logging functionality for routing logs to separate files per node. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tracing::field::{Field, Visit}; +use tracing::span::{Attributes, Id}; +use tracing::{Event, Level, Metadata, Subscriber}; +use tracing_appender::non_blocking::NonBlocking; +use tracing_subscriber::fmt as tracing_fmt; +use tracing_subscriber::{ + filter::Targets, + fmt::{ + format::Writer, + time::{FormatTime, SystemTime}, + FmtContext, FormatEvent, FormatFields, + }, + layer::Context, + registry::LookupSpan, + Layer, +}; + +/// Metadata stored with each node span for routing purposes +#[derive(Debug)] +struct NodeMetadata { + node_name: String, +} + +/// Visitor to extract node_id field from span attributes +struct NodeIdVisitor { + node_id: Option, +} + +impl Visit for NodeIdVisitor { + fn record_u64(&mut self, field: &Field, value: u64) { + if field.name() == "node_id" { + self.node_id = Some(value as usize); + } + } + + fn record_i64(&mut self, field: &Field, value: i64) { + if field.name() == "node_id" { + self.node_id = Some(value as usize); + } + } + + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + if field.name() == "node_id" { + // Try to extract from debug representation as fallback + let debug_str = format!("{value:?}"); + if let Ok(parsed) = debug_str.parse::() { + self.node_id = Some(parsed); + } + } + } +} + +/// Layer that routes events to different file appenders based on span context +pub struct NodeRoutingLayer { + node_writers: Arc>>, + targets_filter: Targets, +} + +impl NodeRoutingLayer { + pub fn new(targets: Vec<(String, Level)>) -> Self { + Self { + node_writers: Arc::new(Mutex::new(HashMap::new())), + targets_filter: Targets::new().with_targets(targets), + } + } + + pub fn add_node_writer(&mut self, node_name: String, writer: NonBlocking) { + let mut writers = self + .node_writers + .lock() + .expect("Failed to acquire node writers lock"); + writers.insert(node_name, writer); + } +} + +impl Layer for NodeRoutingLayer +where + S: Subscriber + for<'lookup> LookupSpan<'lookup>, +{ + fn enabled(&self, meta: &Metadata<'_>, ctx: Context<'_, S>) -> bool { + use tracing_subscriber::layer::Filter; + Filter::enabled(&self.targets_filter, meta, &ctx) + } + + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span should exist in registry"); + let span_name = span.name(); + + // Extract node_id from spans named "node" + if span_name == "node" { + let mut visitor = NodeIdVisitor { node_id: None }; + attrs.record(&mut visitor); + + if let Some(node_id) = visitor.node_id { + let node_name = format!("node_{node_id}"); + span.extensions_mut().insert(NodeMetadata { node_name }); + } + } + } + + fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { + // Find which node this event belongs to based on span hierarchy + let mut target_node = None; + + if let Some(span_ref) = ctx.lookup_current() { + let mut current = Some(span_ref); + while let Some(span) = current { + let span_name = span.name(); + + // Check for dynamic node spans with stored metadata + if span_name == "node" { + if let Some(metadata) = span.extensions().get::() { + target_node = Some(metadata.node_name.clone()); + break; + } + } + + // Check for legacy node spans: node_1, node_2, etc. (backwards compatibility) + if span_name.starts_with("node_") { + target_node = Some(span_name.to_string()); + break; + } + + // Check for node_other spans (for nodes > 20) + if span_name == "node_other" { + // For node_other, we'll route to a default "node_other" directory + target_node = Some("node_other".to_string()); + break; + } + + current = span.parent(); + } + } + + // Route to the appropriate writer + if let Some(node_name) = target_node { + let writers = self + .node_writers + .lock() + .expect("Failed to acquire node writers lock"); + if let Some(writer) = writers.get(&node_name) { + // Create a custom formatter that only shows the target node span + let custom_formatter = NodeSpecificFormatter; + + // Create a temporary fmt layer to format and write the event + let temp_layer = tracing_fmt::layer() + .with_ansi(false) + .with_writer(writer.clone()) + .event_format(custom_formatter); + + // Forward the event to the temporary layer for proper formatting + temp_layer.on_event(event, ctx); + } + } + } +} + +/// Custom formatter that only shows the target node span, avoiding nested node spans +pub struct NodeSpecificFormatter; + +impl FormatEvent for NodeSpecificFormatter +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event( + &self, + ctx: &FmtContext<'_, S, N>, + mut writer: Writer, + event: &Event<'_>, + ) -> std::fmt::Result { + // Write level and target + let level = *event.metadata().level(); + let module = event.metadata().module_path().unwrap_or(""); + let lno = event.metadata().line().unwrap_or(0); + let time = SystemTime; + + write!(writer, "[")?; + time.format_time(&mut writer)?; + write!(writer, " {level} {module} {lno}")?; + + // Only include spans up to and including the first "node" span + // This prevents nested node spans from appearing in the output + let mut all_spans = Vec::new(); + + // First, collect all spans from current to root + if let Some(span_ref) = ctx.lookup_current() { + let mut current = Some(span_ref); + while let Some(span) = current { + all_spans.push(span.name()); + current = span.parent(); + } + } + + // Now, find spans from root down to (and including) the first node span + let mut spans_to_include = Vec::new(); + for span_name in all_spans.iter().rev() { + spans_to_include.push(*span_name); + + // Stop after we include the first "node" span + if *span_name == "node" || span_name.starts_with("node_") || *span_name == "node_other" { + break; + } + } + + // Write spans in order (from outermost to innermost, but only up to the first node) + for span_name in spans_to_include.iter() { + write!(writer, "/{span_name}")?; + } + + write!(writer, "] ")?; + + // Add the log message and any fields associated with the event + ctx.field_format().format_fields(writer.by_ref(), event)?; + + writeln!(writer) + } +} diff --git a/ant-logging/tests/formatter_consistency.rs b/ant-logging/tests/formatter_consistency.rs new file mode 100644 index 0000000000..7018d37bf3 --- /dev/null +++ b/ant-logging/tests/formatter_consistency.rs @@ -0,0 +1,233 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +// Test to ensure LogFormatter and NodeSpecificFormatter produce consistent output + +use ant_logging::{LogFormatter, NodeSpecificFormatter}; +use std::io::{self, Write}; +use std::sync::{Arc, Mutex}; +use tracing::info; +use tracing_subscriber::{fmt::MakeWriter, layer::SubscriberExt, Registry}; + +/// A test writer that captures output to a string +#[derive(Clone)] +struct TestWriter { + buffer: Arc>>, +} + +impl TestWriter { + fn new() -> Self { + Self { + buffer: Arc::new(Mutex::new(Vec::new())), + } + } + + fn get_output(&self) -> String { + let buffer = self.buffer.lock().unwrap(); + String::from_utf8(buffer.clone()).unwrap() + } +} + +impl Write for TestWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buffer.lock().unwrap().extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl<'a> MakeWriter<'a> for TestWriter { + type Writer = Self; + + fn make_writer(&'a self) -> Self::Writer { + self.clone() + } +} + +#[test] +fn test_formatters_consistent_output_simple_span() { + // Test with a simple single span - both formatters should produce identical output + let (log_formatter_output, node_formatter_output) = capture_formatter_outputs(|| { + let span = tracing::info_span!("test_span"); + let _enter = span.enter(); + info!("Test message"); + }); + + assert_eq!( + log_formatter_output, node_formatter_output, + "LogFormatter and NodeSpecificFormatter should produce identical output for simple spans" + ); +} + +#[test] +fn test_formatters_consistent_output_nested_regular_spans() { + // Test with nested regular spans (not node spans) - should be identical + let (log_formatter_output, node_formatter_output) = capture_formatter_outputs(|| { + let outer_span = tracing::info_span!("outer"); + let _outer_enter = outer_span.enter(); + + let inner_span = tracing::info_span!("inner"); + let _inner_enter = inner_span.enter(); + + info!("Nested message"); + }); + + assert_eq!( + log_formatter_output, node_formatter_output, + "LogFormatter and NodeSpecificFormatter should produce identical output for nested regular spans" + ); +} + +#[test] +fn test_formatters_consistent_output_single_node_span() { + // Test with a single node span - should be identical + let (log_formatter_output, node_formatter_output) = capture_formatter_outputs(|| { + let node_span = tracing::info_span!("node", node_id = 1); + let _enter = node_span.enter(); + info!("Node message"); + }); + + assert_eq!( + log_formatter_output, node_formatter_output, + "LogFormatter and NodeSpecificFormatter should produce identical output for single node spans" + ); +} + +#[test] +fn test_formatters_different_output_nested_node_spans() { + // Test with nested node spans - this is where they should differ + let (log_formatter_output, node_formatter_output) = capture_formatter_outputs(|| { + let node_span_1 = tracing::info_span!("node", node_id = 1); + let _enter_1 = node_span_1.enter(); + + let node_span_2 = tracing::info_span!("node", node_id = 2); + let _enter_2 = node_span_2.enter(); + + info!("Nested node message"); + }); + + // These should be different - LogFormatter will show /node/node, NodeSpecificFormatter should show /node + assert_ne!( + log_formatter_output, node_formatter_output, + "LogFormatter and NodeSpecificFormatter should produce different output for nested node spans" + ); + + // Verify the specific behavior + assert!( + log_formatter_output.contains("/node/node"), + "LogFormatter should show nested node spans as /node/node" + ); + + assert!( + node_formatter_output.contains("/node") && !node_formatter_output.contains("/node/node"), + "NodeSpecificFormatter should show only single /node for nested node spans" + ); +} + +#[test] +fn test_formatters_mixed_spans() { + // Test with a mix of regular spans and node spans + let (log_formatter_output, node_formatter_output) = capture_formatter_outputs(|| { + let outer_span = tracing::info_span!("outer_task"); + let _outer_enter = outer_span.enter(); + + let node_span = tracing::info_span!("node", node_id = 1); + let _node_enter = node_span.enter(); + + let inner_span = tracing::info_span!("inner_task"); + let _inner_enter = inner_span.enter(); + + info!("Mixed span message"); + }); + + // Both should show the spans up to and including the node span + // LogFormatter: /outer_task/node/inner_task + // NodeSpecificFormatter: /outer_task/node (stops at first node span) + + assert!( + log_formatter_output.contains("/outer_task/node/inner_task"), + "LogFormatter should show all spans including those after node span. Got: '{}'", + log_formatter_output + ); + + assert!( + node_formatter_output.contains("/outer_task/node") + && !node_formatter_output.contains("/inner_task"), + "NodeSpecificFormatter should stop at the first node span. Got: '{}'", + node_formatter_output + ); +} + +/// Helper function to extract just the span and message part from formatter output +fn extract_span_and_message(output: &str) -> String { + // Extract everything after the timestamp and level + // Format: [timestamp LEVEL module line/spans] message + if let Some(start) = output.find("] ") { + // Find the spans part - everything between the line number and the closing ] + if let Some(level_end) = output.find(" formatter_consistency ") { + if let Some(spans_start) = output[level_end..start].find('/') { + let spans_part = &output[level_end + spans_start..start]; + let message_part = &output[start + 2..]; + format!("{spans_part}] {message_part}") + } else { + // No spans + let message_part = &output[start + 2..]; + format!("] {message_part}") + } + } else { + output.to_string() + } + } else { + output.to_string() + } +} + +/// Helper function to capture output from both formatters for the same operation +fn capture_formatter_outputs(test_operation: F) -> (String, String) +where + F: Fn() + Send + Sync + 'static, +{ + // Capture LogFormatter output + let log_formatter_output = { + let test_writer = TestWriter::new(); + let layer = tracing_subscriber::fmt::layer() + .with_ansi(false) + .with_target(false) + .event_format(LogFormatter) + .with_writer(test_writer.clone()); + + let subscriber = Registry::default().with(layer); + + let _guard = tracing::subscriber::set_default(subscriber); + test_operation(); + + extract_span_and_message(&test_writer.get_output()) + }; + + // Capture NodeSpecificFormatter output + let node_formatter_output = { + let test_writer = TestWriter::new(); + let layer = tracing_subscriber::fmt::layer() + .with_ansi(false) + .with_target(false) + .event_format(NodeSpecificFormatter) + .with_writer(test_writer.clone()); + + let subscriber = Registry::default().with(layer); + + let _guard = tracing::subscriber::set_default(subscriber); + test_operation(); + + extract_span_and_message(&test_writer.get_output()) + }; + + (log_formatter_output, node_formatter_output) +} diff --git a/ant-logging/tests/multi_node_logging.rs b/ant-logging/tests/multi_node_logging.rs new file mode 100644 index 0000000000..42a743a0fc --- /dev/null +++ b/ant-logging/tests/multi_node_logging.rs @@ -0,0 +1,156 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +// Integration test for multi-node logging functionality + +use ant_logging::{LogBuilder, LogOutputDest}; +use std::path::PathBuf; +use std::time::Duration; +use tempfile::TempDir; +use tracing::{info, Instrument}; + +#[tokio::test] +async fn test_multi_node_logging_e2e() { + let temp_dir = TempDir::new().expect("Failed to create temp directory"); + let log_dir = temp_dir.path().to_path_buf(); + + // Test multi-node logging with 2 nodes + let mut log_builder = LogBuilder::new(vec![( + "multi_node_logging".to_string(), + tracing::Level::INFO, + )]); + log_builder.output_dest(LogOutputDest::Path(log_dir.clone())); + + let (_reload_handle, guards) = log_builder + .initialize_with_multi_node_logging(2) + .expect("Failed to initialize multi-node logging"); + + // Log messages from different nodes using new dynamic span format + let node_1_span = tracing::info_span!("node", node_id = 1); + let task1 = async { + info!("Message from node 1"); + info!("Another message from node 1"); + + // Test nested spans + let inner_span = tracing::info_span!("inner_task"); + let inner_task = async { + info!("Inner message from node 1"); + } + .instrument(inner_span); + inner_task.await; + } + .instrument(node_1_span); + + let node_2_span = tracing::info_span!("node", node_id = 2); + let task2 = async { + info!("Message from node 2"); + } + .instrument(node_2_span); + + // Run tasks concurrently + tokio::join!(task1, task2); + + // Allow time for logs to be written and flushed + tokio::time::sleep(Duration::from_millis(200)).await; + drop(guards); + tokio::time::sleep(Duration::from_millis(200)).await; + + // Verify node directories were created + let node_1_dir = log_dir.join("node_1"); + let node_2_dir = log_dir.join("node_2"); + + assert!(node_1_dir.exists(), "Node 1 directory should exist"); + assert!(node_2_dir.exists(), "Node 2 directory should exist"); + + // Verify each node has its own log file with correct content + let node_1_content = read_log_content(&node_1_dir).expect("Failed to read node 1 logs"); + let node_2_content = read_log_content(&node_2_dir).expect("Failed to read node 2 logs"); + + println!("Node 1 logs:\n{}", node_1_content); + println!("Node 2 logs:\n{}", node_2_content); + + // Check node 1 logs contain all its messages + assert!( + node_1_content.contains("Message from node 1"), + "Node 1 logs should contain its messages" + ); + assert!( + node_1_content.contains("Another message from node 1"), + "Node 1 logs should contain all its messages" + ); + assert!( + node_1_content.contains("Inner message from node 1"), + "Node 1 logs should contain nested span messages" + ); + assert!( + !node_1_content.contains("Message from node 2"), + "Node 1 logs should not contain node 2 messages" + ); + + // Check node 2 logs contain only its messages + assert!( + node_2_content.contains("Message from node 2"), + "Node 2 logs should contain its messages" + ); + assert!( + !node_2_content.contains("Message from node 1"), + "Node 2 logs should not contain node 1 messages" + ); + + // Verify proper log formatting + assert!( + node_1_content.contains("multi_node_logging"), + "Should contain target name" + ); + assert!( + node_1_content.contains("/node"), + "Should contain span information with /node" + ); + assert!( + node_2_content.contains("/node"), + "Should contain span information with /node" + ); +} + +#[test] +fn test_unlimited_node_span_creation() { + // Test that we can create spans for nodes beyond the old 20-node limit + // This tests the span creation functionality without requiring a full logging setup + + let test_nodes = vec![1, 15, 21, 25, 50, 100]; + + for &node_id in &test_nodes { + // This should work for any node_id now (no hardcoded limit) + let node_span = tracing::info_span!("node", node_id = node_id); + + // Verify the span can be entered and used + let _enter = node_span.enter(); + // If we get here without panicking, the span creation works + } + + println!("Successfully created spans for node IDs: {:?}", test_nodes); +} + +/// Helper function to read log content from a node directory +fn read_log_content(node_dir: &PathBuf) -> Result> { + let mut content = String::new(); + + for entry in std::fs::read_dir(node_dir)? { + let entry = entry?; + if entry.path().extension().is_some_and(|ext| ext == "log") { + let file_content = std::fs::read_to_string(entry.path())?; + content.push_str(&file_content); + } + } + + if content.is_empty() { + return Err("No log content found".into()); + } + + Ok(content) +} diff --git a/ant-logging/tests/test_nested_span_fix.rs b/ant-logging/tests/test_nested_span_fix.rs new file mode 100644 index 0000000000..4c93dde5dc --- /dev/null +++ b/ant-logging/tests/test_nested_span_fix.rs @@ -0,0 +1,101 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +// Test to demonstrate that the nested span issue is fixed + +use ant_logging::{LogBuilder, LogOutputDest}; +use std::time::Duration; +use tempfile::TempDir; +use tracing::info; + +#[tokio::test] +async fn test_nested_spans_are_fixed() { + let temp_dir = TempDir::new().expect("Failed to create temp directory"); + let log_dir = temp_dir.path().to_path_buf(); + + // Test multi-node logging with intentionally nested spans + let mut log_builder = LogBuilder::new(vec![( + "test_nested_span_fix".to_string(), + tracing::Level::INFO, + )]); + log_builder.output_dest(LogOutputDest::Path(log_dir.clone())); + + let (_reload_handle, guards) = log_builder + .initialize_with_multi_node_logging(3) + .expect("Failed to initialize multi-node logging"); + + // Create nested spans intentionally - this used to cause the issue + { + let node_span_1 = tracing::info_span!("node", node_id = 1); + let _enter_1 = node_span_1.enter(); + info!("Message from node 1 - outer"); + + { + let node_span_2 = tracing::info_span!("node", node_id = 2); + let _enter_2 = node_span_2.enter(); + info!("Message from node 2 - middle"); + + { + let node_span_3 = tracing::info_span!("node", node_id = 3); + let _enter_3 = node_span_3.enter(); + info!("Message from node 3 - inner"); + } + } + } + + // Allow time for logs to be written and flushed + tokio::time::sleep(Duration::from_millis(200)).await; + drop(guards); + tokio::time::sleep(Duration::from_millis(200)).await; + + // Verify that each node only shows ONE "/node" span in their logs (not nested) + for i in 1..=3 { + let node_dir = log_dir.join(format!("node_{}", i)); + assert!(node_dir.exists(), "Node {} directory should exist", i); + + if let Ok(node_content) = read_log_content(&node_dir) { + println!("Node {} logs: {}", i, node_content); + + // Each line should only have one "/node" in the span path + for line in node_content.lines() { + let node_count = line.matches("/node").count(); + assert_eq!( + node_count, 1, + "Node {} should have exactly 1 '/node' span, but found {} in line: '{}'", + i, node_count, line + ); + } + + // Verify this node's message exists + assert!( + node_content.contains(&format!("Message from node {}", i)), + "Node {} logs should contain its message", + i + ); + } + } +} + +/// Helper function to read log content from a node directory +fn read_log_content(node_dir: &std::path::PathBuf) -> Result> { + let mut content = String::new(); + + for entry in std::fs::read_dir(node_dir)? { + let entry = entry?; + if entry.path().extension().is_some_and(|ext| ext == "log") { + let file_content = std::fs::read_to_string(entry.path())?; + content.push_str(&file_content); + } + } + + if content.is_empty() { + return Err("No log content found".into()); + } + + Ok(content) +} diff --git a/ant-node/src/networking/driver/cmd.rs b/ant-node/src/networking/driver/cmd.rs index 6963f99b3c..34bc833a44 100644 --- a/ant-node/src/networking/driver/cmd.rs +++ b/ant-node/src/networking/driver/cmd.rs @@ -30,6 +30,7 @@ use libp2p::{ use std::time::Instant; use std::{collections::BTreeMap, time::Duration}; use tokio::sync::oneshot; +use tracing::Instrument; use xor_name::XorName; const MAX_CONTINUOUS_HDD_WRITE_ERROR: usize = 5; @@ -640,7 +641,7 @@ impl SwarmDriver { error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}"); } } - }); + }.in_current_span()); // request let request = Request::Cmd(Cmd::PeerConsideredAsBad { diff --git a/ant-node/src/networking/driver/mod.rs b/ant-node/src/networking/driver/mod.rs index 280c517636..e61bd3be1a 100644 --- a/ant-node/src/networking/driver/mod.rs +++ b/ant-node/src/networking/driver/mod.rs @@ -53,7 +53,7 @@ use std::collections::{btree_map::Entry, BTreeMap, HashMap, HashSet}; use std::time::Instant; use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{interval, Duration, Interval}; -use tracing::warn; +use tracing::{warn, Instrument}; use super::interface::{LocalSwarmCmd, NetworkEvent, NetworkSwarmCmd}; @@ -406,17 +406,20 @@ impl SwarmDriver { let capacity = event_sender.capacity(); // push the event off thread so as to be non-blocking - let _handle = tokio::spawn(async move { - if capacity == 0 { - warn!( - "NetworkSwarmCmd channel is full. Await capacity to send: {:?}", - event - ); - } - if let Err(error) = event_sender.send(event).await { - error!("SwarmDriver failed to send event: {}", error); + let _handle = tokio::spawn( + async move { + if capacity == 0 { + warn!( + "NetworkSwarmCmd channel is full. Await capacity to send: {:?}", + event + ); + } + if let Err(error) = event_sender.send(event).await { + error!("SwarmDriver failed to send event: {}", error); + } } - }); + .in_current_span(), + ); } /// Sends an event after pushing it off thread so as to be non-blocking @@ -426,17 +429,20 @@ impl SwarmDriver { let capacity = event_sender.capacity(); // push the event off thread so as to be non-blocking - let _handle = tokio::spawn(async move { - if capacity == 0 { - warn!( - "NetworkEvent channel is full. Await capacity to send: {:?}", - event - ); - } - if let Err(error) = event_sender.send(event).await { - error!("SwarmDriver failed to send event: {}", error); + let _handle = tokio::spawn( + async move { + if capacity == 0 { + warn!( + "NetworkEvent channel is full. Await capacity to send: {:?}", + event + ); + } + if let Err(error) = event_sender.send(event).await { + error!("SwarmDriver failed to send event: {}", error); + } } - }); + .in_current_span(), + ); } /// Get K closest peers to self, from our local RoutingTable. @@ -573,11 +579,14 @@ impl SwarmDriver { // Save cache to disk. #[allow(clippy::let_underscore_future)] - let _ = tokio::spawn(async move { - if let Err(err) = old_cache.sync_and_flush_to_disk() { - error!("Failed to save bootstrap cache: {err}"); + let _ = tokio::spawn( + async move { + if let Err(err) = old_cache.sync_and_flush_to_disk() { + error!("Failed to save bootstrap cache: {err}"); + } } - }); + .in_current_span(), + ); } } Ok(()) diff --git a/ant-node/src/networking/metrics/bad_node.rs b/ant-node/src/networking/metrics/bad_node.rs index 0f1c5d597a..bb4378f659 100644 --- a/ant-node/src/networking/metrics/bad_node.rs +++ b/ant-node/src/networking/metrics/bad_node.rs @@ -18,6 +18,7 @@ use std::{ }; use strum::IntoEnumIterator; use tokio::time::interval; +use tracing::Instrument; const UPDATE_INTERVAL: Duration = Duration::from_secs(20); @@ -159,7 +160,7 @@ impl BadNodeMetrics { } } } - }); + }.in_current_span()); tx } } diff --git a/ant-node/src/networking/metrics/mod.rs b/ant-node/src/networking/metrics/mod.rs index b951d93910..9644b9af74 100644 --- a/ant-node/src/networking/metrics/mod.rs +++ b/ant-node/src/networking/metrics/mod.rs @@ -28,6 +28,7 @@ use std::sync::atomic::AtomicU64; use sysinfo::{Pid, ProcessRefreshKind, System}; use tokio::time::sleep; use tokio::time::Duration; +use tracing::Instrument; const UPDATE_INTERVAL: Duration = Duration::from_secs(60); const TO_MB: u64 = 1_000_000; @@ -335,14 +336,17 @@ impl NetworkMetricsRecorder { let bad_nodes_notifier = self.bad_nodes_notifier.clone(); let flagged_by = *flagged_by; #[allow(clippy::let_underscore_future)] - let _ = tokio::spawn(async move { - if let Err(err) = bad_nodes_notifier - .send(BadNodeMetricsMsg::ShunnedByPeer(flagged_by)) - .await - { - error!("Failed to send shunned report via notifier: {err:?}"); + let _ = tokio::spawn( + async move { + if let Err(err) = bad_nodes_notifier + .send(BadNodeMetricsMsg::ShunnedByPeer(flagged_by)) + .await + { + error!("Failed to send shunned report via notifier: {err:?}"); + } } - }); + .in_current_span(), + ); } Marker::QuotingMetrics { quoting_metrics } => { let _ = self.relevant_records.set( @@ -367,14 +371,17 @@ impl NetworkMetricsRecorder { pub(crate) fn record_change_in_close_group(&self, new_close_group: Vec) { let bad_nodes_notifier = self.bad_nodes_notifier.clone(); #[allow(clippy::let_underscore_future)] - let _ = tokio::spawn(async move { - if let Err(err) = bad_nodes_notifier - .send(BadNodeMetricsMsg::CloseGroupUpdated(new_close_group)) - .await - { - error!("Failed to send shunned report via notifier: {err:?}"); + let _ = tokio::spawn( + async move { + if let Err(err) = bad_nodes_notifier + .send(BadNodeMetricsMsg::CloseGroupUpdated(new_close_group)) + .await + { + error!("Failed to send shunned report via notifier: {err:?}"); + } } - }); + .in_current_span(), + ); } pub(crate) fn update_node_versions(&self, versions: &HashMap) { diff --git a/ant-node/src/networking/metrics/service.rs b/ant-node/src/networking/metrics/service.rs index 458bbff0c7..e8f69693f2 100644 --- a/ant-node/src/networking/metrics/service.rs +++ b/ant-node/src/networking/metrics/service.rs @@ -15,6 +15,7 @@ use std::{ sync::{Arc, Mutex}, task::{Context, Poll}, }; +use tracing::Instrument; /// The types of metrics that are exposed via the various endpoints. #[derive(Default, Debug)] @@ -42,7 +43,7 @@ pub(crate) fn run_metrics_server(registries: MetricsRegistries, port: u16) { if let Err(e) = server.await { error!("server error: {}", e); } - }); + }.in_current_span()); } type SharedRegistry = Arc>; diff --git a/ant-node/src/networking/network/mod.rs b/ant-node/src/networking/network/mod.rs index 7bafb0d0b3..29a31404b6 100644 --- a/ant-node/src/networking/network/mod.rs +++ b/ant-node/src/networking/network/mod.rs @@ -19,6 +19,7 @@ use libp2p::kad::{KBucketDistance, Record, RecordKey, K_VALUE}; use libp2p::swarm::ConnectionId; use libp2p::{identity::Keypair, Multiaddr, PeerId}; use tokio::sync::{mpsc, oneshot}; +use tracing::Instrument; use super::driver::event::MsgResponder; use super::error::{NetworkError, Result}; @@ -68,7 +69,7 @@ impl Network { }; // Run the swarm driver as a background task - let _swarm_driver_task = tokio::spawn(swarm_driver.run(shutdown_rx)); + let _swarm_driver_task = tokio::spawn(swarm_driver.run(shutdown_rx).in_current_span()); Ok((network, network_event_receiver)) } @@ -509,11 +510,14 @@ impl Network { } // Spawn a task to send the SwarmCmd and keep this fn sync - let _handle = tokio::spawn(async move { - if let Err(error) = swarm_cmd_sender.send(cmd).await { - error!("Failed to send SwarmCmd: {}", error); + let _handle = tokio::spawn( + async move { + if let Err(error) = swarm_cmd_sender.send(cmd).await { + error!("Failed to send SwarmCmd: {}", error); + } } - }); + .in_current_span(), + ); } /// Helper to send LocalSwarmCmd @@ -537,11 +541,14 @@ pub(crate) fn send_local_swarm_cmd( } // Spawn a task to send the SwarmCmd and keep this fn sync - let _handle = tokio::spawn(async move { - if let Err(error) = swarm_cmd_sender.send(cmd).await { - error!("Failed to send SwarmCmd: {}", error); + let _handle = tokio::spawn( + async move { + if let Err(error) = swarm_cmd_sender.send(cmd).await { + error!("Failed to send SwarmCmd: {}", error); + } } - }); + .in_current_span(), + ); } // A standard way to log connection id & the action performed on it. diff --git a/ant-node/src/networking/record_store.rs b/ant-node/src/networking/record_store.rs index 4354c79aaa..02f6fb0e9e 100644 --- a/ant-node/src/networking/record_store.rs +++ b/ant-node/src/networking/record_store.rs @@ -45,6 +45,7 @@ use std::{ }; use tokio::spawn; use tokio::{sync::mpsc, time::Duration}; +use tracing::Instrument; use walkdir::{DirEntry, WalkDir}; use xor_name::XorName; @@ -355,12 +356,15 @@ impl NodeRecordStore { }; #[allow(clippy::let_underscore_future)] - let _ = spawn(async move { - if let Ok(mut file) = fs::File::create(file_path) { - let mut serialiser = rmp_serde::encode::Serializer::new(&mut file); - let _ = historic_quoting_metrics.serialize(&mut serialiser); + let _ = spawn( + async move { + if let Ok(mut file) = fs::File::create(file_path) { + let mut serialiser = rmp_serde::encode::Serializer::new(&mut file); + let _ = historic_quoting_metrics.serialize(&mut serialiser); + } } - }); + .in_current_span(), + ); } /// Creates a new `DiskBackedStore` with the given configuration. @@ -754,7 +758,7 @@ impl NodeRecordStore { send_local_swarm_cmd(cloned_cmd_sender, cmd); } - }); + }.in_current_span()); Ok(()) } @@ -922,14 +926,17 @@ impl RecordStore for NodeRecordStore { debug!("Unverified Record {record_key:?} try to validate and store"); let event_sender = self.network_event_sender.clone(); // push the event off thread so as to be non-blocking - let _handle = spawn(async move { - if let Err(error) = event_sender - .send(NetworkEvent::UnverifiedRecord(record)) - .await - { - error!("SwarmDriver failed to send event: {}", error); + let _handle = spawn( + async move { + if let Err(error) = event_sender + .send(NetworkEvent::UnverifiedRecord(record)) + .await + { + error!("SwarmDriver failed to send event: {}", error); + } } - }); + .in_current_span(), + ); Ok(()) } @@ -957,16 +964,19 @@ impl RecordStore for NodeRecordStore { let filename = Self::generate_filename(k); let file_path = self.config.storage_dir.join(&filename); - let _handle = spawn(async move { - match fs::remove_file(file_path) { - Ok(_) => { - info!("Removed record from disk! filename: {filename}"); - } - Err(err) => { - error!("Error while removing file. filename: {filename}, error: {err:?}"); + let _handle = spawn( + async move { + match fs::remove_file(file_path) { + Ok(_) => { + info!("Removed record from disk! filename: {filename}"); + } + Err(err) => { + error!("Error while removing file. filename: {filename}, error: {err:?}"); + } } } - }); + .in_current_span(), + ); } fn records(&self) -> Self::RecordsIter<'_> { diff --git a/ant-node/src/networking/replication_fetcher.rs b/ant-node/src/networking/replication_fetcher.rs index f33a663022..9c6d045a6f 100644 --- a/ant-node/src/networking/replication_fetcher.rs +++ b/ant-node/src/networking/replication_fetcher.rs @@ -21,6 +21,7 @@ use std::collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque}; use std::time::Instant; use tokio::spawn; use tokio::{sync::mpsc, time::Duration}; +use tracing::Instrument; // Max parallel fetches that can be undertaken at the same time. const MAX_PARALLEL_FETCH: usize = 5; @@ -581,17 +582,20 @@ impl ReplicationFetcher { let capacity = event_sender.capacity(); // push the event off thread so as to be non-blocking - let _handle = spawn(async move { - if capacity == 0 { - warn!( - "NetworkEvent channel is full. Await capacity to send: {:?}", - event - ); - } - if let Err(error) = event_sender.send(event).await { - error!("ReplicationFetcher failed to send event: {}", error); + let _handle = spawn( + async move { + if capacity == 0 { + warn!( + "NetworkEvent channel is full. Await capacity to send: {:?}", + event + ); + } + if let Err(error) = event_sender.send(event).await { + error!("ReplicationFetcher failed to send event: {}", error); + } } - }); + .in_current_span(), + ); } } diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index cb5e8eadca..4f2263beb5 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -47,6 +47,7 @@ use tokio::{ sync::mpsc::Receiver, task::{spawn, JoinSet}, }; +use tracing::Instrument; /// Interval to trigger replication of all records to all peers. /// This is the max time it should take. Minimum interval at any node will be half this @@ -205,6 +206,7 @@ impl NodeBuilder { }; // Run the node + node.run(network_event_receiver, shutdown_rx); let running_node = RunningNode { shutdown_sender: shutdown_tx, @@ -275,7 +277,11 @@ impl Node { let peers_connected = Arc::new(AtomicUsize::new(0)); + let current_span = tracing::Span::current(); + let node_span = current_span.clone(); let _node_task = spawn(async move { + let _guard = node_span.enter(); // Enter span in spawned task + // use a random activity timeout to ensure that the nodes do not sync when messages // are being transmitted. let replication_interval: u64 = rng.gen_range( @@ -351,7 +357,7 @@ impl Node { let _handle = spawn(async move { Self::try_interval_replication(network); trace!("Periodic replication took {:?}", start.elapsed()); - }); + }.in_current_span()); } _ = uptime_metrics_update_interval.tick() => { #[cfg(feature = "open-metrics")] @@ -364,7 +370,7 @@ impl Node { let _handle = spawn(async move { Self::trigger_irrelevant_record_cleanup(network); - }); + }.in_current_span()); } // runs every storage_challenge_interval time _ = storage_challenge_interval.tick() => { @@ -375,7 +381,7 @@ impl Node { let _handle = spawn(async move { Self::storage_challenge(network).await; trace!("Periodic storage challenge took {:?}", start.elapsed()); - }); + }.in_current_span()); } } } @@ -427,16 +433,22 @@ impl Node { // try query peer version let network = self.network().clone(); - let _handle = spawn(async move { - Self::try_query_peer_version(network, peer_id, Default::default()).await; - }); + let _handle = spawn( + async move { + Self::try_query_peer_version(network, peer_id, Default::default()).await; + } + .in_current_span(), + ); // try replication here let network = self.network().clone(); self.record_metrics(Marker::IntervalReplicationTriggered); - let _handle = spawn(async move { - Self::try_interval_replication(network); - }); + let _handle = spawn( + async move { + Self::try_interval_replication(network); + } + .in_current_span(), + ); } NetworkEvent::PeerRemoved(peer_id, connected_peers) => { event_header = "PeerRemoved"; @@ -450,9 +462,12 @@ impl Node { let network = self.network().clone(); self.record_metrics(Marker::IntervalReplicationTriggered); - let _handle = spawn(async move { - Self::try_interval_replication(network); - }); + let _handle = spawn( + async move { + Self::try_interval_replication(network); + } + .in_current_span(), + ); } NetworkEvent::PeerWithUnsupportedProtocol { .. } => { event_header = "PeerWithUnsupportedProtocol"; @@ -479,32 +494,38 @@ impl Node { let network = self.network().clone(); let payment_address = *self.reward_address(); - let _handle = spawn(async move { - let res = Self::handle_query(&network, query, payment_address).await; + let _handle = spawn( + async move { + let res = Self::handle_query(&network, query, payment_address).await; - // Reducing non-mandatory logging - if let Response::Query(QueryResponse::GetVersion { .. }) = res { - trace!("Sending response {res:?}"); - } else { - debug!("Sending response {res:?}"); - } + // Reducing non-mandatory logging + if let Response::Query(QueryResponse::GetVersion { .. }) = res { + trace!("Sending response {res:?}"); + } else { + debug!("Sending response {res:?}"); + } - network.send_response(res, channel); - }); + network.send_response(res, channel); + } + .in_current_span(), + ); } NetworkEvent::UnverifiedRecord(record) => { event_header = "UnverifiedRecord"; // queries can be long running and require validation, so we spawn a task to handle them let self_clone = self.clone(); - let _handle = spawn(async move { - let key = PrettyPrintRecordKey::from(&record.key).into_owned(); - match self_clone.validate_and_store_record(record).await { - Ok(()) => debug!("UnverifiedRecord {key} has been stored"), - Err(err) => { - self_clone.record_metrics(Marker::RecordRejected(&key, &err)); + let _handle = spawn( + async move { + let key = PrettyPrintRecordKey::from(&record.key).into_owned(); + match self_clone.validate_and_store_record(record).await { + Ok(()) => debug!("UnverifiedRecord {key} has been stored"), + Err(err) => { + self_clone.record_metrics(Marker::RecordRejected(&key, &err)); + } } } - }); + .in_current_span(), + ); } NetworkEvent::TerminateNode { reason } => { event_header = "TerminateNode"; @@ -526,28 +547,35 @@ impl Node { // any change to the keyword `failed to fetch` shall incur // correspondent CI script change as well. debug!("Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."); - let _handle = spawn(async move { - for (peer_id, record_key) in bad_nodes { - // Obsoleted fetch request (due to flooded in fresh replicates) could result - // in peer to be claimed as bad, as local copy blocks the entry to be cleared. - if let Ok(false) = network.is_record_key_present_locally(&record_key).await - { - error!( - "From peer {peer_id:?}, failed to fetch record {:?}", - PrettyPrintRecordKey::from(&record_key) - ); - network.record_node_issues(peer_id, NodeIssue::ReplicationFailure); + let _handle = spawn( + async move { + for (peer_id, record_key) in bad_nodes { + // Obsoleted fetch request (due to flooded in fresh replicates) could result + // in peer to be claimed as bad, as local copy blocks the entry to be cleared. + if let Ok(false) = + network.is_record_key_present_locally(&record_key).await + { + error!( + "From peer {peer_id:?}, failed to fetch record {:?}", + PrettyPrintRecordKey::from(&record_key) + ); + network.record_node_issues(peer_id, NodeIssue::ReplicationFailure); + } } } - }); + .in_current_span(), + ); } NetworkEvent::QuoteVerification { quotes } => { event_header = "QuoteVerification"; let network = self.network().clone(); - let _handle = spawn(async move { - quotes_verification(&network, quotes).await; - }); + let _handle = spawn( + async move { + quotes_verification(&network, quotes).await; + } + .in_current_span(), + ); } NetworkEvent::FreshReplicateToFetch { holder, keys } => { event_header = "FreshReplicateToFetch"; @@ -556,9 +584,12 @@ impl Node { NetworkEvent::PeersForVersionQuery(peers) => { event_header = "PeersForVersionQuery"; let network = self.network().clone(); - let _handle = spawn(async move { - Self::query_peers_version(network, peers).await; - }); + let _handle = spawn( + async move { + Self::query_peers_version(network, peers).await; + } + .in_current_span(), + ); } } diff --git a/ant-node/src/replication.rs b/ant-node/src/replication.rs index 2fbd6d952b..2bb4aaecc6 100644 --- a/ant-node/src/replication.rs +++ b/ant-node/src/replication.rs @@ -39,7 +39,9 @@ impl Node { for (holder, key) in keys_to_fetch { let node = self.clone(); let requester = NetworkAddress::from(self.network().peer_id()); + let current_span = tracing::Span::current(); let _handle = spawn(async move { + let _guard = current_span.enter(); let pretty_key = PrettyPrintRecordKey::from(&key).into_owned(); debug!("Fetching record {pretty_key:?} from node {holder:?}"); let req = Request::Query(Query::GetReplicatedRecord { @@ -185,7 +187,9 @@ impl Node { )>, ) { let node = self.clone(); + let current_span = tracing::Span::current(); let _handle = spawn(async move { + let _guard = current_span.enter(); let mut new_keys = vec![]; for (addr, data_type, val_type, payment) in keys { if let Some(payment) = payment { diff --git a/ant-node/src/spawn/network_spawner.rs b/ant-node/src/spawn/network_spawner.rs index e9fdbfdfbe..db9837e24a 100644 --- a/ant-node/src/spawn/network_spawner.rs +++ b/ant-node/src/spawn/network_spawner.rs @@ -12,6 +12,14 @@ use ant_evm::{EvmNetwork, RewardsAddress}; use libp2p::Multiaddr; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; +use tracing::Instrument; + +/// Create a node span with unlimited node ID support. +/// Uses a static span name "node" with the node_id as a field to work around +/// the tracing library's requirement for compile-time span names. +fn create_node_span(node_id: usize) -> tracing::Span { + tracing::info_span!("node", node_id = node_id) +} pub struct NetworkSpawner { /// The EVM network to which the spawned nodes will connect. @@ -179,42 +187,53 @@ async fn spawn_network( let mut running_nodes: Vec = vec![]; for i in 0..size { - let ip = match local { - true => IpAddr::V4(Ipv4Addr::LOCALHOST), - false => IpAddr::V4(Ipv4Addr::UNSPECIFIED), - }; - - let socket_addr = SocketAddr::new(ip, 0); - - // Get the initial peers from the previously spawned nodes - let mut initial_peers: Vec = vec![]; - - for peer in running_nodes.iter() { - if let Ok(listen_addrs_with_peer_id) = peer.get_listen_addrs_with_peer_id().await { - initial_peers.extend(listen_addrs_with_peer_id); + let node_id = i + 1; + // Create a span with a unique name for this specific node + let node_span = create_node_span(node_id); + + // Instrument the entire node spawn with the node_id span + let spawn_future = async { + let ip = match local { + true => IpAddr::V4(Ipv4Addr::LOCALHOST), + false => IpAddr::V4(Ipv4Addr::UNSPECIFIED), + }; + + let socket_addr = SocketAddr::new(ip, 0); + + // Get the initial peers from the previously spawned nodes + let mut initial_peers: Vec = vec![]; + + for peer in running_nodes.iter() { + if let Ok(listen_addrs_with_peer_id) = peer.get_listen_addrs_with_peer_id().await { + initial_peers.extend(listen_addrs_with_peer_id); + } } - } - - let node = NodeSpawner::new() - .with_socket_addr(socket_addr) - .with_evm_network(evm_network.clone()) - .with_rewards_address(rewards_address) - .with_initial_peers(initial_peers) - .with_local(local) - .with_no_upnp(no_upnp) - .with_root_dir(root_dir.clone()) - .spawn() - .await?; - let listen_addrs = node.get_listen_addrs().await; + let node = NodeSpawner::new() + .with_socket_addr(socket_addr) + .with_evm_network(evm_network.clone()) + .with_rewards_address(rewards_address) + .with_initial_peers(initial_peers) + .with_local(local) + .with_no_upnp(no_upnp) + .with_root_dir(root_dir.clone()) + .spawn() + .await?; + + let listen_addrs = node.get_listen_addrs().await; + + info!( + "Spawned node #{} with listen addresses: {:?}", + i + 1, + listen_addrs + ); - info!( - "Spawned node #{} with listen addresses: {:?}", - i + 1, - listen_addrs - ); + Ok::<_, eyre::Error>(node) + } + .instrument(node_span); // ALL sub-operations inherit this span - running_nodes.push(node); + let running_node = spawn_future.await?; + running_nodes.push(running_node); } Ok(RunningNetwork { running_nodes })