Skip to content

Commit 8f28137

Browse files
committed
feat(logging): use separate file per spawned node
1 parent 81d171d commit 8f28137

File tree

7 files changed

+461
-41
lines changed

7 files changed

+461
-41
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ant-logging/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ serde = { version = "1.0.133", features = ["derive", "rc"] }
2121
serde_json = { version = "1.0" }
2222
sysinfo = { version = "0.30.8", default-features = false, optional = true }
2323
thiserror = "1.0.23"
24-
tokio = { version = "1.43.1", optional = true }
24+
tokio = { version = "1.43.1", features = ["rt", "rt-multi-thread", "macros", "time"], optional = true }
2525
tracing = { version = "~0.1.26" }
2626
tracing-appender = "~0.2.0"
2727
tracing-core = "0.1.30"
@@ -31,6 +31,8 @@ tracing-subscriber = { version = "0.3.16", features = ["json"] }
3131
[dev-dependencies]
3232
color-eyre = "0.6.3"
3333
tracing-test = "0.2.4"
34+
tokio = { version = "1.43.1", features = ["rt", "macros", "rt-multi-thread", "time"] }
35+
tempfile = "3.8"
3436

3537
[features]
3638
otlp = [

ant-logging/src/layers.rs

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
};
1414
use std::collections::BTreeMap;
1515
use tracing_appender::non_blocking::WorkerGuard;
16+
use tracing_core::Metadata;
1617
use tracing_core::{Event, Level, Subscriber};
1718
use tracing_subscriber::{
1819
filter::Targets,
@@ -22,15 +23,15 @@ use tracing_subscriber::{
2223
time::{FormatTime, SystemTime},
2324
FmtContext, FormatEvent, FormatFields,
2425
},
25-
layer::Filter,
26+
layer::{Context, Filter},
2627
registry::LookupSpan,
2728
reload::{self, Handle},
2829
Layer, Registry,
2930
};
3031

