From e7bb3d118f2511932af37feffddfebb917357b5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20=C3=98deg=C3=A5rd?= Date: Fri, 30 Jun 2017 12:22:27 +0200 Subject: [PATCH 1/7] Add support for nested JSON inside message object --- README.md | 8 +++++++ .../plugin/out_splunk-http-eventcollector.rb | 21 +++++++++++++------ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index cf34039..0f9b11e 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,14 @@ Put the following lines to your fluent.conf: format kvp + # log files containing nested JSON + + type splunk-http-eventcollector + server splunk.example.com:8089 + all_items true + nested_json true + + ## Contributing 1. Fork it diff --git a/lib/fluent/plugin/out_splunk-http-eventcollector.rb b/lib/fluent/plugin/out_splunk-http-eventcollector.rb index 19080f6..7e53447 100644 --- a/lib/fluent/plugin/out_splunk-http-eventcollector.rb +++ b/lib/fluent/plugin/out_splunk-http-eventcollector.rb @@ -51,6 +51,7 @@ class SplunkHTTPEventcollectorOutput < BufferedOutput config_param :source, :string, :default => nil config_param :post_retry_max, :integer, :default => 5 config_param :post_retry_interval, :integer, :default => 5 + config_param :nested_json, :bool, :default => false # TODO Find better upper limits config_param :batch_size_limit, :integer, :default => 262144 # 65535 @@ -174,7 +175,11 @@ def format(tag, time, record) json_event = splunk_object.to_json #log.debug "Generated JSON(#{json_event.class.to_s}): #{json_event.to_s}" #log.debug "format: returning: #{[tag, record].to_json.to_s}" - json_event + if @nested_json + json_event + "\n" + else + json_event + end end # By this point, fluentd has decided its buffer is full and it's time to flush @@ -190,11 +195,15 @@ def format(tag, time, record) def write(chunk) log.trace "splunk-http-eventcollector(write) called" - # Break the concatenated string of JSON-formatted events into an Array - split_chunk = chunk.read.split("}{").each do |x| - # Reconstruct the opening{/closing} that #split() strips off. - x.prepend("{") unless x.start_with?("{") - x << "}" unless x.end_with?("}") + if @nested_json + split_chunk = chunk.read.split("\n") + else + # Break the concatenated string of JSON-formatted events into an Array + split_chunk = chunk.read.split("}{").each do |x| + # Reconstruct the opening{/closing} that #split() strips off. + x.prepend("{") unless x.start_with?("{") + x << "}" unless x.end_with?("}") + end end log.debug "Pushing #{numfmt(split_chunk.size)} events (" + "#{numfmt(chunk.read.bytesize)} bytes) to Splunk." From 6e697e9f20d029814b22493c35c57fa3684a101d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20=C3=98deg=C3=A5rd?= Date: Tue, 18 Jul 2017 15:10:28 +0200 Subject: [PATCH 2/7] Add support for metadata through fields param - Metadata can be passed to Splunk by using the "fields" property --- README.md | 7 ++++++ ...-plugin-splunk-http-eventcollector.gemspec | 2 +- .../plugin/out_splunk-http-eventcollector.rb | 22 +++++++++++++++++++ .../test_out_splunk-http-eventcollector.rb | 20 +++++++++++++++++ 4 files changed, 50 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 0f9b11e..7675ee1 100644 --- a/README.md +++ b/README.md @@ -204,6 +204,13 @@ Put the following lines to your fluent.conf: all_items true nested_json true + + # log metadata in addition to the event + + type splunk-http-eventcollector + server splunk.example.com:8089 + fields { "is_test_log": true } + ## Contributing diff --git a/fluent-plugin-splunk-http-eventcollector.gemspec b/fluent-plugin-splunk-http-eventcollector.gemspec index 7b4c95d..58f67e0 100644 --- a/fluent-plugin-splunk-http-eventcollector.gemspec +++ b/fluent-plugin-splunk-http-eventcollector.gemspec @@ -3,7 +3,7 @@ $:.push File.expand_path("../lib", __FILE__) Gem::Specification.new do |gem| gem.name = "fluent-plugin-splunk-http-eventcollector" - gem.version = "0.2.0" + gem.version = "0.3.0" gem.authors = ["Bryce Chidester"] gem.email = ["bryce.chidester@calyptix.com"] gem.summary = "Splunk output plugin for Fluentd" diff --git a/lib/fluent/plugin/out_splunk-http-eventcollector.rb b/lib/fluent/plugin/out_splunk-http-eventcollector.rb index 7e53447..8e1c6b9 100644 --- a/lib/fluent/plugin/out_splunk-http-eventcollector.rb +++ b/lib/fluent/plugin/out_splunk-http-eventcollector.rb @@ -52,6 +52,7 @@ class SplunkHTTPEventcollectorOutput < BufferedOutput config_param :post_retry_max, :integer, :default => 5 config_param :post_retry_interval, :integer, :default => 5 config_param :nested_json, :bool, :default => false + config_param :fields, :hash, :default => {} # TODO Find better upper limits config_param :batch_size_limit, :integer, :default => 262144 # 65535 @@ -116,6 +117,11 @@ def configure(conf) @placeholder_expander = Fluent::SplunkHTTPEventcollectorOutput.placeholder_expander(log) @hostname = Socket.gethostname + + unless @fields.empty? + @fields = inject_env_vars_into_fields + end + # TODO Add other robust input/syntax checks. end # configure @@ -172,6 +178,10 @@ def format(tag, time, record) splunk_object["event"] = convert_to_utf8(record["message"]) end + unless @fields.empty? + splunk_object["fields"] = @fields + end + json_event = splunk_object.to_json #log.debug "Generated JSON(#{json_event.class.to_s}): #{json_event.to_s}" #log.debug "format: returning: #{[tag, record].to_json.to_s}" @@ -329,5 +339,17 @@ def convert_to_utf8(input) end end end + + # Environment variables are passed in with the following format: + # @{ENV['NAME_OF_ENV_VAR']} + def inject_env_vars_into_fields + @fields.each { | _, field_value| + match_data = field_value.to_s.match(/^@\{ENV\['(?.+)'\]\}$/) + if match_data && match_data["env_name"] + field_value.replace(ENV[match_data["env_name"]]) + end + } + end + end # class SplunkHTTPEventcollectorOutput end # module Fluent diff --git a/test/plugin/test_out_splunk-http-eventcollector.rb b/test/plugin/test_out_splunk-http-eventcollector.rb index d1feda7..1db36a9 100644 --- a/test/plugin/test_out_splunk-http-eventcollector.rb +++ b/test/plugin/test_out_splunk-http-eventcollector.rb @@ -176,4 +176,24 @@ def test_utf8 body: { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: { some: { nested: " f-8", with: [" "," ","f-8"]}}}, times: 1 end + + def test_write_fields + stub_request(:post, "https://localhost:8089/services/collector"). + with(headers: {"Authorization" => "Splunk changeme"}). + to_return(body: '{"text":"Success","code":0}') + + d = create_driver(CONFIG + %[ + fields { "cluster": "aws" } + source ${record["source"]} + ]) + + time = Time.parse("2010-01-02 13:14:15 UTC").to_i + d.emit({ "message" => "a message", "source" => "source-from-record"}, time) + d.run + + assert_requested :post, "https://localhost:8089/services/collector", + headers: {"Authorization" => "Splunk changeme"}, + body: { time: time, source: "source-from-record", sourcetype: "fluentd", host: "", index: "main", event: "a message", fields: { cluster: "aws" } }, + times: 1 + end end From 440b9d9a4a3163f802270a13c8e036dc314756d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20=C3=98deg=C3=A5rd?= Date: Tue, 18 Jul 2017 15:17:51 +0200 Subject: [PATCH 3/7] Add Dockerfile to run tests --- Dockerfile | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b2160cc --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM ruby + +ENV HOME=/opt/fluent-plugin-splunk-http-eventcollector + +COPY Gemfile ${HOME}/Gemfile +COPY fluent-plugin-splunk-http-eventcollector.gemspec ${HOME}/fluent-plugin-splunk-http-eventcollector.gemspec + +WORKDIR ${HOME} + +RUN bundle install + +COPY . ${HOME} + +CMD ["rake", "test"] From 134af90b4dea99451c4d48d4cd23f5494b78c3ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20=C3=98deg=C3=A5rd?= Date: Tue, 18 Jul 2017 16:38:05 +0200 Subject: [PATCH 4/7] Add support for reading file for fields --- lib/fluent/plugin/out_splunk-http-eventcollector.rb | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/fluent/plugin/out_splunk-http-eventcollector.rb b/lib/fluent/plugin/out_splunk-http-eventcollector.rb index 8e1c6b9..9dae780 100644 --- a/lib/fluent/plugin/out_splunk-http-eventcollector.rb +++ b/lib/fluent/plugin/out_splunk-http-eventcollector.rb @@ -120,6 +120,7 @@ def configure(conf) unless @fields.empty? @fields = inject_env_vars_into_fields + @fields = inject_files_into_fields end # TODO Add other robust input/syntax checks. @@ -351,5 +352,14 @@ def inject_env_vars_into_fields } end + def inject_files_into_fields + @fields.each { | _, field_value | + match_data = field_value.to_s.match(/^@\{FILE\['(?.+)'\]\}$/) + if match_data && match_data["file_path"] + field_value.replace(IO.read(match_data["file_path"])) + end + } + end + end # class SplunkHTTPEventcollectorOutput end # module Fluent From 5079f5a4fc7bff46985f82eee9b78cf7603c8aab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20=C3=98deg=C3=A5rd?= Date: Thu, 20 Jul 2017 10:00:56 +0200 Subject: [PATCH 5/7] Add support for overriding time with iso8601_time param --- .../plugin/out_splunk-http-eventcollector.rb | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_splunk-http-eventcollector.rb b/lib/fluent/plugin/out_splunk-http-eventcollector.rb index 9dae780..1664508 100644 --- a/lib/fluent/plugin/out_splunk-http-eventcollector.rb +++ b/lib/fluent/plugin/out_splunk-http-eventcollector.rb @@ -30,6 +30,8 @@ # http://dev.splunk.com/view/event-collector/SP-CAAAE6M # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector +require 'date' + module Fluent class SplunkHTTPEventcollectorOutput < BufferedOutput @@ -47,6 +49,7 @@ class SplunkHTTPEventcollectorOutput < BufferedOutput config_param :index, :string, :default => 'main' config_param :all_items, :bool, :default => false + config_param :iso8601_time, :string, :default => nil config_param :sourcetype, :string, :default => 'fluentd' config_param :source, :string, :default => nil config_param :post_retry_max, :integer, :default => 5 @@ -166,7 +169,7 @@ def format(tag, time, record) placeholders = @placeholder_expander.prepare_placeholders(placeholder_values) splunk_object = Hash[ - "time" => time.to_i, + "time" => handle_get_time(time, placeholders), "source" => if @source.nil? then tag.to_s else @placeholder_expander.expand(@source, placeholders) end, "sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders), "host" => @placeholder_expander.expand(@host.to_s, placeholders), @@ -361,5 +364,14 @@ def inject_files_into_fields } end + def handle_get_time(emitted_at_timestamp, placeholders) + if @iso8601_time.nil? + emitted_at_timestamp.to_i + else + time = @placeholder_expander.expand(@iso8601_time, placeholders) + DateTime.iso8601(time).to_time.to_i + end + end + end # class SplunkHTTPEventcollectorOutput end # module Fluent From d41f64c62ab56508ef93da777ef72cb55205dd4f Mon Sep 17 00:00:00 2001 From: Morten Knudsen Date: Thu, 7 Sep 2017 09:09:48 +0200 Subject: [PATCH 6/7] Add support for subsecond timestamps (#1) * Add support for subsecond timestamps * Release 0.4.0 --- fluent-plugin-splunk-http-eventcollector.gemspec | 2 +- lib/fluent/plugin/out_splunk-http-eventcollector.rb | 4 ++-- test/plugin/test_out_splunk-http-eventcollector.rb | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fluent-plugin-splunk-http-eventcollector.gemspec b/fluent-plugin-splunk-http-eventcollector.gemspec index 58f67e0..22b6960 100644 --- a/fluent-plugin-splunk-http-eventcollector.gemspec +++ b/fluent-plugin-splunk-http-eventcollector.gemspec @@ -3,7 +3,7 @@ $:.push File.expand_path("../lib", __FILE__) Gem::Specification.new do |gem| gem.name = "fluent-plugin-splunk-http-eventcollector" - gem.version = "0.3.0" + gem.version = "0.4.0" gem.authors = ["Bryce Chidester"] gem.email = ["bryce.chidester@calyptix.com"] gem.summary = "Splunk output plugin for Fluentd" diff --git a/lib/fluent/plugin/out_splunk-http-eventcollector.rb b/lib/fluent/plugin/out_splunk-http-eventcollector.rb index 1664508..b92d99a 100644 --- a/lib/fluent/plugin/out_splunk-http-eventcollector.rb +++ b/lib/fluent/plugin/out_splunk-http-eventcollector.rb @@ -366,10 +366,10 @@ def inject_files_into_fields def handle_get_time(emitted_at_timestamp, placeholders) if @iso8601_time.nil? - emitted_at_timestamp.to_i + emitted_at_timestamp.to_f else time = @placeholder_expander.expand(@iso8601_time, placeholders) - DateTime.iso8601(time).to_time.to_i + DateTime.iso8601(time).to_time.to_f end end diff --git a/test/plugin/test_out_splunk-http-eventcollector.rb b/test/plugin/test_out_splunk-http-eventcollector.rb index 1db36a9..389e22f 100644 --- a/test/plugin/test_out_splunk-http-eventcollector.rb +++ b/test/plugin/test_out_splunk-http-eventcollector.rb @@ -120,7 +120,7 @@ def test_write_splitting batch_size_limit 250 ]) - time = Time.parse("2010-01-02 13:14:15 UTC").to_i + time = Time.parse("2010-01-02 13:14:15 UTC").to_f d.emit({"message" => "a" }, time) d.emit({"message" => "b" }, time) d.emit({"message" => "c" }, time) From d35a70e69a9343e380dba889b3cab09a36f46436 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20=C3=98deg=C3=A5rd?= Date: Mon, 16 Oct 2017 09:08:30 +0200 Subject: [PATCH 7/7] Improve logging for 40X errors --- fluent-plugin-splunk-http-eventcollector.gemspec | 2 +- lib/fluent/plugin/out_splunk-http-eventcollector.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fluent-plugin-splunk-http-eventcollector.gemspec b/fluent-plugin-splunk-http-eventcollector.gemspec index 22b6960..177a81b 100644 --- a/fluent-plugin-splunk-http-eventcollector.gemspec +++ b/fluent-plugin-splunk-http-eventcollector.gemspec @@ -3,7 +3,7 @@ $:.push File.expand_path("../lib", __FILE__) Gem::Specification.new do |gem| gem.name = "fluent-plugin-splunk-http-eventcollector" - gem.version = "0.4.0" + gem.version = "0.4.1" gem.authors = ["Bryce Chidester"] gem.email = ["bryce.chidester@calyptix.com"] gem.summary = "Splunk output plugin for Fluentd" diff --git a/lib/fluent/plugin/out_splunk-http-eventcollector.rb b/lib/fluent/plugin/out_splunk-http-eventcollector.rb index b92d99a..25854e6 100644 --- a/lib/fluent/plugin/out_splunk-http-eventcollector.rb +++ b/lib/fluent/plugin/out_splunk-http-eventcollector.rb @@ -288,7 +288,7 @@ def push_buffer(body) next elsif response.code.match(/^40/) # user error - log.error "#{@splunk_uri}: #{response.code} (#{response.message})\n#{response.body}" + log.error "#{@splunk_uri}: #{response.code} (#{response.message})\nReq: #{body}\nRes: #{response.body}" break elsif c < @post_retry_max # retry