Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog.d/24229_nats_slow_consumer_suppress.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
The `nats` source now suppresses high-frequency slow consumer warnings that could generate
millions of logs per minute when subscription capacity was exceeded. These events are now
logged at INFO level with rate limiting and tracked via a `nats_slow_consumer_events_total`
metric.

authors: benjamin-awd
4 changes: 4 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ mod metric_to_log;
mod mongodb_metrics;
#[cfg(feature = "sinks-mqtt")]
mod mqtt;
#[cfg(feature = "sources-nats")]
mod nats;
#[cfg(feature = "sources-nginx_metrics")]
mod nginx_metrics;
mod open;
Expand Down Expand Up @@ -246,6 +248,8 @@ pub(crate) use self::lua::*;
pub(crate) use self::metric_to_log::*;
#[cfg(feature = "sinks-mqtt")]
pub(crate) use self::mqtt::*;
#[cfg(feature = "sources-nats")]
pub(crate) use self::nats::*;
#[cfg(feature = "sources-nginx_metrics")]
pub(crate) use self::nginx_metrics::*;
#[cfg(any(
Expand Down
26 changes: 26 additions & 0 deletions src/internal_events/nats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use metrics::counter;
use tracing::info;
use vector_lib::internal_event::InternalEvent;

/// Internal event describing a slow consumer notification for a NATS subscription.
///
/// Emitting it writes a log line and increments the
/// `nats_slow_consumer_events_total` counter.
#[derive(Debug)]
pub struct NatsSlowConsumerEventReceived {
    /// Identifier of the subscription carried by the slow consumer event.
    pub subscription_id: u64,
    /// Value reported as the `component_id` field of the log line and as the
    /// `component_id` label of the counter.
    pub component_id: String,
}

impl InternalEvent for NatsSlowConsumerEventReceived {
    /// Logs the slow consumer occurrence at INFO level and increments the
    /// `nats_slow_consumer_events_total` counter by one.
    ///
    /// Both the log line and the counter carry the `component_id` and
    /// `subscription_id` of the event.
    fn emit(self) {
        // Review note: `internal_log_rate_secs = 10` is intentionally not set here.
        // It is redundant because the default value is already 10.
        info!(
            message = "NATS slow consumer for subscription.",
            subscription_id = %self.subscription_id,
            component_id = %self.component_id,
        );
        // Review discussion: `subscription_id` was questioned as a potentially
        // high-cardinality label. The author expects it to most likely only ever be
        // equal to 1, since a single subscription is spun up per source.
        counter!(
            "nats_slow_consumer_events_total",
            "component_id" => self.component_id,
            "subscription_id" => self.subscription_id.to_string(),
        )
        .increment(1);
    }
}
24 changes: 24 additions & 0 deletions src/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use nkeys::error::Error as NKeysError;
use snafu::{ResultExt, Snafu};
use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};

use crate::internal_events::NatsSlowConsumerEventReceived;

use crate::tls::TlsEnableableConfig;

/// Errors that can occur during NATS configuration.
Expand Down Expand Up @@ -184,6 +186,28 @@ pub(crate) fn from_tls_auth_config(
}
}

