-
Notifications
You must be signed in to change notification settings - Fork 1.8k
config: multiline: in_tail: filter_multiline: Add configurable buffer limit for multiline interface #10653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
config: multiline: in_tail: filter_multiline: Add configurable buffer limit for multiline interface #10653
Conversation
578f46a
to
3a8abd9
Compare
WalkthroughAdds a configurable multiline buffer limit, threads a FLB_MULTILINE_TRUNCATED status through multiline processing, enforces per-group buffer limits on append, records truncation via metrics/logs (filter and tail), adds binary-size parsing utility, and adds tests for truncation and size parsing. Changes
Sequence Diagram(s)sequenceDiagram
participant Config
participant TailInput
participant MultilineEngine
participant StreamGroup
participant FilterPlugin
participant Metrics
Config->>MultilineEngine: set buffer_limit (bytes)
TailInput->>MultilineEngine: append line
MultilineEngine->>StreamGroup: flb_ml_group_cat(data,len)
alt fits within limit
StreamGroup-->>MultilineEngine: FLB_MULTILINE_OK
MultilineEngine->>FilterPlugin: process / emit
FilterPlugin->>Metrics: increment emitted metric
else truncated or partially appended
StreamGroup-->>MultilineEngine: FLB_MULTILINE_TRUNCATED
MultilineEngine->>TailInput: log warning
MultilineEngine->>Metrics: increment truncated metric
MultilineEngine->>StreamGroup: mark truncated flag
end
MultilineEngine->>StreamGroup: flush -> include "multiline_truncated": true if set
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Suggested reviewers
Poem
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
3a8abd9
to
dec20f1
Compare
/* Return codes */ | ||
#define FLB_MULTILINE_OK 0 | ||
#define FLB_MULTILINE_PROCESSED 1 /* Reserved */ | ||
#define FLB_MULTILINE_TRUNCATED 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing to status code 2 is needed because status code 1 will be collided for FLB_TRUE status.
Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
1 is also indicated for FLB_TRUE. Signed-off-by: Hiroshi Hatake <[email protected]>
This commit updates the expected output for the 'container_mix' unit test. Previously, the multiline engine could incorrectly merge pending messages when the log stream switched between different parser types (e.g., from `docker` to `cri`). The test's original expectations were written to match this buggy behavior. Recent fixes have made the engine's state handling more robust and precise. It now correctly flushes a pending message when the parser context changes, preventing improper merges. This change aligns the test case with the new, correct logic. Signed-off-by: Hiroshi Hatake <[email protected]> Co-authored-by: Eduardo Silva <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
…line Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
size_to_byte function just converts with 1000(K), 1000*K, 1000*M. But this function converts with 1024(KiB), 1024*KiB(MiB), and 1024*MiB(GiB). Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
d485a85
to
4788161
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/multiline/flb_ml.c (2)
292-306
: Enforce buffer limit for ENDSWITH/EQ by using flb_ml_group_cat instead of direct sds concat.Direct flb_sds_cat_safe bypasses buffer_limit; use flb_ml_group_cat and propagate TRUNCATED.
Apply:
@@ - /* Concatenate value */ - if (val_content) { - flb_sds_cat_safe(&stream_group->buf, - val_content->via.str.ptr, - val_content->via.str.size); - } - else { - flb_sds_cat_safe(&stream_group->buf, buf_data, buf_size); - } + /* Concatenate value with buffer limit enforcement */ + if (val_content) { + ret = flb_ml_group_cat(stream_group, + val_content->via.str.ptr, + val_content->via.str.size); + } + else { + ret = flb_ml_group_cat(stream_group, buf_data, buf_size); + } + if (ret == -1) { + return -1; + } + if (ret == FLB_MULTILINE_TRUNCATED) { + truncated = FLB_TRUE; + } @@ - /* Concatenate value */ - if (val_content) { - flb_sds_cat_safe(&stream_group->buf, - val_content->via.str.ptr, - val_content->via.str.size); - } - else { - flb_sds_cat_safe(&stream_group->buf, buf_data, buf_size); - } + /* Concatenate value with buffer limit enforcement */ + if (val_content) { + ret = flb_ml_group_cat(stream_group, + val_content->via.str.ptr, + val_content->via.str.size); + } + else { + ret = flb_ml_group_cat(stream_group, buf_data, buf_size); + } + if (ret == -1) { + return -1; + } + if (ret == FLB_MULTILINE_TRUNCATED) { + truncated = FLB_TRUE; + }Also applies to: 326-341
879-883
: Correct wrong NULL check after flb_sds_create(name).Should check ml->name, not ml. Current code leaks ml on failure.
Apply:
- ml->name = flb_sds_create(name); - if (!ml) { + ml->name = flb_sds_create(name); + if (!ml->name) { flb_free(ml); return NULL; }
♻️ Duplicate comments (4)
plugins/filter_multiline/ml.c (4)
381-385
: Fix typo in metric help string (“occurence” → “occurrences”).User-facing metric text should be correct.
Apply:
- "Total number of truncated occurence of multiline", + "Total number of truncated multiline occurrences",
839-850
: Guard cmt_truncated before increment to avoid NULL deref.cmt_counter_create can return NULL; cmt_counter_inc would segfault.
Apply:
#ifdef FLB_HAVE_METRICS - name = (char *) flb_filter_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name}); + if (ctx->cmt_truncated) { + name = (char *) flb_filter_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name}); + } /* old api */ flb_metrics_sum(FLB_MULTILINE_METRIC_TRUNCATED, 1, ctx->ins->metrics); #endif
902-913
: Guard cmt_truncated before increment (buffered path) to prevent crashes and overcounting.Avoid NULL deref; also ensure we’re counting truncations at emit time, not per-append.
Apply:
#ifdef FLB_HAVE_METRICS - name = (char *) flb_filter_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name}); + if (ctx->cmt_truncated) { + name = (char *) flb_filter_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name}); + } /* old api */ flb_metrics_sum(FLB_MULTILINE_METRIC_TRUNCATED, 1, ctx->ins->metrics); #endifAdditionally, please verify whether TRUNCATED is also recorded elsewhere on flush; if so, this increment may double-count in buffered mode. If needed, gate by buffered-emit boundary.
918-927
: Guard cmt_emitted before increment; consider moving EMITTED to flush point to match actual emissions.Current increment happens on append OK, which may not equal “emitted records”.
Apply the NULL guard:
#ifdef FLB_HAVE_METRICS - name = (char *) flb_filter_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {name}); + if (ctx->cmt_emitted) { + name = (char *) flb_filter_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {name}); + } /* old api */ flb_metrics_sum(FLB_MULTILINE_METRIC_EMITTED, 1, ctx->ins->metrics); #endifPrefer incrementing EMITTED at the actual emit site (flush_callback) in buffered mode to avoid drift:
@@ static int flush_callback(..., char *buf_data, size_t buf_size) - return ret; + /* metrics: count actual emitted records (buffered mode) */ +#ifdef FLB_HAVE_METRICS + if (ret == 0 && ctx->cmt_emitted) { + char *name = (char *) flb_filter_name(ctx->ins); + uint64_t ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {name}); + } + /* old api */ + flb_metrics_sum(FLB_MULTILINE_METRIC_EMITTED, 1, ctx->ins->metrics); +#endif + return ret;
🧹 Nitpick comments (3)
tests/internal/utils.c (2)
801-822
: Solid coverage for binary units; add a few edge cases.Current cases look correct for K/M/G, KB/MB/GB, and KiB/MiB/GiB. To harden the suite, add:
- Case-insensitivity
- "false" → 0
- Unsupported units (T/TiB) → -1
Apply this diff within the array:
struct size_to_bytes_check size_to_binary_bytes_checks[] = { {"922337.63", 922337}, {"2K",2048}, {"5.7263K", 5863}, {"5.7263KB", 5863}, {"5.7263KiB", 5863}, + {"5.7263kb", 5863}, {"9223372036854775.23K", -1}, {"1M", 1048576}, {"1.1M", 1153433}, {"1.1MB", 1153433}, {"1.1MiB", 1153433}, + {"1.1mib", 1153433}, {"3.592M", 3766484}, {"52.752383M", 55314882}, {"52.752383MB", 55314882}, {"52.752383MiB", 55314882}, {"9223372036854.42M", -1}, {"492.364G",528671819431}, {"492.364GB",528671819431}, {"492.364GiB",528671819431}, + {"492.364gb",528671819431}, {"1.2973G", 1392965268}, {"9223372036.78G", -1}, + {"false", 0}, + {"1T", -1}, + {"1TiB", -1}, };
857-857
: Typo in test name.Rename “bianry” → “binary” for readability in test outputs.
Apply this diff:
- { "test_size_to_bianry_bytes", test_size_to_binary_bytes }, + { "test_size_to_binary_bytes", test_size_to_binary_bytes },src/multiline/flb_ml.c (1)
747-751
: Propagate TRUNCATED status correctly on standalone fallback (text and map).You set status when flb_ml_group_cat returns TRUNCATED; also check for -1 (OOM) and bail.
Apply:
@@ - ret = flb_ml_group_cat(st_group, buf, size); - if (ret == FLB_MULTILINE_TRUNCATED) { + ret = flb_ml_group_cat(st_group, buf, size); + if (ret == -1) { + return -1; + } + if (ret == FLB_MULTILINE_TRUNCATED) { status = FLB_MULTILINE_TRUNCATED; } @@ - flb_ml_register_context(st_group, tm, obj); + flb_ml_register_context(st_group, tm, obj); flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE);Please confirm tests cover both ENDSWITH/EQ truncation and the fallback standalone path.
Also applies to: 853-855
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (17)
include/fluent-bit/flb_config.h
(2 hunks)include/fluent-bit/flb_utils.h
(1 hunks)include/fluent-bit/multiline/flb_ml.h
(3 hunks)include/fluent-bit/multiline/flb_ml_group.h
(1 hunks)plugins/filter_multiline/ml.c
(4 hunks)plugins/filter_multiline/ml.h
(2 hunks)plugins/in_tail/tail_config.c
(1 hunks)plugins/in_tail/tail_config.h
(2 hunks)plugins/in_tail/tail_file.c
(2 hunks)src/flb_config.c
(2 hunks)src/flb_utils.c
(1 hunks)src/multiline/flb_ml.c
(13 hunks)src/multiline/flb_ml_group.c
(2 hunks)src/multiline/flb_ml_rule.c
(3 hunks)src/multiline/flb_ml_stream.c
(1 hunks)tests/internal/multiline.c
(4 hunks)tests/internal/utils.c
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (14)
- src/multiline/flb_ml_group.c
- include/fluent-bit/multiline/flb_ml_group.h
- plugins/in_tail/tail_config.c
- plugins/filter_multiline/ml.h
- include/fluent-bit/flb_utils.h
- include/fluent-bit/flb_config.h
- src/flb_utils.c
- plugins/in_tail/tail_config.h
- src/multiline/flb_ml_stream.c
- src/flb_config.c
- plugins/in_tail/tail_file.c
- include/fluent-bit/multiline/flb_ml.h
- tests/internal/multiline.c
- src/multiline/flb_ml_rule.c
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-29T06:25:02.540Z
Learnt from: shadowshot-x
PR: fluent/fluent-bit#10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.540Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.
Applied to files:
src/multiline/flb_ml.c
🧬 Code graph analysis (3)
plugins/filter_multiline/ml.c (3)
lib/cmetrics/src/cmt_counter.c (2)
cmt_counter_create
(26-81)cmt_counter_inc
(100-117)src/flb_metrics.c (2)
flb_metrics_add
(129-178)flb_metrics_sum
(180-191)src/flb_filter.c (1)
flb_filter_name
(490-497)
tests/internal/utils.c (1)
src/flb_utils.c (1)
flb_utils_size_to_binary_bytes
(610-694)
src/multiline/flb_ml.c (3)
src/multiline/flb_ml_stream.c (2)
flb_ml_stream_get
(278-292)flb_ml_stream_group_get
(91-135)src/multiline/flb_ml_group.c (1)
flb_ml_group_cat
(89-122)src/flb_utils.c (1)
flb_utils_size_to_binary_bytes
(610-694)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (29)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-centos-7
- GitHub Check: PR - fuzzing test
ml->config = ctx; | ||
limit = flb_utils_size_to_binary_bytes(ml->config->multiline_buffer_limit); | ||
if (limit > 0) { | ||
ml->buffer_limit = (size_t)limit; | ||
} | ||
else { | ||
ml->buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT; | ||
} | ||
ml->last_flush = time_ms_now(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Honor “0/false = unlimited” and distinguish invalid from disabled for buffer_limit.
Treat 0 as “no limit”; only fall back to default on error (-1).
Apply:
- limit = flb_utils_size_to_binary_bytes(ml->config->multiline_buffer_limit);
- if (limit > 0) {
- ml->buffer_limit = (size_t)limit;
- }
- else {
- ml->buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT;
- }
+ limit = flb_utils_size_to_binary_bytes(ml->config->multiline_buffer_limit);
+ if (limit == -1) {
+ /* invalid / unset: use default */
+ ml->buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT;
+ }
+ else {
+ /* 0 means unlimited (disabled), >0 sets explicit cap */
+ ml->buffer_limit = (size_t) limit;
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
ml->config = ctx; | |
limit = flb_utils_size_to_binary_bytes(ml->config->multiline_buffer_limit); | |
if (limit > 0) { | |
ml->buffer_limit = (size_t)limit; | |
} | |
else { | |
ml->buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT; | |
} | |
ml->last_flush = time_ms_now(); | |
ml->config = ctx; | |
- limit = flb_utils_size_to_binary_bytes(ml->config->multiline_buffer_limit); | |
- if (limit > 0) { | |
- ml->buffer_limit = (size_t)limit; | |
- } | |
- else { | |
- ml->buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT; | |
limit = flb_utils_size_to_binary_bytes(ml->config->multiline_buffer_limit); | |
if (limit == -1) { | |
/* invalid / unset: use default */ | |
ml->buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT; | |
} | |
else { | |
/* 0 means unlimited (disabled), >0 sets explicit cap */ | |
ml->buffer_limit = (size_t) limit; | |
} | |
ml->last_flush = time_ms_now(); |
🤖 Prompt for AI Agents
In src/multiline/flb_ml.c around lines 885 to 893, the code currently treats any
non-positive return from flb_utils_size_to_binary_bytes as a signal to fall back
to the default buffer limit; change it so that a return value of 0 means "no
limit" (set ml->buffer_limit = 0), a return value of -1 indicates an error (set
ml->buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT), and any positive value is used
as the buffer limit (cast to size_t). Ensure you check specifically for -1 vs 0
so invalid input falls back to default while 0/false is honored as unlimited.
if (flb_sds_len(group->buf) > 0) { | ||
/* Take the first line keys and repack */ | ||
len = flb_sds_len(parser_i->key_content); | ||
size = map.via.map.size; | ||
msgpack_pack_map(&mp_pck, size); | ||
|
||
for (i = 0; i < size; i++) { | ||
k = map.via.map.ptr[i].key; | ||
v = map.via.map.ptr[i].val; | ||
|
||
/* | ||
* Check if the current key is the key that will contain the | ||
* concatenated multiline buffer | ||
*/ | ||
if (k.type == MSGPACK_OBJECT_STR && | ||
parser_i->key_content && | ||
k.via.str.size == len && | ||
strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) { | ||
|
||
/* key */ | ||
msgpack_pack_object(&mp_pck, k); | ||
|
||
/* value */ | ||
len = flb_sds_len(group->buf); | ||
msgpack_pack_str(&mp_pck, len); | ||
msgpack_pack_str_body(&mp_pck, group->buf, len); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix potential NULL dereference when key_content is unset.
len = flb_sds_len(parser_i->key_content) is called before checking key_content.
Apply:
- /* Take the first line keys and repack */
- len = flb_sds_len(parser_i->key_content);
+ /* Take the first line keys and repack */
+ if (parser_i->key_content) {
+ len = flb_sds_len(parser_i->key_content);
+ } else {
+ len = 0; /* will skip replacement below */
+ }
@@
- if (k.type == MSGPACK_OBJECT_STR &&
- parser_i->key_content &&
- k.via.str.size == len &&
- strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) {
+ if (parser_i->key_content &&
+ k.type == MSGPACK_OBJECT_STR &&
+ k.via.str.size == len &&
+ strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) {
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if (flb_sds_len(group->buf) > 0) { | |
/* Take the first line keys and repack */ | |
len = flb_sds_len(parser_i->key_content); | |
size = map.via.map.size; | |
msgpack_pack_map(&mp_pck, size); | |
for (i = 0; i < size; i++) { | |
k = map.via.map.ptr[i].key; | |
v = map.via.map.ptr[i].val; | |
/* | |
* Check if the current key is the key that will contain the | |
* concatenated multiline buffer | |
*/ | |
if (k.type == MSGPACK_OBJECT_STR && | |
parser_i->key_content && | |
k.via.str.size == len && | |
strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) { | |
/* key */ | |
msgpack_pack_object(&mp_pck, k); | |
/* value */ | |
len = flb_sds_len(group->buf); | |
msgpack_pack_str(&mp_pck, len); | |
msgpack_pack_str_body(&mp_pck, group->buf, len); | |
} | |
if (flb_sds_len(group->buf) > 0) { | |
/* Take the first line keys and repack */ | |
if (parser_i->key_content) { | |
len = flb_sds_len(parser_i->key_content); | |
} else { | |
len = 0; /* will skip replacement below */ | |
} | |
size = map.via.map.size; | |
msgpack_pack_map(&mp_pck, size); | |
for (i = 0; i < size; i++) { | |
k = map.via.map.ptr[i].key; | |
v = map.via.map.ptr[i].val; | |
/* | |
* Check if the current key is the key that will contain the | |
* concatenated multiline buffer | |
*/ | |
if (parser_i->key_content && | |
k.type == MSGPACK_OBJECT_STR && | |
k.via.str.size == len && | |
strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) { | |
/* key */ | |
msgpack_pack_object(&mp_pck, k); | |
/* value */ | |
len = flb_sds_len(group->buf); | |
msgpack_pack_str(&mp_pck, len); | |
msgpack_pack_str_body(&mp_pck, group->buf, len); | |
} | |
… | |
} | |
} |
🤖 Prompt for AI Agents
In src/multiline/flb_ml.c around lines 1346 to 1372, the code calls len =
flb_sds_len(parser_i->key_content) before confirming parser_i->key_content is
non-NULL, risking a NULL dereference; move the flb_sds_len call so it only runs
after verifying parser_i->key_content is non-NULL (i.e., compute len inside the
if that checks parser_i->key_content), or add an explicit NULL guard that skips
computing len and the subsequent string comparison when key_content is NULL,
ensuring msgpack packing logic remains unchanged.
void test_size_to_binary_bytes() | ||
{ | ||
int i; | ||
int size; | ||
int64_t ret; | ||
struct size_to_bytes_check *u; | ||
|
||
size = sizeof(size_to_binary_bytes_checks) / sizeof(struct size_to_bytes_check); | ||
for (i = 0; i < size; i++) { | ||
u = &size_to_binary_bytes_checks[i]; | ||
|
||
ret = flb_utils_size_to_binary_bytes(u->size); | ||
TEST_CHECK_(ret == u->ret, "ret = %zu, u->ret = %zu", ret, u->ret); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix format specifiers for int64_t in TEST_CHECK_.
Using %zu with int64_t is undefined. Use PRId64 and include inttypes.h.
Apply this diff:
- TEST_CHECK_(ret == u->ret, "ret = %zu, u->ret = %zu", ret, u->ret);
+ TEST_CHECK_(ret == u->ret, "ret = %" PRId64 ", u->ret = %" PRId64, ret, u->ret);
Add this include near the other headers:
#include <inttypes.h>
🤖 Prompt for AI Agents
In tests/internal/utils.c around lines 824 to 838, the TEST_CHECK_ call is using
the %zu specifier for int64_t values which is undefined; add #include
<inttypes.h> near the other headers and update the TEST_CHECK_ invocation to use
the PRId64 macro in the format string (e.g. "%" PRId64) and pass the int64_t
values (ret and u->ret) as int64_t (cast if necessary) so the format matches the
type.
Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
2b4bdf1
to
3575cca
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (2)
plugins/filter_multiline/ml.c (2)
387-391
: Typo in metric help string (“occurence” → “occurrences”).Correct the help text.
ctx->cmt_truncated = cmt_counter_create(ins->cmt, "fluentbit", "filter", "emit_truncated_total", - "Total number of truncated occurence of multiline", + "Total number of truncated multiline occurrences", 1, (char *[]) {"name"});
914-925
: Avoid potential double-counting of TRUNCATED (buffered path).If truncation is also surfaced during later flush or other paths, this may count twice. Please confirm invariants that FLB_MULTILINE_TRUNCATED is signaled exactly once per truncated message, per stream.
You can grep other increments to validate uniqueness:
#!/bin/bash rg -nC2 -e 'FLB_MULTILINE_METRIC_TRUNCATED|emit_truncated_total|cmt_truncated|FLB_MULTILINE_TRUNCATED' --type=c
🧹 Nitpick comments (2)
plugins/filter_multiline/ml.c (2)
807-810
: Keep label name const; cast only at call sites.Avoid casting away const on the pointer variable; declare as const char* and cast per API call.
#ifdef FLB_HAVE_METRICS - uint64_t ts; - char *name; + uint64_t ts; + const char *name; #endif- name = (char *) flb_filter_name(ctx->ins); + name = flb_filter_name(ctx->ins); ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name}); + cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {(char *) name});- name = (char *) flb_filter_name(ctx->ins); + name = flb_filter_name(ctx->ins); ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name}); + cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {(char *) name});- name = (char *) flb_filter_name(ctx->ins); + name = flb_filter_name(ctx->ins); ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {name}); + cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {(char *) name});Also applies to: 855-858, 918-921, 932-935
926-939
: Streamline conditional ordering for readability (buffered path).Check OK first, then fall back to debug; same behavior, clearer flow.
- else if (ret != FLB_MULTILINE_OK) { - flb_plg_debug(ctx->ins, - "could not append object from tag: %s", tag); - } - else if (ret == FLB_MULTILINE_OK) { + else if (ret == FLB_MULTILINE_OK) { #ifdef FLB_HAVE_METRICS name = (char *) flb_filter_name(ctx->ins); ts = cfl_time_now(); cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {name}); /* old api */ flb_metrics_sum(FLB_MULTILINE_METRIC_EMITTED, 1, ctx->ins->metrics); #endif - } + } + else { + flb_plg_debug(ctx->ins, + "could not append object from tag: %s", tag); + }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
plugins/filter_multiline/ml.c
(4 hunks)src/multiline/flb_ml_rule.c
(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/multiline/flb_ml_rule.c
🧰 Additional context used
🧬 Code graph analysis (1)
plugins/filter_multiline/ml.c (4)
include/fluent-bit/flb_mem.h (1)
flb_free
(126-128)src/flb_metrics.c (2)
flb_metrics_add
(129-178)flb_metrics_sum
(180-191)lib/cmetrics/src/cmt_counter.c (2)
cmt_counter_create
(26-81)cmt_counter_inc
(100-117)src/flb_filter.c (1)
flb_filter_name
(490-497)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
- GitHub Check: PR - fuzzing test
- GitHub Check: pr-compile-centos-7
🔇 Additional comments (2)
plugins/filter_multiline/ml.c (2)
851-862
: Good: warn + metrics on truncation (unbuffered path).The warning plus TRUNCATED metrics update looks correct and matches the new status propagation.
930-939
: Confirm EMITTED metric scope (buffered-only).EMITTED is only created/used when buffered; unbuffered path doesn’t increment it. If parity across modes is desired, mirror it for the unbuffered flow; otherwise, document that emit_records_total reflects buffered emissions only.
if (!ctx->cmt_emitted) { | ||
flb_errno(); | ||
flb_free(ctx); | ||
|
||
return -1; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix early-exit leaks when cmt_emitted creation fails.
If cmt_counter_create() fails, the emitter instance created earlier is not torn down. Clean up ins_emitter before freeing ctx to avoid leaking an input instance.
if (!ctx->cmt_emitted) {
flb_errno();
- flb_free(ctx);
-
- return -1;
+ /* unwind emitter created earlier */
+ if (ctx->ins_emitter) {
+ flb_input_instance_exit(ctx->ins_emitter, ctx->config);
+ flb_input_instance_destroy(ctx->ins_emitter);
+ ctx->ins_emitter = NULL;
+ }
+ flb_free(ctx);
+ return -1;
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if (!ctx->cmt_emitted) { | |
flb_errno(); | |
flb_free(ctx); | |
return -1; | |
} | |
if (!ctx->cmt_emitted) { | |
flb_errno(); | |
/* unwind emitter created earlier */ | |
if (ctx->ins_emitter) { | |
flb_input_instance_exit(ctx->ins_emitter, ctx->config); | |
flb_input_instance_destroy(ctx->ins_emitter); | |
ctx->ins_emitter = NULL; | |
} | |
flb_free(ctx); | |
return -1; | |
} |
🤖 Prompt for AI Agents
In plugins/filter_multiline/ml.c around lines 372 to 377, the code returns early
when cmt_emitted creation fails but never tears down the previously created
emitter instance, leaking an input instance; before flb_free(ctx) call, destroy
the emitter instance (e.g. call flb_input_instance_destroy(ctx->ins_emitter) or
the repo's matching teardown function), set ctx->ins_emitter = NULL, then free
ctx and return -1.
if (!ctx->cmt_truncated) { | ||
flb_errno(); | ||
flb_free(ctx); | ||
|
||
return -1; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix early-exit leaks when cmt_truncated creation fails (both modes).
Failure here can leak either the emitter (buffered mode) or the msgpack sbuffer (unbuffered mode). Unwind before returning.
if (!ctx->cmt_truncated) {
flb_errno();
- flb_free(ctx);
-
- return -1;
+ /* unwind resources allocated before metrics registration */
+ if (ctx->use_buffer == FLB_TRUE && ctx->ins_emitter) {
+ flb_input_instance_exit(ctx->ins_emitter, ctx->config);
+ flb_input_instance_destroy(ctx->ins_emitter);
+ ctx->ins_emitter = NULL;
+ }
+ else if (ctx->use_buffer == FLB_FALSE) {
+ /* destroy msgpack buffer initialized for unbuffered mode */
+ msgpack_sbuffer_destroy(&ctx->mp_sbuf);
+ }
+ flb_free(ctx);
+ return -1;
}
🤖 Prompt for AI Agents
In plugins/filter_multiline/ml.c around lines 391 to 396, the early return when
ctx->cmt_truncated creation fails leaks resources; before returning you must
unwind and free mode-specific resources: call flb_errno() as shown, then if
ctx->out_mode is buffered (emitter created), destroy/free the emitter, otherwise
if unbuffered free/destroy the msgpack sbuffer used; finally free ctx and return
-1. Ensure both branches clean up their respective allocated objects before
freeing ctx and returning.
We added an interface for configurable buffer limit for multiline.
Also, we implemented robust processing for multiline concatenations.
Enter
[N/A]
in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-test
label to test for all targets (requires maintainer to do).Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Bug Fixes
Tests