Skip to content

Commit cb90ad7

Browse files
daipomWatson1978
andauthored
Backport(v1.16): server plugin helper: ensure to close all connections at shutdown (#5026) (#5088)
**Which issue(s) this PR fixes**: * Backport #5026 **What this PR does / why we need it**: TCP server with `server` helper does not close all connections at shutdown process. When receiving data from multiple clients, the server receive the data continuously because the connection is not closed. The server will shut down properly by this PR ### Reproduce 1. Launch Fluentd with following config file. 2. Send syslog data from two or more clients using following client script 3. Terminate Fluentd 4. Relaunch Fluentd, then it shows the `2025-07-16 14:12:26 +0900 [warn]: #0 restoring buffer file: path = xxxxxxxxx` in logs. * config ``` <source> @type syslog tag system <transport tcp> </transport> bind 0.0.0.0 port 5140 </source> <match **> @type file path "#{File.expand_path('~/tmp/fluentd/maillog')}" <buffer> @type file path "#{File.expand_path('~/tmp/fluentd/buffer/buffer_syslog_maillog')}" flush_at_shutdown true </buffer> </match> ``` * client script ```ruby require 'bundler/inline' gemfile do source 'https://rubygems.org' gem 'remote_syslog_sender' end def create_client Thread.new do sender = RemoteSyslogSender.new('127.0.0.1', 5140, protocol: :tcp) loop do sender.transmit("message body") sleep 0.5 end end end clients = [] 3.times do clients << create_client end clients.each(&:join) ``` Example code. ``` @ary = [1,2,3] # It would like 1, 2, and 3 to be processed. # However, following code handles 1 and 3. @ary.each do |i| puts i @ary.delete(i) end ``` **Docs Changes**: Not needed. **Release Note**: Same as the title. Signed-off-by: Shizuo Fujita <fujita@clear-code.com> Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com> Co-authored-by: Shizuo Fujita <fujita@clear-code.com>
1 parent c301faa commit cb90ad7

2 files changed

Lines changed: 23 additions & 1 deletion

File tree

lib/fluent/plugin_helper/server.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,10 @@ def stop
347347
end
348348

349349
def shutdown
350-
@_server_connections.each do |conn|
350+
# When it invokes conn.cose, it reduces elements in @_server_connections by close_callback,
351+
# and it reduces the number of loops. This prevents the connection closing.
352+
# So, it requires invoking #dup to avoid the problem.
353+
@_server_connections.dup.each do |conn|
351354
conn.close rescue nil
352355
end
353356

test/plugin_helper/test_server.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,25 @@ class Dummy < Fluent::Plugin::TestBase
364364
d2.stop; d2.before_shutdown; d2.shutdown; d2.after_shutdown; d2.close; d2.terminate
365365
end
366366
end
367+
368+
test 'close all connections by shutdown' do
369+
@d.server_create_tcp(:s, @port) do |data, conn|
370+
end
371+
372+
client_sockets = []
373+
5.times do
374+
client_sockets << TCPSocket.open("127.0.0.1", @port)
375+
end
376+
waiting(4){ sleep 0.1 until @d.instance_variable_get(:@_server_connections).size == 5 }
377+
378+
@d.stop
379+
@d.before_shutdown
380+
@d.shutdown
381+
382+
assert_true @d.instance_variable_get(:@_server_connections).empty?
383+
ensure
384+
client_sockets.each(&:close)
385+
end
367386
end
368387

369388
sub_test_case '#server_create' do

0 commit comments

Comments
 (0)