31-
const MAX_LOG_SIZE: usize = 20 * 1024 * 1024;
32-
const MAX_UNCOMPRESSED_LOG_FILES: usize = 10;
33-
const MAX_LOG_FILES: usize = 1000;
32+
pub(crate) const MAX_LOG_SIZE: usize = 20 * 1024 * 1024;
33+
pub(crate) const MAX_UNCOMPRESSED_LOG_FILES: usize = 10;
34+
pub(crate) const MAX_LOG_FILES: usize = 1000;
3435
// Everything is logged by default
3536
const ALL_ANT_LOGS: &str = "all";
3637
// Trace at nodes, clients, debug at networking layer
@@ -239,7 +240,7 @@ impl TracingLayers {
239240
/// `export ANT_LOG = libp2p=DEBUG, tokio=INFO, all, sn_client=ERROR`
240241
/// Custom keywords will take less precedence if the same target has been manually specified in the CSV.
241242
/// `sn_client=ERROR` in the above example will be used instead of the TRACE level set by "all" keyword.
242-
fn get_logging_targets(logging_env_value: &str) -> Result<Vec<(String, Level)>> {
243+
pub(crate) fn get_logging_targets(logging_env_value: &str) -> Result<Vec<(String, Level)>> {
243244
let mut targets = BTreeMap::new();
244245
let mut contains_keyword_all_sn_logs = false;
245246
let mut contains_keyword_verbose_sn_logs = false;
@@ -325,3 +326,85 @@ fn get_log_level_from_str(log_level: &str) -> Result<Level> {
325326
))),
326327
}
327328
}
329+
330+
use std::collections::HashMap;
331+
use std::sync::{Arc, Mutex};
332+
use tracing_appender::non_blocking::NonBlocking;
333+
334+
/// Layer that routes events to different file appenders based on span context
335+
pub struct NodeRoutingLayer {
336+
node_writers: Arc<Mutex<HashMap<String, NonBlocking>>>,
337+
targets_filter: Targets,
338+
}
339+
340+
impl NodeRoutingLayer {
341+
pub fn new(targets: Vec<(String, Level)>) -> Self {
342+
Self {
343+
node_writers: Arc::new(Mutex::new(HashMap::new())),
344+
targets_filter: Targets::new().with_targets(targets),
345+
}
346+
}
347+
348+
pub fn add_node_writer(&mut self, node_name: String, writer: NonBlocking) {
349+
let mut writers = self
350+
.node_writers
351+
.lock()
352+
.expect("Failed to acquire node writers lock");
353+
writers.insert(node_name, writer);
354+
}
355+
}
356+
357+
impl<S> Layer<S> for NodeRoutingLayer
358+
where
359+
S: Subscriber + for<'lookup> LookupSpan<'lookup>,
360+
{
361+
fn enabled(&self, meta: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
362+
use tracing_subscriber::layer::Filter;
363+
Filter::enabled(&self.targets_filter, meta, &ctx)
364+
}
365+
366+
fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
367+
// Find which node this event belongs to based on span hierarchy
368+
let mut target_node = None;
369+
370+
if let Some(span_ref) = ctx.lookup_current() {
371+
let mut current = Some(span_ref);
372+
while let Some(span) = current {
373+
let span_name = span.name();
374+
375+
// Check for standard node spans: node_1, node_2, etc.
376+
if span_name.starts_with("node_") {
377+
target_node = Some(span_name.to_string());
378+
break;
379+
}
380+
381+
// Check for node_other spans (for nodes > 20)
382+
if span_name == "node_other" {
383+
// For node_other, we'll route to a default "node_other" directory
384+
target_node = Some("node_other".to_string());
385+
break;
386+
}
387+
388+
current = span.parent();
389+
}
390+
}
391+
392+
// Route to the appropriate writer
393+
if let Some(node_name) = target_node {
394+
let writers = self
395+
.node_writers
396+
.lock()
397+
.expect("Failed to acquire node writers lock");
398+
if let Some(writer) = writers.get(&node_name) {
399+
// Create a temporary fmt layer to format and write the event
400+
let temp_layer = tracing_fmt::layer()
401+
.with_ansi(false)
402+
.with_writer(writer.clone())
403+
.event_format(LogFormatter);
404+
405+
// Forward the event to the temporary layer for proper formatting
406+
temp_layer.on_event(event, ctx);
407+
}
408+
}
409+
}
410+
}

ant-logging/src/lib.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,123 @@ impl LogBuilder {
188188
Ok((reload_handle, layers.log_appender_guard))
189189
}
190190

