-
Notifications
You must be signed in to change notification settings - Fork 1.8k
downstream: implement pause and resume functionality #10790
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: Eduardo Silva <[email protected]>
Signed-off-by: Eduardo Silva <[email protected]>
WalkthroughAdds a pause/resume mechanism to downstreams and integrates it into the in_tcp input. Introduces a paused state in downstreams, APIs to control it, and connection-level busy/pending_close flags. The TCP plugin wires pause/resume callbacks, adjusts collect behavior when connections drop, and refactors connection event handling for consistent cleanup. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant IM as Input Manager
participant TCP as in_tcp_plugin
participant DS as Downstream
participant Conn as tcp_conn (many)
IM->>TCP: cb_pause()
TCP->>DS: flb_downstream_pause(stream)
loop for each active connection
TCP->>Conn: if busy: set pending_close<br/>else: delete connection
end
Note over TCP,Conn: New: defer close when busy
IM->>TCP: cb_resume()
TCP->>DS: flb_downstream_resume(stream)
sequenceDiagram
autonumber
participant EP as Event Loop
participant Conn as tcp_conn
participant DS as Downstream
EP->>DS: flb_downstream_conn_get()
alt stream paused
DS-->>EP: return NULL (and drop server_fd pending accept)
else not paused
DS-->>EP: return connection
end
EP->>Conn: tcp_conn_event()
Note over Conn: busy = true
alt read/process success
Conn-->>EP: ret = bytes
else error/invalid
Conn-->>EP: ret = -1
end
alt pending_close set
Conn->>EP: delete connection
EP-->>Conn: finalize
end
Note over Conn: busy = false (on cleanup)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
fba79d8
to
c143bed
Compare
c143bed
to
8db61eb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (6)
plugins/in_tcp/tcp_conn.h (1)
53-55
: Add brief concurrency note for the new flagsClarify in a comment that busy is only toggled within the event handler and pending_close may be set from pause callbacks; helps future maintainers reason about races.
- int busy; /* Connection is being processed */ - int pending_close; /* Defer closing until processing ends */ + int busy; /* Toggled in tcp_conn_event() during processing */ + int pending_close; /* Set by pause path; tcp_conn_event() will close on exit */src/flb_downstream.c (2)
286-296
: Paused mode: UDP/unix_dgram may spinWhen paused and transport is dgram, returning NULL without draining can cause the FD to stay readable and the collector to fire repeatedly. Consider dropping one datagram to relieve pressure, or temporarily disabling the event while paused.
- if (stream->paused) { - if (transport != FLB_TRANSPORT_UDP && - transport != FLB_TRANSPORT_UNIX_DGRAM) { + if (stream->paused) { + if (transport != FLB_TRANSPORT_UDP && + transport != FLB_TRANSPORT_UNIX_DGRAM) { connection_fd = flb_net_accept(stream->server_fd); if (connection_fd >= 0) { flb_socket_close(connection_fd); } - } + } + else { + /* Drain one datagram to avoid hot loop while paused */ + char discard[512]; + (void) recvfrom(stream->server_fd, discard, sizeof(discard), MSG_DONTWAIT, NULL, NULL); + } return NULL; }
372-384
: Guard pause/resume with stream lock; init paused explicitlyTiny race window exists setting paused without a lock. Also, initialize paused to FLB_FALSE in setup for non-calloc callers.
void flb_downstream_pause(struct flb_downstream *stream) { if (stream) { - stream->paused = FLB_TRUE; + flb_stream_acquire_lock(&stream->base, FLB_TRUE); + stream->paused = FLB_TRUE; + flb_stream_release_lock(&stream->base); } } void flb_downstream_resume(struct flb_downstream *stream) { if (stream) { - stream->paused = FLB_FALSE; + flb_stream_acquire_lock(&stream->base, FLB_TRUE); + stream->paused = FLB_FALSE; + flb_stream_release_lock(&stream->base); } }Additionally, set default in setup:
int flb_downstream_setup(struct flb_downstream *stream, @@ stream->server_fd = FLB_INVALID_SOCKET; stream->host = flb_strdup(host); stream->port = port; + stream->paused = FLB_FALSE;
include/fluent-bit/flb_downstream.h (1)
49-51
: Public contract: document paused semanticsAdd a brief comment that paused only affects acceptance of new connections; existing connections are left intact until closed by the plugin (or flagged pending_close).
- /* pause state */ - int paused; + /* pause state: when true, new connections are accepted-and-closed (streaming) + * or dropped (dgram). Existing active connections are not forcibly terminated. */ + int paused;Also applies to: 82-84
plugins/in_tcp/tcp.c (1)
147-167
: Pause path: verify callback threading and list safetyEnsure cb_pause runs on the same engine thread that drives tcp_conn_event; otherwise, iterating ctx->connections while events may fire risks races. If multi-worker or cross-thread invocations are possible, guard with an appropriate lock or quiesce the collector before iterating.
static void in_tcp_pause(void *data, struct flb_config *config) { struct flb_in_tcp_config *ctx = data; @@ - flb_downstream_pause(ctx->downstream); + flb_downstream_pause(ctx->downstream); + /* If cb_pause can be called off-thread, pause collector here */ + /* flb_input_collector_pause(ctx->ins, ctx->collector_id); */ @@ if (conn->busy) { conn->pending_close = FLB_TRUE; continue; } tcp_conn_del(conn); } }plugins/in_tcp/tcp_conn.c (1)
370-372
: Return 0 on normal read to match event-loop expectationsReturning bytes is unnecessary; zero is typical for “handled” without teardown.
- ret = bytes; + ret = 0; goto cleanup;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
include/fluent-bit/flb_downstream.h
(2 hunks)plugins/in_tcp/tcp.c
(3 hunks)plugins/in_tcp/tcp_conn.c
(6 hunks)plugins/in_tcp/tcp_conn.h
(1 hunks)src/flb_downstream.c
(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
include/fluent-bit/flb_downstream.h (1)
src/flb_downstream.c (2)
flb_downstream_pause
(372-377)flb_downstream_resume
(379-384)
plugins/in_tcp/tcp_conn.c (1)
src/flb_pack.c (2)
flb_pack_state_reset
(385-395)flb_pack_state_init
(354-383)
src/flb_downstream.c (1)
src/flb_network.c (1)
flb_net_accept
(1850-1877)
plugins/in_tcp/tcp.c (2)
src/flb_downstream.c (2)
flb_downstream_pause
(372-377)flb_downstream_resume
(379-384)plugins/in_tcp/tcp_conn.c (1)
tcp_conn_del
(462-484)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (24)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: PR - fuzzing test
🔇 Additional comments (5)
plugins/in_tcp/tcp.c (3)
45-47
: Correct handling when downstream is pausedReturning 0 (no error) here is right; avoids disabling the collector while paused.
169-176
: Resume hook looks goodNo additional work needed; downstream resumes accepting connections.
216-218
: Plugin wires pause/resume correctlycb_pause/cb_resume registration aligns with the new API.
plugins/in_tcp/tcp_conn.c (2)
383-391
: Cleanup path is solidbusy is cleared and pending_close honored; this integrates well with the pause flow.
456-458
: Init new flagsGood default initialization of busy/pending_close.
if (!tmp) { | ||
conn->busy = FLB_FALSE; | ||
flb_errno(); | ||
return -1; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Realloc failure leaks the live connection
On allocation failure, the connection remains registered; return -1 without deleting will leave a stuck FD. Close the connection before returning.
- if (!tmp) {
- conn->busy = FLB_FALSE;
- flb_errno();
- return -1;
- }
+ if (!tmp) {
+ conn->busy = FLB_FALSE;
+ flb_errno();
+ tcp_conn_del(conn);
+ return -1;
+ }
🤖 Prompt for AI Agents
In plugins/in_tcp/tcp_conn.c around lines 289-293, the realloc failure path
returns -1 while leaving the live connection registered, leaking the open FD;
fix by closing and fully cleaning up the connection before returning: close the
socket/FD, unregister the connection from the event loop/io subsystem, free any
allocated buffers and connection state, mark conn as not busy (or remove it from
the active list) and then return -1 so no stray FD remains.
ret_payload = parse_payload_json(conn); | ||
if (ret_payload == 0) { | ||
/* Incomplete JSON message, we need more data */ | ||
return -1; | ||
ret = -1; | ||
goto cleanup; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not return -1 on incomplete payloads
For partial JSON/raw frames you now set ret = -1 and exit via cleanup, which risks the engine treating this as an error. Keep the connection alive and return 0 so we await more data.
- if (ret_payload == 0) {
- /* Incomplete JSON message, we need more data */
- ret = -1;
- goto cleanup;
- }
+ if (ret_payload == 0) {
+ /* Incomplete JSON message, wait for more data */
+ ret = 0;
+ goto cleanup;
+ }
@@
- if (ret_payload == 0) {
- ret = -1;
- goto cleanup;
- }
+ if (ret_payload == 0) {
+ ret = 0;
+ goto cleanup;
+ }
- else if (ret_payload == -1) {
+ else if (ret_payload == -1) {
conn->buf_len = 0;
- ret = -1;
+ /* Keep the connection; skip the invalid chunk */
+ ret = 0;
goto cleanup;
}
Also applies to: 348-355
🤖 Prompt for AI Agents
In plugins/in_tcp/tcp_conn.c around lines 331-336 (and similarly for 348-355),
the code treats an incomplete JSON/raw payload as an error by setting ret = -1
and jumping to cleanup; instead keep the connection alive and indicate we need
more data by setting ret = 0 (or returning 0) and avoid the cleanup/error path
so the caller will wait for additional bytes; update both spots to not set -1 or
call cleanup on incomplete payloads and ensure any buffers/state remain intact
for subsequent reads.
This PR extends the downstream interface with two new functions to pause and resume the acceptance of connections. Once paused, the downstream simply accept and close the socket, on resume it keeps accepting.
Input plugins that uses downstream interface as listeners can now use these news functions from their pause/resume callbacks for better handling of connections.
In addition, this PR extends TCP input plugin and adds a default I/O timeout to HTTP output.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Bug Fixes