Skip to content
This repository was archived by the owner on Dec 7, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
config_name 'kafka'
milestone 1

attr_accessor :exception_repeats

default :codec, 'json'

config :zk_connect, :validate => :string, :default => 'localhost:2181'
Expand All @@ -21,6 +23,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
config :decorate_events, :validate => :boolean, :default => true
config :consumer_id, :validate => :string, :default => nil
config :fetch_message_max_bytes, :validate => :number, :default => 1048576
config :logstash_stop_on_exception_repeat, :validate => :number, :default=>0 # stop logstash after an exception repeats for certain times, 0 for never stop

public
def register
Expand Down Expand Up @@ -53,6 +56,9 @@ def register
def run(logstash_queue)
java_import 'kafka.common.ConsumerRebalanceFailedException'
@logger.info('Running kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)

@exception_repeats = {}

begin
@consumer_group.run(@consumer_threads,@kafka_client_queue)
begin
Expand All @@ -74,6 +80,23 @@ def run(logstash_queue)
if @consumer_group.running?
@consumer_group.shutdown()
end

if @logstash_stop_on_exception_repeat > 0
case @exception_repeats[e.to_s]
when nil
@exception_repeats[e.to_s] = @logstash_stop_on_exception_repeat
when 1
@logger.error("Exception repeated over #{@logstash_stop_on_exception_repeat} times: ",
:exception => e)
finished
raise LogStash::ShutdownSignal
when 2..@logstash_stop_on_exception_repeat
@exception_repeats[e.to_s] -= 1
else
@logger.warn('Exception repeater error')
end
end

sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000)
retry
end
Expand Down
30 changes: 29 additions & 1 deletion lib/logstash/outputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
class LogStash::Outputs::Kafka < LogStash::Outputs::Base
config_name 'kafka'
milestone 1


attr_accessor :exception_repeats

default :codec, 'json'

config :broker_list, :validate => :string, :default => 'localhost:9092'
Expand All @@ -27,6 +29,9 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
config :send_buffer_bytes, :validate => :number, :default => 100 * 1024
config :client_id, :validate => :string, :default => ""

config :logstash_stop_on_exception_repeat, :validate => :number, :default=>0 # stop logstash after an exception repeats for certain times, 0 for never stop


public
def register
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/kafka*/libs/*.jar")
Expand Down Expand Up @@ -55,11 +60,16 @@ def register
:send_buffer_bytes => @send_buffer_bytes,
:client_id => @client_id
}



@producer = Kafka::Producer.new(options)
@producer.connect()

@logger.info('Registering kafka producer', :topic_id => @topic_id, :broker_list => @broker_list)

@exception_repeats = {}

@codec.on_event do |event|
begin
@producer.sendMsg(@topic_id,nil,event)
Expand All @@ -68,8 +78,25 @@ def register
rescue => e
@logger.warn('kafka producer threw exception, restarting',
:exception => e)

if @logstash_stop_on_exception_repeat > 0
case @exception_repeats[e.to_s]
when nil
@exception_repeats[e.to_s] = @logstash_stop_on_exception_repeat
when 1
@logger.error("Exception repeated over #{@logstash_stop_on_exception_repeat} times: ",
:exception => e)
finished
raise LogStash::ShutdownSignal
when 2..@logstash_stop_on_exception_repeat
@exception_repeats[e.to_s] -= 1
else
@logger.warn('Exception repeater error')
end
end
end
end

end # def register

def receive(event)
Expand All @@ -82,3 +109,4 @@ def receive(event)
end

end #class LogStash::Outputs::Kafka