Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/fluent-bit/config_format/flb_cf.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ struct cfl_variant *flb_cf_section_property_add(struct flb_cf *cf,
char *k_buf, size_t k_len,
char *v_buf, size_t v_len);

struct cfl_variant *flb_cf_section_property_add_variant(struct flb_cf *cf,
struct cfl_kvlist *kv_list,
char *k_buf, size_t k_len,
struct cfl_variant *variant);

struct cfl_array *flb_cf_section_property_add_list(struct flb_cf *cf,
struct cfl_kvlist *kv_list,
char *k_buf, size_t k_len);
Expand Down
4 changes: 4 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_task_map.h>
#include <cfl/cfl.h>

#include <monkey/mk_core.h>

Expand Down Expand Up @@ -325,6 +326,9 @@ struct flb_config {
int json_escape_unicode;

int dry_run;

/* New Router Configuration */
struct cfl_list input_routes;
};

#define FLB_CONFIG_LOG_LEVEL(c) (c->log->level)
Expand Down
83 changes: 82 additions & 1 deletion include/fluent-bit/flb_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Copyright (C) 2015-2025 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,11 @@
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_event.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_config.h>
#include <cfl/cfl.h>
#include <monkey/mk_core.h>

struct flb_router_path {
struct flb_output_instance *ins;
Expand Down Expand Up @@ -56,6 +61,61 @@ static inline int flb_router_match_type(int in_event_type,
return FLB_TRUE;
}

enum flb_router_signal {
FLB_ROUTER_SIGNAL_LOGS = (1U << 0),
FLB_ROUTER_SIGNAL_METRICS = (1U << 1),
FLB_ROUTER_SIGNAL_TRACES = (1U << 2),
FLB_ROUTER_SIGNAL_ANY = (FLB_ROUTER_SIGNAL_LOGS |
FLB_ROUTER_SIGNAL_METRICS |
FLB_ROUTER_SIGNAL_TRACES)
};

struct flb_route_condition_rule {
flb_sds_t field;
flb_sds_t op;
flb_sds_t value;
struct cfl_list _head;
};

struct flb_route_condition {
struct cfl_list rules;
int is_default;
};

struct flb_route_output {
flb_sds_t name;
flb_sds_t fallback;
struct cfl_list _head;
};

struct flb_route_processor_property {
flb_sds_t key;
flb_sds_t value;
struct cfl_list _head;
};

struct flb_route_processor {
flb_sds_t name;
struct cfl_list properties;
struct cfl_list _head;
};

struct flb_route {
flb_sds_t name;
uint32_t signals;
struct flb_route_condition *condition;
struct cfl_list outputs;
struct cfl_list processors;
struct cfl_list _head;
};

struct flb_input_routes {
flb_sds_t input_name;
struct cfl_list processors;
struct cfl_list routes;
struct cfl_list _head;
};

int flb_router_connect(struct flb_input_instance *in,
struct flb_output_instance *out);
int flb_router_connect_direct(struct flb_input_instance *in,
Expand All @@ -65,4 +125,25 @@ int flb_router_match(const char *tag, int tag_len,
const char *match, void *match_regex);
int flb_router_io_set(struct flb_config *config);
void flb_router_exit(struct flb_config *config);

uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk);

int flb_route_condition_eval(struct flb_event_chunk *chunk,
struct flb_route *route);
int flb_condition_eval_logs(struct flb_event_chunk *chunk,
struct flb_route *route);
int flb_condition_eval_metrics(struct flb_event_chunk *chunk,
struct flb_route *route);
int flb_condition_eval_traces(struct flb_event_chunk *chunk,
struct flb_route *route);

struct flb_cf;

int flb_router_config_parse(struct flb_cf *cf,
struct cfl_list *input_routes,
struct flb_config *config);
void flb_router_routes_destroy(struct cfl_list *input_routes);
int flb_router_apply_config(struct flb_config *config);

#endif

2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ set(src
flb_upstream_ha.c
flb_upstream_node.c
flb_router.c
flb_router_condition.c
flb_router_config.c
flb_worker.c
flb_coro.c
flb_time.c
Expand Down
69 changes: 48 additions & 21 deletions src/config_format/flb_cf_yaml.c
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,9 @@ static enum status state_copy_into_properties(struct parser_state *state, struct
struct cfl_kvpair *kvp;
struct cfl_variant *var;
struct cfl_array *arr;
int idx;
size_t idx;
size_t entry_count;
int array_all_strings;

cfl_list_foreach(head, &state->keyvals->list) {
kvp = cfl_list_entry(head, struct cfl_kvpair, _head);
Expand All @@ -773,33 +775,57 @@ static enum status state_copy_into_properties(struct parser_state *state, struct
}
break;
case CFL_VARIANT_ARRAY:
arr = flb_cf_section_property_add_list(conf, properties,
kvp->key, cfl_sds_len(kvp->key));
entry_count = kvp->val->data.as_array->entry_count;
array_all_strings = 1;

if (arr == NULL) {
flb_error("unable to add property list");
return YAML_FAILURE;
}
for (idx = 0; idx < kvp->val->data.as_array->entry_count; idx++) {
for (idx = 0; idx < entry_count; idx++) {
var = cfl_array_fetch_by_index(kvp->val->data.as_array, idx);
if (var == NULL || var->type != CFL_VARIANT_STRING) {
array_all_strings = 0;
break;
}
}

if (array_all_strings == 1) {
arr = flb_cf_section_property_add_list(conf, properties,
kvp->key, cfl_sds_len(kvp->key));

if (var == NULL) {
flb_error("unable to retrieve from array by index");
if (arr == NULL) {
flb_error("unable to add property list");
return YAML_FAILURE;
}
switch (var->type) {
case CFL_VARIANT_STRING:

for (idx = 0; idx < entry_count; idx++) {
var = cfl_array_fetch_by_index(kvp->val->data.as_array, idx);

if (cfl_array_append_string(arr, var->data.as_string) < 0) {
flb_error("unable to append string to array");
return YAML_FAILURE;
}
break;
default:
flb_error("unable to copy value for property");
}
}
else {
if (flb_cf_section_property_add_variant(conf,
properties,
kvp->key,
cfl_sds_len(kvp->key),
kvp->val) == NULL) {
flb_error("unable to add variant property");
return YAML_FAILURE;
}
kvp->val = NULL;
}
break;
case CFL_VARIANT_KVLIST:
if (flb_cf_section_property_add_variant(conf,
properties,
kvp->key,
cfl_sds_len(kvp->key),
kvp->val) == NULL) {
flb_error("unable to add variant property");
return YAML_FAILURE;
}
kvp->val = NULL;
break;
default:
flb_error("unknown value type for properties: %d", kvp->val->type);
Expand Down Expand Up @@ -1990,6 +2016,10 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
if (state->section == SECTION_PROCESSOR) {
state = state_push_variant(ctx, state, 0);
}
else if (strcmp(state->key, "routes") == 0 ||
strcmp(state->key, "processors") == 0) {
state = state_push_variant(ctx, state, 0);
}
else {
state = state_push_witharr(ctx, state, STATE_PLUGIN_VAL_LIST);
}
Expand All @@ -2013,17 +2043,14 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
break;
}

if (strcmp(state->key, "processors") == 0) {
state = state_push(ctx, STATE_INPUT_PROCESSORS);
if (strcmp(state->key, "routes") == 0 ||
strcmp(state->key, "processors") == 0) {
state = state_push_variant(ctx, state, 1);

if (state == NULL) {
flb_error("unable to allocate state");
return YAML_FAILURE;
}

if (state_create_group(conf, state, "processors") == YAML_FAILURE) {
return YAML_FAILURE;
}
break;
}

Expand Down
38 changes: 38 additions & 0 deletions src/config_format/flb_config_format.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,44 @@ struct cfl_variant *flb_cf_section_property_add(struct flb_cf *cf,
return NULL;
}

struct cfl_variant *flb_cf_section_property_add_variant(struct flb_cf *cf,
struct cfl_kvlist *kv_list,
char *k_buf, size_t k_len,
struct cfl_variant *variant)
{
int rc;
flb_sds_t key;

if (variant == NULL) {
return NULL;
}

if (k_len == 0) {
k_len = strlen(k_buf);
}

key = flb_cf_key_translate(cf, k_buf, k_len);
if (key == NULL) {
return NULL;
}

rc = flb_sds_trim(key);
if (rc == -1) {
flb_cf_error_set(cf, FLB_CF_ERROR_KV_INVALID_KEY);
flb_sds_destroy(key);
return NULL;
}

rc = cfl_kvlist_insert(kv_list, key, variant);
if (rc < 0) {
flb_sds_destroy(key);
return NULL;
}

flb_sds_destroy(key);
return variant;
}

struct cfl_array *flb_cf_section_property_add_list(struct flb_cf *cf,
struct cfl_kvlist *kv_list,
char *k_buf, size_t k_len)
Expand Down
16 changes: 16 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <fluent-bit/flb_config_format.h>
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/flb_bucket_queue.h>
#include <fluent-bit/flb_router.h>

const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL";

Expand Down Expand Up @@ -367,6 +368,7 @@ struct flb_config *flb_config_init()
mk_list_init(&config->filters);
mk_list_init(&config->outputs);
mk_list_init(&config->proxies);
cfl_list_init(&config->input_routes);
mk_list_init(&config->workers);
mk_list_init(&config->upstreams);
mk_list_init(&config->downstreams);
Expand Down Expand Up @@ -613,6 +615,9 @@ void flb_config_exit(struct flb_config *config)
flb_config_task_map_resize(config, 0);
flb_routes_empty_mask_destroy(config);

/* Clean up router input routes */
flb_router_routes_destroy(&config->input_routes);

flb_free(config);
}

Expand Down Expand Up @@ -856,6 +861,9 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
if (strcasecmp(kv->key, "name") == 0) {
continue;
}
if (strcasecmp(kv->key, "routes") == 0) {
continue;
}

Comment on lines +864 to 867
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Do not process “processors” as a regular property

Guard against variant “processors” breaking plugin setup; skip it like “routes”, or handle the variant explicitly.

             if (strcasecmp(kv->key, "name") == 0) {
                 continue;
             }
             if (strcasecmp(kv->key, "routes") == 0) {
                 continue;
             }
+            if (strcasecmp(kv->key, "processors") == 0) {
+                /* handled via group loader or (future) variant loader */
+                continue;
+            }
🤖 Prompt for AI Agents
In src/flb_config.c around lines 864 to 867, the parsing currently skips the
"routes" key but not the "processors" variant which can disrupt plugin setup;
update the conditional to also skip keys equal to "processors" (e.g., treat
strcasecmp(kv->key, "processors") == 0 the same as "routes") or add an explicit
branch to handle "processors" correctly so it is not processed as a regular
property. Ensure the check is case-insensitive and placed alongside the existing
"routes" guard so "processors" will be bypassed or handled safely during
configuration parsing.

/* set ret to -1 to ensure that we treat any unhandled plugin or
* value types as errors.
Expand Down Expand Up @@ -960,6 +968,7 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf)
{
int ret;
flb_debug("[config] starting configuration loading");
struct flb_kv *kv;
struct mk_list *head;
struct cfl_kvpair *ckv;
Expand Down Expand Up @@ -1061,6 +1070,13 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf)
return -1;
}

/* Parse new router configuration */
ret = flb_router_config_parse(cf, &config->input_routes, config);
if (ret == -1) {
flb_debug("[router] router configuration parsing failed");
return -1;
}

return 0;
}

Expand Down
9 changes: 9 additions & 0 deletions src/flb_router.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,15 @@ int flb_router_io_set(struct flb_config *config)
}
}

/* Apply new router configuration if available */
if (!cfl_list_is_empty(&config->input_routes)) {
flb_debug("[router] new router configuration found, applying...");
if (flb_router_apply_config(config) == -1) {
flb_error("[router] failed to apply new router configuration");
return -1;
}
}

return 0;
}

Expand Down
Loading
Loading