Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/fluent-bit/config_format/flb_cf.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ struct cfl_variant *flb_cf_section_property_add(struct flb_cf *cf,
char *k_buf, size_t k_len,
char *v_buf, size_t v_len);

struct cfl_variant *flb_cf_section_property_add_variant(struct flb_cf *cf,
struct cfl_kvlist *kv_list,
char *k_buf, size_t k_len,
struct cfl_variant *variant);

struct cfl_array *flb_cf_section_property_add_list(struct flb_cf *cf,
struct cfl_kvlist *kv_list,
char *k_buf, size_t k_len);
Expand Down
4 changes: 4 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_task_map.h>
#include <cfl/cfl.h>

#include <monkey/mk_core.h>

Expand Down Expand Up @@ -325,6 +326,9 @@ struct flb_config {
int json_escape_unicode;

int dry_run;

/* New Router Configuration */
struct cfl_list input_routes;
};

#define FLB_CONFIG_LOG_LEVEL(c) (c->log->level)
Expand Down
83 changes: 82 additions & 1 deletion include/fluent-bit/flb_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Copyright (C) 2015-2025 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,11 @@
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_event.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_config.h>
#include <cfl/cfl.h>
#include <monkey/mk_core.h>

struct flb_router_path {
struct flb_output_instance *ins;
Expand Down Expand Up @@ -56,6 +61,61 @@ static inline int flb_router_match_type(int in_event_type,
return FLB_TRUE;
}

enum flb_router_signal {
FLB_ROUTER_SIGNAL_LOGS = (1U << 0),
FLB_ROUTER_SIGNAL_METRICS = (1U << 1),
FLB_ROUTER_SIGNAL_TRACES = (1U << 2),
FLB_ROUTER_SIGNAL_ANY = (FLB_ROUTER_SIGNAL_LOGS |
FLB_ROUTER_SIGNAL_METRICS |
FLB_ROUTER_SIGNAL_TRACES)
};

struct flb_route_condition_rule {
flb_sds_t field;
flb_sds_t op;
flb_sds_t value;
struct cfl_list _head;
};

struct flb_route_condition {
struct cfl_list rules;
int is_default;
};

struct flb_route_output {
flb_sds_t name;
flb_sds_t fallback;
struct cfl_list _head;
};

struct flb_route_processor_property {
flb_sds_t key;
flb_sds_t value;
struct cfl_list _head;
};

struct flb_route_processor {
flb_sds_t name;
struct cfl_list properties;
struct cfl_list _head;
};

struct flb_route {
flb_sds_t name;
uint32_t signals;
struct flb_route_condition *condition;
struct cfl_list outputs;
struct cfl_list processors;
struct cfl_list _head;
};

struct flb_input_routes {
flb_sds_t input_name;
struct cfl_list processors;
struct cfl_list routes;
struct cfl_list _head;
};

int flb_router_connect(struct flb_input_instance *in,
struct flb_output_instance *out);
int flb_router_connect_direct(struct flb_input_instance *in,
Expand All @@ -65,4 +125,25 @@ int flb_router_match(const char *tag, int tag_len,
const char *match, void *match_regex);
int flb_router_io_set(struct flb_config *config);
void flb_router_exit(struct flb_config *config);

uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk);

int flb_route_condition_eval(struct flb_event_chunk *chunk,
struct flb_route *route);
int flb_condition_eval_logs(struct flb_event_chunk *chunk,
struct flb_route *route);
int flb_condition_eval_metrics(struct flb_event_chunk *chunk,
struct flb_route *route);
int flb_condition_eval_traces(struct flb_event_chunk *chunk,
struct flb_route *route);

struct flb_cf;

int flb_router_config_parse(struct flb_cf *cf,
struct cfl_list *input_routes,
struct flb_config *config);
void flb_router_routes_destroy(struct cfl_list *input_routes);
int flb_router_apply_config(struct flb_config *config);

#endif

2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ set(src
flb_upstream_ha.c
flb_upstream_node.c
flb_router.c
flb_router_condition.c
flb_router_config.c
flb_worker.c
flb_coro.c
flb_time.c
Expand Down
92 changes: 71 additions & 21 deletions src/config_format/flb_cf_yaml.c
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,9 @@ static enum status state_copy_into_properties(struct parser_state *state, struct
struct cfl_kvpair *kvp;
struct cfl_variant *var;
struct cfl_array *arr;
int idx;
size_t idx;
size_t entry_count;
int array_all_strings;

cfl_list_foreach(head, &state->keyvals->list) {
kvp = cfl_list_entry(head, struct cfl_kvpair, _head);
Expand All @@ -773,33 +775,57 @@ static enum status state_copy_into_properties(struct parser_state *state, struct
}
break;
case CFL_VARIANT_ARRAY:
arr = flb_cf_section_property_add_list(conf, properties,
kvp->key, cfl_sds_len(kvp->key));
entry_count = kvp->val->data.as_array->entry_count;
array_all_strings = 1;

if (arr == NULL) {
flb_error("unable to add property list");
return YAML_FAILURE;
}
for (idx = 0; idx < kvp->val->data.as_array->entry_count; idx++) {
for (idx = 0; idx < entry_count; idx++) {
var = cfl_array_fetch_by_index(kvp->val->data.as_array, idx);
if (var == NULL || var->type != CFL_VARIANT_STRING) {
array_all_strings = 0;
break;
}
}

if (array_all_strings == 1) {
arr = flb_cf_section_property_add_list(conf, properties,
kvp->key, cfl_sds_len(kvp->key));

if (var == NULL) {
flb_error("unable to retrieve from array by index");
if (arr == NULL) {
flb_error("unable to add property list");
return YAML_FAILURE;
}
switch (var->type) {
case CFL_VARIANT_STRING:

for (idx = 0; idx < entry_count; idx++) {
var = cfl_array_fetch_by_index(kvp->val->data.as_array, idx);

if (cfl_array_append_string(arr, var->data.as_string) < 0) {
flb_error("unable to append string to array");
return YAML_FAILURE;
}
break;
default:
flb_error("unable to copy value for property");
}
}
else {
if (flb_cf_section_property_add_variant(conf,
properties,
kvp->key,
cfl_sds_len(kvp->key),
kvp->val) == NULL) {
flb_error("unable to add variant property");
return YAML_FAILURE;
}
kvp->val = NULL;
}
break;
case CFL_VARIANT_KVLIST:
if (flb_cf_section_property_add_variant(conf,
properties,
kvp->key,
cfl_sds_len(kvp->key),
kvp->val) == NULL) {
flb_error("unable to add variant property");
return YAML_FAILURE;
}
kvp->val = NULL;
break;
default:
flb_error("unknown value type for properties: %d", kvp->val->type);
Expand Down Expand Up @@ -1990,6 +2016,10 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
if (state->section == SECTION_PROCESSOR) {
state = state_push_variant(ctx, state, 0);
}
else if (strcmp(state->key, "routes") == 0 ||
strcmp(state->key, "processors") == 0) {
state = state_push_variant(ctx, state, 0);
}
else {
state = state_push_witharr(ctx, state, STATE_PLUGIN_VAL_LIST);
}
Expand All @@ -2001,6 +2031,29 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
break;
case YAML_MAPPING_START_EVENT:

if (strcmp(state->key, "processors") == 0) {
struct flb_cf_group *group;

group = flb_cf_group_create(conf, state->cf_section,
state->key,
strlen(state->key));

if (group == NULL) {
flb_error("unable to create processors group");
return YAML_FAILURE;
}

state->cf_group = group;
state = state_push(ctx, STATE_INPUT_PROCESSORS);

if (state == NULL) {
flb_error("unable to allocate state");
return YAML_FAILURE;
}

break;
}

if (state->section == SECTION_PROCESSOR) {
/* when in a processor section, we allow plugins to have nested
* properties which are returned as a cfl_variant */
Expand All @@ -2013,17 +2066,14 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
break;
}

if (strcmp(state->key, "processors") == 0) {
state = state_push(ctx, STATE_INPUT_PROCESSORS);
if (strcmp(state->key, "routes") == 0 ||
strcmp(state->key, "processors") == 0) {
state = state_push_variant(ctx, state, 1);

if (state == NULL) {
flb_error("unable to allocate state");
return YAML_FAILURE;
}

if (state_create_group(conf, state, "processors") == YAML_FAILURE) {
return YAML_FAILURE;
}
break;
}

Expand Down
38 changes: 38 additions & 0 deletions src/config_format/flb_config_format.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,44 @@ struct cfl_variant *flb_cf_section_property_add(struct flb_cf *cf,
return NULL;
}

struct cfl_variant *flb_cf_section_property_add_variant(struct flb_cf *cf,
struct cfl_kvlist *kv_list,
char *k_buf, size_t k_len,
struct cfl_variant *variant)
{
int rc;
flb_sds_t key;

if (variant == NULL) {
return NULL;
}

if (k_len == 0) {
k_len = strlen(k_buf);
}

key = flb_cf_key_translate(cf, k_buf, k_len);
if (key == NULL) {
return NULL;
}

rc = flb_sds_trim(key);
if (rc == -1) {
flb_cf_error_set(cf, FLB_CF_ERROR_KV_INVALID_KEY);
flb_sds_destroy(key);
return NULL;
}

rc = cfl_kvlist_insert(kv_list, key, variant);
if (rc < 0) {
flb_sds_destroy(key);
return NULL;
}

flb_sds_destroy(key);
return variant;
}

struct cfl_array *flb_cf_section_property_add_list(struct flb_cf *cf,
struct cfl_kvlist *kv_list,
char *k_buf, size_t k_len)
Expand Down
16 changes: 16 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <fluent-bit/flb_config_format.h>
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/flb_bucket_queue.h>
#include <fluent-bit/flb_router.h>

const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL";

Expand Down Expand Up @@ -367,6 +368,7 @@ struct flb_config *flb_config_init()
mk_list_init(&config->filters);
mk_list_init(&config->outputs);
mk_list_init(&config->proxies);
cfl_list_init(&config->input_routes);
mk_list_init(&config->workers);
mk_list_init(&config->upstreams);
mk_list_init(&config->downstreams);
Expand Down Expand Up @@ -613,6 +615,9 @@ void flb_config_exit(struct flb_config *config)
flb_config_task_map_resize(config, 0);
flb_routes_empty_mask_destroy(config);

/* Clean up router input routes */
flb_router_routes_destroy(&config->input_routes);

flb_free(config);
}

Expand Down Expand Up @@ -856,6 +861,9 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
if (strcasecmp(kv->key, "name") == 0) {
continue;
}
if (strcasecmp(kv->key, "routes") == 0) {
continue;
}

Comment on lines +864 to 867
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Do not process “processors” as a regular property

Guard against variant “processors” breaking plugin setup; skip it like “routes”, or handle the variant explicitly.

             if (strcasecmp(kv->key, "name") == 0) {
                 continue;
             }
             if (strcasecmp(kv->key, "routes") == 0) {
                 continue;
             }
+            if (strcasecmp(kv->key, "processors") == 0) {
+                /* handled via group loader or (future) variant loader */
+                continue;
+            }
🤖 Prompt for AI Agents
In src/flb_config.c around lines 864 to 867, the parsing currently skips the
"routes" key but not the "processors" variant which can disrupt plugin setup;
update the conditional to also skip keys equal to "processors" (e.g., treat
strcasecmp(kv->key, "processors") == 0 the same as "routes") or add an explicit
branch to handle "processors" correctly so it is not processed as a regular
property. Ensure the check is case-insensitive and placed alongside the existing
"routes" guard so "processors" will be bypassed or handled safely during
configuration parsing.

/* set ret to -1 to ensure that we treat any unhandled plugin or
* value types as errors.
Expand Down Expand Up @@ -960,6 +968,7 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf)
{
int ret;
flb_debug("[config] starting configuration loading");
struct flb_kv *kv;
struct mk_list *head;
struct cfl_kvpair *ckv;
Expand Down Expand Up @@ -1061,6 +1070,13 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf)
return -1;
}

/* Parse new router configuration */
ret = flb_router_config_parse(cf, &config->input_routes, config);
if (ret == -1) {
flb_debug("[router] router configuration parsing failed");
return -1;
}

return 0;
}

Expand Down
Loading
Loading