From c7166eeba56e0146dadf462e10ed900de039805a Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski Date: Thu, 14 Aug 2025 18:17:34 +0200 Subject: [PATCH 1/2] processor: log_sampling: add new processor for rate limiting Add log_sampling processor with three window strategies: - Fixed window: samples first N logs per time window - Sliding window: maintains rolling rate limit - Exponential decay: reduces sampling rate over time Signed-off-by: Jorge Niedbalski --- cmake/plugins_options.cmake | 1 + plugins/CMakeLists.txt | 1 + plugins/processor_log_sampling/CMakeLists.txt | 1 + plugins/processor_log_sampling/log_sampling.c | 381 ++++++++++++++++++ plugins/processor_log_sampling/log_sampling.h | 89 ++++ 5 files changed, 473 insertions(+) create mode 100644 plugins/processor_log_sampling/CMakeLists.txt create mode 100644 plugins/processor_log_sampling/log_sampling.c create mode 100644 plugins/processor_log_sampling/log_sampling.h diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 703073ff3f8..86db1040c0c 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -70,6 +70,7 @@ DEFINE_OPTION(FLB_IN_EBPF "Enable Linux eBPF input plugin" # ========== DEFINE_OPTION(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" ON) DEFINE_OPTION(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" ON) +DEFINE_OPTION(FLB_PROCESSOR_LOG_SAMPLING "Enable log sampling processor" ON) DEFINE_OPTION(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" ON) DEFINE_OPTION(FLB_PROCESSOR_OPENTELEMETRY_ENVELOPE "Enable OpenTelemetry envelope processor" ON) DEFINE_OPTION(FLB_PROCESSOR_SQL "Enable SQL processor" ON) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 65d417d7cbb..07ed4920ca7 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -345,6 +345,7 @@ REGISTER_IN_PLUGIN("in_random") # ========== REGISTER_PROCESSOR_PLUGIN("processor_content_modifier") 
REGISTER_PROCESSOR_PLUGIN("processor_labels") +REGISTER_PROCESSOR_PLUGIN("processor_log_sampling") REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector") REGISTER_PROCESSOR_PLUGIN("processor_opentelemetry_envelope") REGISTER_PROCESSOR_PLUGIN("processor_sampling") diff --git a/plugins/processor_log_sampling/CMakeLists.txt b/plugins/processor_log_sampling/CMakeLists.txt new file mode 100644 index 00000000000..8e85f4670f2 --- /dev/null +++ b/plugins/processor_log_sampling/CMakeLists.txt @@ -0,0 +1 @@ +FLB_PLUGIN(processor_log_sampling "log_sampling.c" "") \ No newline at end of file diff --git a/plugins/processor_log_sampling/log_sampling.c b/plugins/processor_log_sampling/log_sampling.c new file mode 100644 index 00000000000..8cb714dad8f --- /dev/null +++ b/plugins/processor_log_sampling/log_sampling.c @@ -0,0 +1,381 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+/*
+ * NOTE(review): the header names that follow each "#include" below are
+ * missing from this copy of the patch; restore them from the original commit
+ * before applying.
+ */
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include "log_sampling.h"
+
+/* Lowest rate the exponential strategy is allowed to decay to */
+#define LOG_SAMPLING_MIN_RATE 0.001
+
+/*
+ * Map a window_type string (compared case-insensitively) to the matching
+ * enum log_sampling_window_type value. Returns -1 for an unknown string.
+ * The caller must pass a non-NULL string.
+ */
+static int parse_window_type(const char *str)
+{
+    if (strcasecmp(str, LOG_SAMPLING_WINDOW_TYPE_FIXED) == 0) {
+        return LOG_SAMPLING_WINDOW_FIXED;
+    }
+    else if (strcasecmp(str, LOG_SAMPLING_WINDOW_TYPE_SLIDING) == 0) {
+        return LOG_SAMPLING_WINDOW_SLIDING;
+    }
+    else if (strcasecmp(str, LOG_SAMPLING_WINDOW_TYPE_EXPONENTIAL) == 0) {
+        return LOG_SAMPLING_WINDOW_EXPONENTIAL;
+    }
+    return -1;
+}
+
+/*
+ * Bucket slot used for a timestamp. The remainder operator yields a negative
+ * result for a negative dividend, so the value is shifted back into
+ * [0, bucket_count). The caller must pass bucket_count > 0.
+ */
+static int sliding_bucket_index(time_t current_time, int bucket_count)
+{
+    int index = (int) (current_time % bucket_count);
+
+    if (index < 0) {
+        index += bucket_count;
+    }
+    return index;
+}
+
+/*
+ * Fixed window strategy: keep the first max_logs_per_window records of each
+ * window_size-second window and drop the rest. A new window starts at a
+ * multiple of window_size.
+ *
+ * state:               window bookkeeping, updated in place
+ * current_time:        timestamp of the record, in seconds
+ * window_size:         window length in seconds
+ * max_logs_per_window: number of records kept per window
+ *
+ * Returns FLB_TRUE when the record is kept, FLB_FALSE when it is dropped.
+ */
+FLB_EXPORT int flb_sampling_fixed_window(struct sampling_state *state,
+                                         time_t current_time,
+                                         int window_size,
+                                         int max_logs_per_window)
+{
+    /*
+     * A non-positive window would divide by zero below. The plugin rejects
+     * such a configuration at init time; for direct callers no limit can be
+     * enforced, so the record is kept instead of being dropped silently.
+     */
+    if (window_size <= 0) {
+        return FLB_TRUE;
+    }
+
+    /* Check if we need to start a new window */
+    if (current_time >= state->window_start + window_size) {
+        state->window_start = (current_time / window_size) * window_size;
+        state->current_window_count = 0;
+    }
+
+    /* Sample if under limit */
+    if (state->current_window_count < max_logs_per_window) {
+        state->current_window_count++;
+        return FLB_TRUE;
+    }
+
+    return FLB_FALSE;
+}
+
+/* Apply the fixed window strategy to one record and trace the decision */
+static int should_sample_fixed_window(struct log_sampling_ctx *ctx, struct flb_time *tm)
+{
+    time_t current_time = tm->tm.tv_sec;
+    int result;
+
+    result = flb_sampling_fixed_window(&ctx->state, current_time,
+                                       ctx->window_size, ctx->max_logs_per_window);
+
+    if (result == FLB_TRUE) {
+        flb_plg_trace(ctx->ins, "fixed window: sampled log %d/%d in window",
+                      ctx->state.current_window_count, ctx->max_logs_per_window);
+    } else {
+        flb_plg_trace(ctx->ins, "fixed window: dropped log, window full (%d/%d)",
+                      ctx->state.current_window_count, ctx->max_logs_per_window);
+    }
+
+    return result;
+}
+
+/*
+ * Sliding window strategy: counters live in state->buckets, indexed by the
+ * timestamp modulo state->bucket_count. A record is kept while the counters
+ * that fall inside the last window_size seconds add up to less than
+ * max_logs_per_window.
+ *
+ * Returns FLB_TRUE when the record is kept, FLB_FALSE when it is dropped.
+ */
+FLB_EXPORT int flb_sampling_sliding_window(struct sampling_state *state,
+                                           time_t current_time,
+                                           int window_size,
+                                           int max_logs_per_window)
+{
+    int bucket_index;
+    time_t cutoff_time;
+    int total_in_window = 0;
+    int i;
+
+    /*
+     * Without buckets nothing can be counted (and the modulo below would
+     * divide by zero); keep the record rather than dropping it silently.
+     */
+    if (state->buckets == NULL || state->bucket_count <= 0) {
+        return FLB_TRUE;
+    }
+
+    bucket_index = sliding_bucket_index(current_time, state->bucket_count);
+    cutoff_time = current_time - window_size;
+
+    /* Clear old buckets */
+    for (i = 0; i < state->bucket_count; i++) {
+        /*
+         * The window is (current_time - window_size, current_time], so a
+         * bucket stamped exactly at the cutoff has expired. When
+         * bucket_count == window_size that bucket is also the slot used for
+         * current_time; leaving it in place would carry its old count into
+         * the current second.
+         */
+        if (state->buckets[i].timestamp <= cutoff_time) {
+            state->buckets[i].count = 0;
+            state->buckets[i].timestamp = 0;
+        }
+        total_in_window += state->buckets[i].count;
+    }
+
+    /* Sample if under limit */
+    if (total_in_window < max_logs_per_window) {
+        state->buckets[bucket_index].timestamp = current_time;
+        state->buckets[bucket_index].count++;
+        return FLB_TRUE;
+    }
+
+    return FLB_FALSE;
+}
+
+/* Apply the sliding window strategy to one record and trace the decision */
+static int should_sample_sliding_window(struct log_sampling_ctx *ctx, struct flb_time *tm)
+{
+    time_t current_time = tm->tm.tv_sec;
+    int result;
+
+    result = flb_sampling_sliding_window(&ctx->state, current_time,
+                                         ctx->window_size, ctx->max_logs_per_window);
+
+    if (result == FLB_TRUE) {
+        /* bucket_count > 0 is guaranteed by cb_log_sampling_init */
+        int bucket_index = sliding_bucket_index(current_time,
+                                                ctx->state.bucket_count);
+        flb_plg_trace(ctx->ins, "sliding window: sampled log, bucket[%d] count=%d",
+                      bucket_index, ctx->state.buckets[bucket_index].count);
+    } else {
+        flb_plg_trace(ctx->ins, "sliding window: dropped log, window full");
+    }
+
+    return result;
+}
+
+/*
+ * Exponential decay strategy: the sampling rate starts at base_rate and is
+ * multiplied by decay_factor once for every decay_interval seconds elapsed
+ * since window_start. The record is kept when a rand() draw in [0, 1) falls
+ * below the resulting rate.
+ *
+ * Returns FLB_TRUE when the record is kept, FLB_FALSE when it is dropped.
+ */
+FLB_EXPORT int flb_sampling_exponential(time_t window_start,
+                                        time_t current_time,
+                                        double base_rate,
+                                        double decay_factor,
+                                        int decay_interval)
+{
+    double current_rate;
+    double random_value;
+    double floor_rate;
+    double intervals_passed = 0.0;
+    time_t elapsed;
+
+    /*
+     * Calculate time-based decay from start time. A record stamped before
+     * window_start, or a non-positive interval, counts as "no decay yet":
+     * this avoids a division by zero and a negative exponent that would
+     * raise the rate above base_rate.
+     */
+    elapsed = current_time - window_start;
+    if (decay_interval > 0 && elapsed > 0) {
+        intervals_passed = (double) (elapsed / decay_interval);
+    }
+
+    if (intervals_passed == 0.0) {
+        current_rate = base_rate;
+    }
+    else {
+        current_rate = base_rate * pow(decay_factor, intervals_passed);
+
+        /*
+         * Cap at minimum rate. The floor never exceeds base_rate, otherwise
+         * a very small base_rate would increase after the first interval.
+         */
+        floor_rate = LOG_SAMPLING_MIN_RATE;
+        if (base_rate < floor_rate) {
+            floor_rate = base_rate;
+        }
+        if (current_rate < floor_rate) {
+            current_rate = floor_rate;
+        }
+    }
+
+    /*
+     * Random sampling with computed rate. Dividing by RAND_MAX + 1 keeps the
+     * draw strictly below 1.0, so a rate of 1.0 keeps every record.
+     */
+    random_value = (double) rand() / ((double) RAND_MAX + 1.0);
+    return (random_value < current_rate) ? FLB_TRUE : FLB_FALSE;
+}
+
+/* Apply the exponential decay strategy to one record and trace the decision */
+static int should_sample_exponential(struct log_sampling_ctx *ctx, struct flb_time *tm)
+{
+    time_t current_time = tm->tm.tv_sec;
+    int result;
+
+    result = flb_sampling_exponential(ctx->state.window_start, current_time,
+                                      ctx->decay_base_rate, ctx->decay_factor,
+                                      ctx->decay_interval);
+
+    if (result == FLB_TRUE) {
+        flb_plg_trace(ctx->ins, "exponential: sampled log");
+    } else {
+        flb_plg_trace(ctx->ins, "exponential: dropped log");
+    }
+
+    return result;
+}
+
+/*
+ * Processor init callback: allocate the context, load the configuration,
+ * resolve and validate the window strategy, and allocate the sliding window
+ * buckets when that strategy is selected.
+ *
+ * Returns FLB_PROCESSOR_SUCCESS, or FLB_PROCESSOR_FAILURE after releasing
+ * everything allocated here.
+ */
+static int cb_log_sampling_init(struct flb_processor_instance *ins,
+                                void *source_plugin_instance,
+                                int source_plugin_type,
+                                struct flb_config *config)
+{
+    struct log_sampling_ctx *ctx;
+    const char *window_type_str;
+    int window_type;
+    int ret;
+
+    ctx = flb_calloc(1, sizeof(struct log_sampling_ctx));
+    if (!ctx) {
+        flb_errno();
+        return FLB_PROCESSOR_FAILURE;
+    }
+
+    ctx->ins = ins;
+
+    /* Load configuration */
+    ret = flb_processor_instance_config_map_set(ins, ctx);
+    if (ret < 0) {
+        flb_free(ctx);
+        return FLB_PROCESSOR_FAILURE;
+    }
+
+    /*
+     * Parse window type. The result is checked in an int before it is
+     * stored, so the -1 error value never goes through the enum field.
+     */
+    window_type_str = flb_processor_instance_get_property("window_type", ins);
+    if (window_type_str) {
+        window_type = parse_window_type(window_type_str);
+        if (window_type < 0) {
+            flb_plg_error(ins, "invalid window_type: %s", window_type_str);
+            flb_free(ctx);
+            return FLB_PROCESSOR_FAILURE;
+        }
+        ctx->window_type = (enum log_sampling_window_type) window_type;
+    }
+    else {
+        ctx->window_type = LOG_SAMPLING_WINDOW_FIXED;
+    }
+
+    /* Reject values that would later be used as a divisor */
+    if (ctx->window_type == LOG_SAMPLING_WINDOW_EXPONENTIAL) {
+        if (ctx->decay_interval <= 0) {
+            flb_plg_error(ins, "decay_interval must be greater than zero");
+            flb_free(ctx);
+            return FLB_PROCESSOR_FAILURE;
+        }
+    }
+    else if (ctx->window_size <= 0) {
+        flb_plg_error(ins, "window_size must be greater than zero");
+        flb_free(ctx);
+        return FLB_PROCESSOR_FAILURE;
+    }
+
+    /* Initialize window state */
+    if (ctx->window_type == LOG_SAMPLING_WINDOW_SLIDING) {
+        /* One bucket per second of the window */
+        ctx->state.bucket_count = ctx->window_size;
+        ctx->state.buckets = flb_calloc(ctx->state.bucket_count,
+                                        sizeof(*ctx->state.buckets));
+        if (!ctx->state.buckets) {
+            flb_errno();
+            flb_free(ctx);
+            return FLB_PROCESSOR_FAILURE;
+        }
+        flb_plg_info(ins, "initialized sliding window with %d buckets", ctx->state.bucket_count);
+    }
+
+    ctx->state.window_start = time(NULL);
+
+    ins->context = ctx;
+
+    /* Log configuration */
+    flb_plg_info(ins, "log_sampling processor initialized:");
+    flb_plg_info(ins, " window_type: %s",
+                 ctx->window_type == LOG_SAMPLING_WINDOW_FIXED ? LOG_SAMPLING_WINDOW_TYPE_FIXED :
+                 ctx->window_type == LOG_SAMPLING_WINDOW_SLIDING ? LOG_SAMPLING_WINDOW_TYPE_SLIDING :
+                 LOG_SAMPLING_WINDOW_TYPE_EXPONENTIAL);
+    flb_plg_info(ins, " window_size: %d seconds", ctx->window_size);
+    flb_plg_info(ins, " max_logs_per_window: %d", ctx->max_logs_per_window);
+    if (ctx->window_type == LOG_SAMPLING_WINDOW_EXPONENTIAL) {
+        flb_plg_info(ins, " decay_base_rate: %.2f", ctx->decay_base_rate);
+        flb_plg_info(ins, " decay_factor: %.2f", ctx->decay_factor);
+        flb_plg_info(ins, " decay_interval: %d seconds", ctx->decay_interval);
+    }
+
+    return FLB_PROCESSOR_SUCCESS;
+}
+
+/*
+ * Log processing callback: walk every record of the chunk, ask the
+ * configured strategy whether to keep it, and destroy the records that are
+ * not kept. Always returns FLB_PROCESSOR_SUCCESS.
+ */
+static int cb_log_sampling_process_logs(struct flb_processor_instance *ins,
+                                        void *chunk_data,
+                                        const char *tag, int tag_len)
+{
+    struct log_sampling_ctx *ctx = ins->context;
+    struct flb_mp_chunk_cobj *chunk_cobj;
+    struct flb_mp_chunk_record *record;
+    struct flb_time timestamp;
+    int should_sample;
+    size_t total_records = 0;
+    size_t sampled_records = 0;
+
+    chunk_cobj = (struct flb_mp_chunk_cobj *) chunk_data;
+
+    /* Process each log event */
+    while (flb_mp_chunk_cobj_record_next(chunk_cobj, &record) == FLB_MP_CHUNK_RECORD_OK) {
+        total_records++;
+        ctx->state.total_logs_seen++;
+
+        /* Get timestamp from the record's event */
+        timestamp = record->event.timestamp;
+
+        /* Apply window-based sampling */
+        switch (ctx->window_type) {
+        case LOG_SAMPLING_WINDOW_FIXED:
+            should_sample = should_sample_fixed_window(ctx, &timestamp);
+            break;
+        case LOG_SAMPLING_WINDOW_SLIDING:
+            should_sample = should_sample_sliding_window(ctx, &timestamp);
+            break;
+        case LOG_SAMPLING_WINDOW_EXPONENTIAL:
+            should_sample = should_sample_exponential(ctx, &timestamp);
+            break;
+        default:
+            /* Unknown strategy: keep the record */
+            should_sample = FLB_TRUE;
+        }
+
+        if (!should_sample) {
+            /*
+             * Remove this record.
+             * NOTE(review): assumes flb_mp_chunk_cobj_record_destroy() leaves
+             * the chunk iterator usable for the next record_next() call -
+             * confirm against flb_mp_chunk.
+             */
+            flb_mp_chunk_cobj_record_destroy(chunk_cobj, record);
+        }
+        else {
+            ctx->state.total_logs_sampled++;
+            sampled_records++;
+        }
+    }
+
+    /* Log sampling statistics at trace level */
+    flb_plg_trace(ins, "sampling stats: seen=%" PRIu64 " sampled=%" PRIu64 " rate=%.2f%%",
+                  ctx->state.total_logs_seen,
+                  ctx->state.total_logs_sampled,
+                  ctx->state.total_logs_seen > 0 ?
+                  (double)ctx->state.total_logs_sampled / ctx->state.total_logs_seen * 100.0 : 0.0);
+
+    if (total_records > 0) {
+        flb_plg_debug(ins, "processed %zu records: sampled=%zu, dropped=%zu (%.1f%% kept)",
+                      total_records, sampled_records, total_records - sampled_records,
+                      (double)sampled_records / total_records * 100.0);
+    }
+
+    return FLB_PROCESSOR_SUCCESS;
+}
+
+/* Exit callback: release the sliding window buckets and the context */
+static int cb_log_sampling_exit(struct flb_processor_instance *ins, void *data)
+{
+    struct log_sampling_ctx *ctx = data;
+
+    if (!ctx) {
+        return FLB_PROCESSOR_SUCCESS;
+    }
+
+    if (ctx->state.buckets) {
+        flb_free(ctx->state.buckets);
+    }
+
+    flb_free(ctx);
+
+    return FLB_PROCESSOR_SUCCESS;
+}
+
+static struct flb_config_map config_map[] = {
+    {
+     /*
+      * Not bound to a context field: the context stores window_type as an
+      * enum, and binding a string entry to it would store a pointer there
+      * and overrun into window_size. cb_log_sampling_init reads the property
+      * and parses it instead.
+      */
+     FLB_CONFIG_MAP_STR, "window_type", LOG_SAMPLING_WINDOW_TYPE_FIXED,
+     0, FLB_FALSE, 0,
+     "Window type: fixed, sliding, or exponential"
+    },
+    {
+     FLB_CONFIG_MAP_INT, "window_size", "60",
+     0, FLB_TRUE, offsetof(struct log_sampling_ctx, window_size),
+     "Window size in seconds"
+    },
+    {
+     FLB_CONFIG_MAP_INT, "max_logs_per_window", "1000",
+     0, FLB_TRUE, offsetof(struct log_sampling_ctx, max_logs_per_window),
+     "Maximum logs to keep per window"
+    },
+    {
+     FLB_CONFIG_MAP_DOUBLE, "decay_base_rate", "0.1",
+     0, FLB_TRUE, offsetof(struct log_sampling_ctx, decay_base_rate),
+     "Base sampling rate for exponential decay (0.0-1.0)"
+    },
+    {
+     FLB_CONFIG_MAP_DOUBLE, "decay_factor", "0.95",
+     0, FLB_TRUE, offsetof(struct log_sampling_ctx, decay_factor),
+     "Decay factor per interval"
+    },
+    {
+     FLB_CONFIG_MAP_INT, "decay_interval", "60",
+     0, FLB_TRUE, offsetof(struct log_sampling_ctx, decay_interval),
+     "Decay interval in seconds"
+    },
+    /* EOF */
+    {0}
+};
+
+struct flb_processor_plugin processor_log_sampling_plugin = {
+    .name               = "log_sampling",
+    .description        = "Log sampling based on window strategies",
+    .cb_init            = cb_log_sampling_init,
+    .cb_process_logs    = cb_log_sampling_process_logs,
+    .cb_process_metrics = NULL,
+    .cb_process_traces  = NULL,
+    .cb_exit            = cb_log_sampling_exit,
+    .config_map         = config_map,
+    .flags              = 0
+};
\ No newline at end of file
diff --git a/plugins/processor_log_sampling/log_sampling.h b/plugins/processor_log_sampling/log_sampling.h
new file mode 100644
index 00000000000..11607a53ca7
--- /dev/null
+++ b/plugins/processor_log_sampling/log_sampling.h
@@ -0,0 +1,89 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/*  Fluent Bit
+ *  ==========
+ *  Copyright (C) 2015-2025 The Fluent Bit Authors
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+#ifndef FLB_PROCESSOR_LOG_SAMPLING_H
+#define FLB_PROCESSOR_LOG_SAMPLING_H
+
+/*
+ * NOTE(review): the header names that follow each "#include" below are
+ * missing from this copy of the patch; restore them from the original commit
+ * before applying.
+ */
+#include
+#include
+#include
+
+
+/* Strings accepted for the window_type option */
+#define LOG_SAMPLING_WINDOW_TYPE_FIXED       "fixed"
+#define LOG_SAMPLING_WINDOW_TYPE_SLIDING     "sliding"
+#define LOG_SAMPLING_WINDOW_TYPE_EXPONENTIAL "exponential"
+
+/* Sampling strategies, one per window_type string above */
+enum log_sampling_window_type {
+    LOG_SAMPLING_WINDOW_FIXED,
+    LOG_SAMPLING_WINDOW_SLIDING,
+    LOG_SAMPLING_WINDOW_EXPONENTIAL
+};
+
+/* Runtime bookkeeping shared by the sampling functions declared below */
+struct sampling_state {
+    /* Start of the current fixed window */
+    time_t window_start;
+    /* Records kept so far in the current fixed window */
+    int current_window_count;
+
+    /* Sliding window counters: bucket_count entries, each with its stamp */
+    struct {
+        time_t timestamp;
+        int count;
+    } *buckets;
+    int bucket_count;
+
+    /* Running totals of records inspected and records kept */
+    uint64_t total_logs_seen;
+    uint64_t total_logs_sampled;
+};
+
+/* Per-instance context of the log_sampling processor */
+struct log_sampling_ctx {
+    /* Options shared by the window strategies */
+    enum log_sampling_window_type window_type;
+    int window_size;                /* window length, in seconds */
+    int max_logs_per_window;        /* records kept per window */
+
+    /* Options of the exponential decay strategy */
+    double decay_base_rate;         /* starting rate (0.0-1.0) */
+    double decay_factor;            /* multiplier applied per interval */
+    int decay_interval;             /* interval length, in seconds */
+
+    /* Window state for runtime */
+    struct sampling_state state;
+
+    /* Owning processor instance */
+    struct flb_processor_instance *ins;
+};
+
+/*
+ * Sampling entry points. Each one returns FLB_TRUE when the record should be
+ * kept and FLB_FALSE when it should be dropped.
+ */
+FLB_EXPORT int flb_sampling_fixed_window(struct sampling_state *state,
+                                         time_t current_time,
+                                         int window_size,
+                                         int max_logs_per_window);
+
+FLB_EXPORT int flb_sampling_sliding_window(struct sampling_state *state,
+                                           time_t current_time,
+                                           int window_size,
+                                           int max_logs_per_window);
+
+FLB_EXPORT int flb_sampling_exponential(time_t window_start,
+                                        time_t current_time,
+                                        double base_rate,
+                                        double decay_factor,
+                                        int decay_interval);
+
+#endif
\ No newline at end of file

From 3ff1e70ab9c77f4f843f77164db006b52b7958ce Mon Sep 17 00:00:00 2001
From: Jorge Niedbalski
Date: Thu, 14 Aug 2025 18:18:30 +0200
Subject: [PATCH 2/2] tests: internal: add unit tests for log_sampling processor

Add
unit tests for sampling algorithms: - Fixed window sampling and window resets - Sliding window with bucket expiration - Exponential decay rate calculations - Edge cases and boundary conditions Signed-off-by: Jorge Niedbalski --- tests/internal/CMakeLists.txt | 7 + tests/internal/log_sampling.c | 241 ++++++++++++++++++++++++++++++++++ 2 files changed, 248 insertions(+) create mode 100644 tests/internal/log_sampling.c diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index f71725c856a..03bf93f8766 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -153,6 +153,13 @@ if(FLB_LUAJIT) ) endif() +if(FLB_PROCESSOR_LOG_SAMPLING) + set(UNIT_TESTS_FILES + ${UNIT_TESTS_FILES} + log_sampling.c + ) +endif() + set(UNIT_TESTS_DATA data/tls/certificate.pem data/tls/private_key.pem diff --git a/tests/internal/log_sampling.c b/tests/internal/log_sampling.c new file mode 100644 index 00000000000..3f71c29c0e3 --- /dev/null +++ b/tests/internal/log_sampling.c @@ -0,0 +1,241 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+/*
+ * NOTE(review): the header names that follow each "#include" below are
+ * missing from this copy of the patch; restore them from the original commit
+ * before applying.
+ */
+#include
+#include
+#include
+
+#include "flb_tests_internal.h"
+
+/* Import the sampling functions from the processor plugin */
+#include "../../plugins/processor_log_sampling/log_sampling.h"
+
+/* Fixed window: the limit holds inside a window and resets in the next one */
+static void test_fixed_window_basic(void)
+{
+    struct sampling_state state = {0};
+    int result;
+    int i;
+    int sampled_count = 0;
+    time_t current_time = 1000; /* Start time */
+    int window_size = 60;
+    int max_logs = 5;
+
+    /* Initialize state */
+    state.window_start = current_time;
+    state.current_window_count = 0;
+
+    /* Test: First 5 logs should be sampled */
+    for (i = 0; i < 10; i++) {
+        result = flb_sampling_fixed_window(&state, current_time, window_size, max_logs);
+        if (result == FLB_TRUE) {
+            sampled_count++;
+        }
+    }
+
+    TEST_CHECK(sampled_count == 5);
+    TEST_MSG("Fixed window: sampled %d out of 10 (expected 5)", sampled_count);
+
+    /* Test: New window should reset the count */
+    current_time += window_size + 1; /* Move to next window */
+    sampled_count = 0;
+
+    for (i = 0; i < 3; i++) {
+        result = flb_sampling_fixed_window(&state, current_time, window_size, max_logs);
+        if (result == FLB_TRUE) {
+            sampled_count++;
+        }
+    }
+
+    TEST_CHECK(sampled_count == 3);
+    TEST_MSG("Fixed window (new): sampled %d out of 3 (expected 3)", sampled_count);
+}
+
+/* Sliding window: the limit holds across buckets and old buckets expire */
+static void test_sliding_window_basic(void)
+{
+    struct sampling_state state = {0};
+    int result;
+    int i;
+    int sampled_count = 0;
+    time_t current_time = 1000;
+    int window_size = 10;
+    int max_logs = 5;
+
+    /* Initialize sliding window buckets */
+    state.bucket_count = window_size;
+    state.buckets = flb_calloc(state.bucket_count, sizeof(*state.buckets));
+    if (!TEST_CHECK(state.buckets != NULL)) {
+        /* Nothing below can run without the buckets */
+        return;
+    }
+
+    /* Test: First 5 logs should be sampled */
+    for (i = 0; i < 7; i++) {
+        result = flb_sampling_sliding_window(&state, current_time, window_size, max_logs);
+        if (result == FLB_TRUE) {
+            sampled_count++;
+        }
+        /* Advance time slightly to distribute across buckets */
+        if (i % 2 == 0) {
+            current_time++;
+        }
+    }
+
+    TEST_CHECK(sampled_count == 5);
+    TEST_MSG("Sliding window: sampled %d out of 7 (expected 5)", sampled_count);
+
+    /* Test: Old entries should expire */
+    current_time += window_size + 2; /* Move past window */
+    sampled_count = 0;
+
+    for (i = 0; i < 3; i++) {
+        result = flb_sampling_sliding_window(&state, current_time, window_size, max_logs);
+        if (result == FLB_TRUE) {
+            sampled_count++;
+        }
+    }
+
+    TEST_CHECK(sampled_count == 3);
+    TEST_MSG("Sliding window (after expiry): sampled %d out of 3 (expected 3)", sampled_count);
+
+    flb_free(state.buckets);
+}
+
+/* Exponential decay: the kept share follows base_rate * decay_factor^n */
+static void test_exponential_decay_basic(void)
+{
+    int result;
+    int i;
+    int sampled_count;
+    time_t window_start = 1000;
+    time_t current_time = 1000;
+    double base_rate = 1.0;     /* 100% initially */
+    double decay_factor = 0.5;  /* 50% reduction per interval */
+    int decay_interval = 10;
+
+    /* Seed random for reproducible tests */
+    srand(12345);
+
+    /* Test: First interval should sample most logs (rate=1.0) */
+    sampled_count = 0;
+    for (i = 0; i < 100; i++) {
+        result = flb_sampling_exponential(window_start, current_time,
+                                          base_rate, decay_factor, decay_interval);
+        if (result == FLB_TRUE) {
+            sampled_count++;
+        }
+    }
+
+    /* With rate=1.0, should sample all or nearly all */
+    TEST_CHECK(sampled_count >= 95);
+    TEST_MSG("Exponential (interval 0): sampled %d out of 100 (expected >= 95)", sampled_count);
+
+    /* Test: Second interval should sample about 50% */
+    current_time = window_start + decay_interval;
+    sampled_count = 0;
+
+    for (i = 0; i < 100; i++) {
+        result = flb_sampling_exponential(window_start, current_time,
+                                          base_rate, decay_factor, decay_interval);
+        if (result == FLB_TRUE) {
+            sampled_count++;
+        }
+    }
+
+    /* With rate=0.5, should sample roughly 40-60% */
+    TEST_CHECK(sampled_count >= 40 && sampled_count <= 60);
+    TEST_MSG("Exponential (interval 1): sampled %d out of 100 (expected 40-60)", sampled_count);
+
+    /* Test: Third interval should sample about 25% */
+    current_time = window_start + (2 * decay_interval);
+    sampled_count = 0;
+
+    for (i = 0; i < 100; i++) {
+        result = flb_sampling_exponential(window_start, current_time,
+                                          base_rate, decay_factor, decay_interval);
+        if (result == FLB_TRUE) {
+            sampled_count++;
+        }
+    }
+
+    /* With rate=0.25, should sample roughly 15-35% */
+    TEST_CHECK(sampled_count >= 15 && sampled_count <= 35);
+    TEST_MSG("Exponential (interval 2): sampled %d out of 100 (expected 15-35)", sampled_count);
+}
+
+/* Fixed window: zero limit and the exact window boundary */
+static void test_fixed_window_edge_cases(void)
+{
+    struct sampling_state state = {0};
+    int result;
+    time_t current_time = 1000;
+    int window_size = 60;
+    int max_logs = 0; /* Edge case: no logs allowed */
+
+    /* Test: max_logs = 0 should reject all */
+    state.window_start = current_time;
+    state.current_window_count = 0;
+
+    result = flb_sampling_fixed_window(&state, current_time, window_size, max_logs);
+    TEST_CHECK(result == FLB_FALSE);
+
+    /* Test: Window boundary */
+    max_logs = 1;
+    state.current_window_count = 0;
+
+    result = flb_sampling_fixed_window(&state, current_time, window_size, max_logs);
+    TEST_CHECK(result == FLB_TRUE);
+
+    /* Move to exact window boundary */
+    current_time = state.window_start + window_size;
+    result = flb_sampling_fixed_window(&state, current_time, window_size, max_logs);
+    TEST_CHECK(result == FLB_TRUE); /* New window should allow */
+    TEST_CHECK(state.current_window_count == 1);
+}
+
+/* Sliding window: a single bucket fills up and then expires */
+static void test_sliding_window_edge_cases(void)
+{
+    struct sampling_state state = {0};
+    int result;
+    time_t current_time = 1000;
+    int window_size = 1; /* Minimal window */
+    int max_logs = 1;
+
+    /* Initialize with minimal bucket */
+    state.bucket_count = 1;
+    state.buckets = flb_calloc(1, sizeof(*state.buckets));
+    if (!TEST_CHECK(state.buckets != NULL)) {
+        /* Nothing below can run without the bucket */
+        return;
+    }
+
+    /* Test: Single bucket behavior */
+    result = flb_sampling_sliding_window(&state, current_time, window_size, max_logs);
+    TEST_CHECK(result == FLB_TRUE);
+
+    result = flb_sampling_sliding_window(&state, current_time, window_size, max_logs);
+    TEST_CHECK(result == FLB_FALSE); /* Already at limit */
+
+    /* Test: Bucket expiry */
+    current_time += window_size + 1;
+    result = flb_sampling_sliding_window(&state, current_time, window_size, max_logs);
+    TEST_CHECK(result == FLB_TRUE); /* Old bucket should be expired */
+
+    flb_free(state.buckets);
+}
+
+TEST_LIST = {
+    { "fixed_window_basic", test_fixed_window_basic },
+    { "sliding_window_basic", test_sliding_window_basic },
+    { "exponential_decay_basic", test_exponential_decay_basic },
+    { "fixed_window_edge_cases", test_fixed_window_edge_cases },
+    { "sliding_window_edge_cases", test_sliding_window_edge_cases },
+    { 0 }
+};
\ No newline at end of file