Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "logstash/util/socket_peer"
require "logstash-input-tcp_jars"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/port_management_support'

require "socket"
require "openssl"
Expand Down Expand Up @@ -68,6 +69,8 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
# ecs_compatibility option, provided by Logstash core or the support adapter.
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)

include LogStash::PluginMixins::PortManagementSupport

config_name "tcp"

default :codec, "line"
Expand Down Expand Up @@ -177,15 +180,22 @@ def register
validate_ssl_config!

if server?
@loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
@port_reservation = port_management.reserve(addr: @host, port: @port) do |reserved_addr, reserved_port|
# we create the loop for the *requested* host addr, because the *reserved* addr
# may be reported overly-broad (e.g, ipv4-only `0.0.0.0` can become ipv6-tolerating `::` depending on arch)
@loop = InputLoop.new(@id, @host, reserved_port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
end
end
end

def run(output_queue)
@output_queue = output_queue
if server?
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}", :ssl_enabled => @ssl_enabled)
@loop.run
@port_reservation.convert do |_, reserved_port|
@logger.info("Starting tcp input listener", :address => "#{@host}:#{reserved_port}", :ssl_enabled => @ssl_enabled)
@loop.start
end
@loop.wait_until_closed
else
run_client()
end
Expand Down
1 change: 1 addition & 0 deletions logstash-input-tcp.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Gem::Specification.new do |s|
# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.2'
s.add_runtime_dependency 'logstash-mixin-port_management_support', '~>1.0'

s.add_runtime_dependency 'logstash-core', '>= 8.1.0'

Expand Down
38 changes: 27 additions & 11 deletions spec/inputs/tcp_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,27 @@
#Cabin::Channel.get(LogStash).level = :debug
describe LogStash::Inputs::Tcp, :ecs_compatibility_support do

def get_port
begin
# Start high to better avoid common services
port = rand(10000..65535)
s = TCPServer.new("127.0.0.1", port)
s.close
return port
rescue Errno::EADDRINUSE
retry
end
##
# yield the block with a port that is available
# @return [Integer]: a port that is available
def find_available_port(host)
with_bound_port(host: host, &:itself)
end

##
# Yields block with a port that is unavailable
# @yieldparam port [Integer]
# @yieldreturn [Object]
# @return [Object]
def with_bound_port(host:"::", port:0, &block)
server = TCPServer.new(host, port)

return yield(server.local_address.ip_port)
ensure
server.close
end

let(:port) { get_port }
let(:port) { find_available_port("127.0.0.1") }

context "codec (PR #1372)" do
it "switches from plain to line" do
Expand Down Expand Up @@ -373,6 +381,14 @@ def get_port
expect { subject.register }.to_not raise_error
end

context "when the port is unavailable" do
it 'raises a helpful exception' do
with_bound_port(host: "127.0.0.1", port: port) do |unavailable_port|
expect { subject.register }.to raise_error(Errno::EADDRINUSE)
end
end
end

context "when using ssl" do
let(:config) do
{
Expand Down
30 changes: 25 additions & 5 deletions src/main/java/org/logstash/tcp/InputLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* Plain TCP Server Implementation.
*/
public final class InputLoop implements Runnable, Closeable {
public final class InputLoop implements Closeable {

// historically this class was passing around the plugin's logger
private static final Logger logger = LogManager.getLogger("logstash.inputs.tcp");
Expand All @@ -46,6 +46,11 @@ public final class InputLoop implements Runnable, Closeable {
*/
private final ServerBootstrap serverBootstrap;

/**
* The channel after starting
*/
private volatile Channel channel;

/**
* SSL configuration.
*/
Expand Down Expand Up @@ -82,11 +87,26 @@ public InputLoop(final String id, final String host, final int port, final Decod
.childHandler(new InputLoop.InputHandler(decoder, sslContext));
}

@Override
public void run() {
public synchronized void start() {
if (channel != null) {
throw new IllegalStateException("Already started");
}
try {
serverBootstrap.bind(host, port).sync().channel().closeFuture().sync();
} catch (final InterruptedException ex) {
channel = serverBootstrap.bind(host, port).sync().channel();
}catch (final InterruptedException ex) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}catch (final InterruptedException ex) {
} catch (final InterruptedException ex) {

throw new IllegalStateException(ex);
}
}

public void waitUntilClosed() {
synchronized (this) {
if (channel == null) {
throw new IllegalStateException("not started");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new IllegalStateException("not started");
throw new IllegalStateException("Not started");

}
}
try {
channel.closeFuture().sync();
}catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
Expand Down