Skip to content

Dogstatds: Enable consolidating different metrics in a single payload #561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions metrics-exporter-dogstatsd/src/forwarder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,6 @@ pub(crate) struct ForwarderConfiguration {
pub write_timeout: Duration,
}

impl ForwarderConfiguration {
/// Returns `true` if the remote address requires a length prefix to be sent before each payload.
pub fn is_length_prefixed(&self) -> bool {
match self.remote_addr {
RemoteAddr::Udp(_) => false,
#[cfg(target_os = "linux")]
RemoteAddr::Unix(_) => true,
#[cfg(target_os = "linux")]
RemoteAddr::Unixgram(_) => true,
}
}
}

#[cfg(test)]
mod tests {
use std::net::SocketAddrV4;
Expand Down
18 changes: 12 additions & 6 deletions metrics-exporter-dogstatsd/src/forwarder/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,17 @@ impl Client {
Client::Unixgram(socket) => socket.send(buf),

#[cfg(target_os = "linux")]
Client::Unix(socket) => match socket.write_all(buf) {
Ok(()) => Ok(buf.len()),
Err(e) => Err(e),
},
Client::Unix(socket) => {
match u32::try_from(buf.len()) {
Ok(len) => socket.write_all(&len.to_be_bytes())?,
Err(e) => {
use std::io::{Error, ErrorKind};
return Err(Error::new(ErrorKind::InvalidData, e));
}
}

socket.write_all(buf).map(|()| buf.len())
}
}
}
}
Expand Down Expand Up @@ -142,8 +149,7 @@ impl Forwarder {
/// Run the forwarder, sending out payloads to the configured remote address at the configured interval.
pub fn run(mut self) {
let mut flush_state = FlushState::default();
let mut writer =
PayloadWriter::new(self.config.max_payload_len, self.config.is_length_prefixed());
let mut writer = PayloadWriter::new(self.config.max_payload_len);
let mut telemetry_update = TelemetryUpdate::default();

let mut next_flush = Instant::now() + self.config.flush_interval;
Expand Down
102 changes: 20 additions & 82 deletions metrics-exporter-dogstatsd/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,19 @@ pub(super) struct PayloadWriter {
buf: Vec<u8>,
trailer_buf: Vec<u8>,
offsets: Vec<usize>,
with_length_prefix: bool,
}

impl PayloadWriter {
/// Creates a new `PayloadWriter` with the given maximum payload length.
pub fn new(max_payload_len: usize, with_length_prefix: bool) -> Self {
pub fn new(max_payload_len: usize) -> Self {
// NOTE: This should also be handled in the builder, but we want to just double check here that we're getting a
// properly sanitized value.
assert!(
u32::try_from(max_payload_len).is_ok(),
"maximum payload length must be less than 2^32 bytes"
);

let mut writer = Self {
max_payload_len,
buf: Vec::new(),
trailer_buf: Vec::new(),
offsets: Vec::new(),
with_length_prefix,
};

writer.prepare_for_write();
writer
Self { max_payload_len, buf: Vec::new(), trailer_buf: Vec::new(), offsets: Vec::new() }
}

fn last_offset(&self) -> usize {
Expand All @@ -80,21 +70,10 @@ impl PayloadWriter {
//
// If there aren't any committed metrics, then the last offset is simply zero.
let last_offset = self.last_offset();
let maybe_length_prefix_len = if self.with_length_prefix { 4 } else { 0 };
self.buf.len() - last_offset - maybe_length_prefix_len
}

fn prepare_for_write(&mut self) {
if self.with_length_prefix {
// If we're adding length prefixes, we need to write the length of the payload first.
//
// We write a dummy length of zero for now, and then we'll go back and fill it in later.
self.buf.extend_from_slice(&[0, 0, 0, 0]);
}
self.buf.len() - last_offset
}

fn commit(&mut self) -> bool {
let current_last_offset = self.last_offset();
let current_len = self.current_len();
if current_len > self.max_payload_len {
// If the current metric is too long, we need to truncate everything we just wrote to get us back to the end
Expand All @@ -104,22 +83,20 @@ impl PayloadWriter {
return false;
}

// Track the new offset.
self.offsets.push(self.buf.len());

// If we're dealing with length-delimited payloads, go back to the beginning of this payload and fill in the
// length of it.
if self.with_length_prefix {
// NOTE: We unwrap the conversion here because we know that `self.max_payload_len` is less than 2^32, and we
// check above that `current_len` is less than or equal to `self.max_payload_len`.
let current_len_buf = u32::try_from(current_len).unwrap().to_le_bytes();
self.buf[current_last_offset..current_last_offset + 4]
.copy_from_slice(&current_len_buf[..]);
// Offset update
if current_len + self.last_offset() <= self.max_payload_len {
// If the current metric can be written within the max_payload_len
// replace the last offset (if there is valid offset)
if let Some(last_offset) = self.offsets.last_mut() {
*last_offset = self.buf.len();
} else {
self.offsets.push(self.buf.len());
}
} else {
// - else add a new offset to send current metric in a new Packet
self.offsets.push(self.buf.len());
}

// Initialize the buffer for the next payload.
self.prepare_for_write();

true
}

Expand Down Expand Up @@ -542,7 +519,7 @@ mod tests {
];

for (key, value, ts, prefix, global_labels, expected) in cases {
let mut writer = PayloadWriter::new(8192, false);
let mut writer = PayloadWriter::new(8192);
let result = writer.write_counter(&key, value, ts, prefix, global_labels);
assert_eq!(result.payloads_written(), 1);

Expand Down Expand Up @@ -607,7 +584,7 @@ mod tests {
];

for (key, value, ts, prefix, global_labels, expected) in cases {
let mut writer = PayloadWriter::new(8192, false);
let mut writer = PayloadWriter::new(8192);
let result = writer.write_gauge(&key, value, ts, prefix, global_labels);
assert_eq!(result.payloads_written(), 1);

Expand Down Expand Up @@ -666,7 +643,7 @@ mod tests {
];

for (key, values, prefix, global_labels, expected) in cases {
let mut writer = PayloadWriter::new(8192, false);
let mut writer = PayloadWriter::new(8192);
let result =
writer.write_histogram(&key, values.iter().copied(), None, prefix, global_labels);
assert_eq!(result.payloads_written(), 1);
Expand Down Expand Up @@ -726,7 +703,7 @@ mod tests {
];

for (key, values, prefix, global_labels, expected) in cases {
let mut writer = PayloadWriter::new(8192, false);
let mut writer = PayloadWriter::new(8192);
let result = writer.write_distribution(
&key,
values.iter().copied(),
Expand All @@ -741,51 +718,12 @@ mod tests {
}
}

#[test]
fn length_prefix() {
let prefixed = |buf: &str| {
let mut prefixed_buf = Vec::with_capacity(buf.len() + 4);
prefixed_buf.extend_from_slice(&(buf.len() as u32).to_le_bytes());
prefixed_buf.extend_from_slice(buf.as_bytes());
prefixed_buf
};

// Cases are defined as: metric key, metric values, metric timestamp, expected output.
let cases = [
(Key::from("test_distribution"), &[22.22][..], prefixed("test_distribution:22.22|d\n")),
(
Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]),
&[88.0][..],
prefixed("test_distribution:88.0|d|#foo:bar,baz:quux\n"),
),
(
Key::from("test_distribution"),
&[22.22, 33.33, 44.44][..],
prefixed("test_distribution:22.22:33.33:44.44|d\n"),
),
(
Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]),
&[88.0, 66.6, 123.4][..],
prefixed("test_distribution:88.0:66.6:123.4|d|#foo:bar,baz:quux\n"),
),
];

for (key, values, expected) in cases {
let mut writer = PayloadWriter::new(8192, true);
let result = writer.write_distribution(&key, values.iter().copied(), None, None, &[]);
assert_eq!(result.payloads_written(), 1);

let actual = buf_from_writer(&mut writer);
assert_eq!(actual, expected);
}
}

proptest! {
#[test]
fn property_test_gauntlet(payload_limit in 0..16384usize, inputs in arb_vec(arb_metric(), 1..128)) {
// TODO: Parameterize reservoir size so we can exercise the sample rate stuff.[]

let mut writer = PayloadWriter::new(payload_limit, false);
let mut writer = PayloadWriter::new(payload_limit);
let mut total_input_points: u64 = 0;
let mut payloads_written = 0;
let mut points_dropped = 0;
Expand Down