/// Registers an event callback that routes NATS slow consumer events through Vector.
///
/// The `async_nats` library logs `SlowConsumer` events outside of Vector's rate
/// limiting system, which can flood the logs when they fire at high frequency.
/// The callback installed here emits a [`NatsSlowConsumerEventReceived`] internal
/// event for each of them instead. Every other client event is ignored by this callback.
///
/// `component_id` is attached to every emitted event. The returned value is
/// `options` with the callback registered.
pub(crate) fn apply_event_callback(
    options: async_nats::ConnectOptions,
    component_id: String,
) -> async_nats::ConnectOptions {
    options.event_callback(move |event| {
        // The returned future takes ownership of its captures, so each invocation
        // gets its own copy of the id while the closure keeps the original.
        let owned_id = component_id.clone();
        async move {
            let async_nats::Event::SlowConsumer(subscription_id) = event else {
                return;
            };
            emit!(NatsSlowConsumerEventReceived {
                subscription_id,
                component_id: owned_id
            });
        }
    })
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 5 additions & 3 deletions src/sources/nats/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vrl::value::Kind;
use crate::{
codecs::DecodingConfig,
config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
nats::{NatsAuthConfig, NatsConfigError, from_tls_auth_config},
nats::{NatsAuthConfig, NatsConfigError, apply_event_callback, from_tls_auth_config},
serde::{default_decoding, default_framing_message_based},
sources::{
Source,
Expand Down Expand Up @@ -319,8 +319,10 @@ impl TryFrom<&NatsSourceConfig> for async_nats::ConnectOptions {
type Error = NatsConfigError;

fn try_from(config: &NatsSourceConfig) -> Result<Self, Self::Error> {
from_tls_auth_config(&config.connection_name, &config.auth, &config.tls)
.map(|options| options.subscription_capacity(config.subscriber_capacity))
from_tls_auth_config(&config.connection_name, &config.auth, &config.tls).map(|options| {
let options = apply_event_callback(options, config.connection_name.clone());
options.subscription_capacity(config.subscriber_capacity)
})
}
}

Expand Down
133 changes: 133 additions & 0 deletions src/sources/nats/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
fn generate_source_config(url: &str, subject: &str) -> NatsSourceConfig {
NatsSourceConfig {
url: url.to_string(),
connection_name: format!("vector-test-{}", subject),
subject: subject.to_string(),
framing: default_framing_message_based(),
decoding: default_decoding(),
Expand Down Expand Up @@ -593,3 +594,135 @@ async fn nats_shutdown_drain_messages() {
source_handle.await.unwrap().expect("Source task failed");
shutdown_done.await;
}

/// Checks that overflowing a capacity-1 subscription makes the source record the
/// `nats_slow_consumer_events_total` counter with `component_id` and
/// `subscription_id` tags.
///
/// The NATS server address is read from `NATS_ADDRESS`, falling back to
/// `nats://localhost:4222`.
#[tokio::test]
async fn nats_slow_consumer_event() {
    use futures::StreamExt;
    use tokio::time::{Duration, sleep};

    vector_lib::metrics::init_test();

    let subject = format!("test-slow-consumer-{}", random_string(10));
    let url =
        std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222"));

    let mut conf = generate_source_config(&url, &subject);
    // Set subscriber capacity to 1 to trigger slow consumer events quickly
    conf.subscriber_capacity = 1;

    let (nc, sub) = create_subscription(&conf).await.unwrap();
    let nc_pub = nc.clone();
    let (tx, mut rx) = SourceSender::new_test();
    let decoder = DecodingConfig::new(
        conf.framing.clone(),
        conf.decoding.clone(),
        LogNamespace::Legacy,
    )
    .build()
    .unwrap();

    let (shutdown_trigger, shutdown_signal, shutdown_done) = ShutdownSignal::new_wired();

    let source_handle = tokio::spawn(run_nats_core(
        conf.clone(),
        nc,
        sub,
        decoder,
        LogNamespace::Legacy,
        shutdown_signal,
        tx,
    ));

    // Publish many messages rapidly to overwhelm the subscriber buffer (capacity = 1)
    for i in 0..100 {
        nc_pub
            .publish(subject.clone(), Bytes::from(format!("msg{}", i)))
            .await
            .unwrap();
    }
    nc_pub.flush().await.unwrap();

    // Give the source time to process and trigger slow consumer events
    sleep(Duration::from_millis(200)).await;

    // Drain messages from the receiver with a timeout to prevent blocking.
    // The loop stops at 50 messages, after 2 seconds overall, or as soon as a
    // single receive does not complete within 50ms.
    let mut message_count = 0;
    let drain_timeout = Duration::from_secs(2);
    let start = tokio::time::Instant::now();

    while message_count < 50 && start.elapsed() < drain_timeout {
        if tokio::time::timeout(Duration::from_millis(50), rx.next())
            .await
            .is_ok()
        {
            message_count += 1;
        } else {
            break;
        }
    }

    // Check that the slow consumer metric was incremented
    let controller = vector_lib::metrics::Controller::get()
        .expect("Metrics controller should be initialized");
    let metrics = controller.capture_metrics();
    let metric = metrics
        .iter()
        .find(|m| m.name() == "nats_slow_consumer_events_total")
        .expect("Expected nats_slow_consumer_events_total metric to be emitted");

    // Check the counter value
    let value = match metric.value() {
        vector_lib::event::MetricValue::Counter { value } => value,
        _ => panic!("Expected counter metric"),
    };
    assert!(
        *value > 0.0,
        "Expected nats_slow_consumer_events_total to be > 0, got {}",
        value
    );

    // Verify component_id label is present and correct
    let tags = metric.tags().expect("Metric should have tags");
    let component_id_tag = tags
        .get("component_id")
        .expect("Expected component_id tag in metric");
    assert_eq!(
        component_id_tag, &conf.connection_name,
        "component_id should match connection_name"
    );

    // Verify subscription_id label is present
    assert!(
        tags.get("subscription_id").is_some(),
        "Expected subscription_id tag in metric"
    );

    // Cleanup with timeout to prevent test from hanging
    shutdown_trigger.cancel();

    tokio::time::timeout(Duration::from_secs(5), async {
        source_handle.await.unwrap().expect("Source task failed");
        shutdown_done.await;
    })
    .await
    .expect("Test timed out during cleanup");
}
Loading