191+
/// Initialize multi-node logging with per-node log files.
192+
/// Each node gets its own log directory and rotation.
193+
///
194+
/// # Arguments
195+
/// * `node_count` - Number of nodes to create logging for
196+
///
197+
/// # Returns
198+
/// * `ReloadHandle` - Handle to modify log levels
199+
/// * `Vec<WorkerGuard>` - Guards for all node appenders
200+
pub fn initialize_with_multi_node_logging(
201+
self,
202+
node_count: usize,
203+
) -> Result<(ReloadHandle, Vec<WorkerGuard>)> {
204+
use crate::appender;
205+
use crate::layers::{NodeRoutingLayer, TracingLayers};
206+
207+
if node_count == 1 {
208+
// Fall back to existing single-node implementation
209+
let (handle, guard) = self.initialize()?;
210+
return Ok((
211+
handle,
212+
vec![guard.unwrap_or_else(|| {
213+
// Create a dummy guard if none exists
214+
let (_, guard) = tracing_appender::non_blocking(std::io::sink());
215+
guard
216+
})],
217+
));
218+
}
219+
220+
// Multi-node logging requires file output
221+
let base_log_dir = match &self.output_dest {
222+
LogOutputDest::Path(path) => path.clone(),
223+
_ => {
224+
return Err(Error::LoggingConfiguration(
225+
"Multi-node logging requires file output".to_string(),
226+
))
227+
}
228+
};
229+
230+
// Get logging targets
231+
let targets = match std::env::var("ANT_LOG") {
232+
Ok(sn_log_val) => {
233+
if self.print_updates_to_stdout {
234+
println!("Using ANT_LOG={sn_log_val}");
235+
}
236+
crate::layers::get_logging_targets(&sn_log_val)?
237+
}
238+
Err(_) => self.default_logging_targets.clone(),
239+
};
240+
241+
// Create NodeRoutingLayer and set up per-node appenders
242+
let mut routing_layer = NodeRoutingLayer::new(targets);
243+
let mut guards = Vec::new();
244+
245+
for i in 1..=node_count {
246+
let node_name = format!("node_{i}");
247+
248+
let node_log_dir = base_log_dir.join(&node_name);
249+
std::fs::create_dir_all(&node_log_dir)?;
250+
251+
if self.print_updates_to_stdout {
252+
println!("Logging for {node_name} to directory: {node_log_dir:?}");
253+
}
254+
255+
let (appender, guard) = appender::file_rotater(
256+
&node_log_dir,
257+
crate::layers::MAX_LOG_SIZE,
258+
self.max_log_files
259+
.unwrap_or(crate::layers::MAX_UNCOMPRESSED_LOG_FILES),
260+
self.max_archived_log_files
261+
.map(|max_archived| {
262+
max_archived
263+
+ self
264+
.max_log_files
265+
.unwrap_or(crate::layers::MAX_UNCOMPRESSED_LOG_FILES)
266+
})
267+
.unwrap_or(crate::layers::MAX_LOG_FILES),
268+
);
269+
270+
routing_layer.add_node_writer(node_name, appender);
271+
guards.push(guard);
272+
}
273+
274+
let mut layers = TracingLayers::default();
275+
layers.layers.push(Box::new(routing_layer));
276+
277+
#[cfg(feature = "otlp")]
278+
{
279+
match std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") {
280+
Ok(_) => layers.otlp_layer(self.default_logging_targets)?,
281+
Err(_) => println!(
282+
"The OTLP feature is enabled but the OTEL_EXPORTER_OTLP_ENDPOINT variable is not \
283+
set, so traces will not be submitted."
284+
),
285+
}
286+
}
287+
288+
if tracing_subscriber::registry()
289+
.with(layers.layers)
290+
.try_init()
291+
.is_err()
292+
{
293+
return Err(Error::LoggingConfiguration(
294+
"Global subscriber already initialized".to_string(),
295+
));
296+
}
297+
298+
// Create reload handle for log level changes
299+
let targets_filter: Box<
300+
dyn tracing_subscriber::layer::Filter<tracing_subscriber::Registry> + Send + Sync,
301+
> = Box::new(tracing_subscriber::filter::Targets::new());
302+
let (_, reload_handle) = tracing_subscriber::reload::Layer::new(targets_filter);
303+
let reload_handle = ReloadHandle(reload_handle);
304+
305+
Ok((reload_handle, guards))
306+
}
307+
191308
/// Logs to the data_dir. Should be called from a single threaded tokio/non-tokio context.
192309
/// Provide the test file name to capture tracings from the test.
193310
///
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Integration test for multi-node logging functionality
2+
3+
use ant_logging::{LogBuilder, LogOutputDest};
4+
use std::path::PathBuf;
5+
use std::time::Duration;
6+
use tempfile::TempDir;
7+
use tracing::{info, Instrument};
8+
9+
#[tokio::test]
10+
async fn test_multi_node_logging_e2e() {
11+
let temp_dir = TempDir::new().expect("Failed to create temp directory");
12+
let log_dir = temp_dir.path().to_path_buf();
13+
14+
// Test multi-node logging with 2 nodes
15+
let mut log_builder = LogBuilder::new(vec![(
16+
"test_multi_node_e2e".to_string(),
17+
tracing::Level::INFO,
18+
)]);
19+
log_builder.output_dest(LogOutputDest::Path(log_dir.clone()));
20+
21+
let (_reload_handle, guards) = log_builder
22+
.initialize_with_multi_node_logging(2)
23+
.expect("Failed to initialize multi-node logging");
24+
25+
// Log messages from different nodes
26+
let node_1_span = tracing::info_span!("node_1");
27+
let task1 = async {
28+
info!("Message from node 1");
29+
info!("Another message from node 1");
30+
31+
// Test nested spans
32+
let inner_span = tracing::info_span!("inner_task");
33+
let inner_task = async {
34+
info!("Inner message from node 1");
35+
}
36+
.instrument(inner_span);
37+
inner_task.await;
38+
}
39+
.instrument(node_1_span);
40+
41+
let node_2_span = tracing::info_span!("node_2");
42+
let task2 = async {
43+
info!("Message from node 2");
44+
}
45+
.instrument(node_2_span);
46+
47+
// Run tasks concurrently
48+
tokio::join!(task1, task2);
49+
50+
// Allow time for logs to be written and flushed
51+
tokio::time::sleep(Duration::from_millis(200)).await;
52+
drop(guards);
53+
tokio::time::sleep(Duration::from_millis(200)).await;
54+
55+
// Verify node directories were created
56+
let node_1_dir = log_dir.join("node_1");
57+
let node_2_dir = log_dir.join("node_2");
58+
59+
assert!(node_1_dir.exists(), "Node 1 directory should exist");
60+
assert!(node_2_dir.exists(), "Node 2 directory should exist");
61+
62+
// Verify each node has its own log file with correct content
63+
let node_1_content = read_log_content(&node_1_dir).expect("Failed to read node 1 logs");
64+
let node_2_content = read_log_content(&node_2_dir).expect("Failed to read node 2 logs");
65+
66+
// Check node 1 logs contain all its messages
67+
assert!(
68+
node_1_content.contains("Message from node 1"),
69+
"Node 1 logs should contain its messages"
70+
);
71+
assert!(
72+
node_1_content.contains("Another message from node 1"),
73+
"Node 1 logs should contain all its messages"
74+
);
75+
assert!(
76+
node_1_content.contains("Inner message from node 1"),
77+
"Node 1 logs should contain nested span messages"
78+
);
79+
assert!(
80+
!node_1_content.contains("Message from node 2"),
81+
"Node 1 logs should not contain node 2 messages"
82+
);
83+
84+
// Check node 2 logs contain only its messages
85+
assert!(
86+
node_2_content.contains("Message from node 2"),
87+
"Node 2 logs should contain its messages"
88+
);
89+
assert!(
90+
!node_2_content.contains("Message from node 1"),
91+
"Node 2 logs should not contain node 1 messages"
92+
);
93+
94+
// Verify proper log formatting
95+
assert!(
96+
node_1_content.contains("test_multi_node_e2e"),
97+
"Should contain target name"
98+
);
99+
assert!(
100+
node_1_content.contains("node_1"),
101+
"Should contain span information"
102+
);
103+
assert!(
104+
node_2_content.contains("node_2"),
105+
"Should contain span information"
106+
);
107+
}
108+
109+
/// Helper function to read log content from a node directory
110+
fn read_log_content(node_dir: &PathBuf) -> Result<String, Box<dyn std::error::Error>> {
111+
let mut content = String::new();
112+
113+
for entry in std::fs::read_dir(node_dir)? {
114+
let entry = entry?;
115+
if entry.path().extension().is_some_and(|ext| ext == "log") {
116+
let file_content = std::fs::read_to_string(entry.path())?;
117+
content.push_str(&file_content);
118+
}
119+
}
120+
121+
if content.is_empty() {
122+
return Err("No log content found".into());
123+
}
124+
125+
Ok(content)
126+
}

0 commit comments

Comments
 (0)