Skip to content
This repository was archived by the owner on Mar 21, 2020. It is now read-only.

Add support for nested JSON inside message object #18

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Test/build image for the fluent-plugin-splunk-http-eventcollector gem.
# NOTE(review): unpinned base image — consider pinning a Ruby version
# (e.g. ruby:2.6) for reproducible CI builds.
FROM ruby

ENV HOME=/opt/fluent-plugin-splunk-http-eventcollector

# Copy only the dependency manifests first so the `bundle install` layer
# below stays cached until the Gemfile or gemspec actually changes.
COPY Gemfile ${HOME}/Gemfile
COPY fluent-plugin-splunk-http-eventcollector.gemspec ${HOME}/fluent-plugin-splunk-http-eventcollector.gemspec

WORKDIR ${HOME}

RUN bundle install

# Copy the rest of the source after dependencies are installed.
COPY . ${HOME}

# Default command: run the gem's test suite.
CMD ["rake", "test"]
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,21 @@ Put the following lines to your fluent.conf:
format kvp
</match>

# log files containing nested JSON
<match **>
type splunk-http-eventcollector
server splunk.example.com:8089
all_items true
nested_json true
</match>

# log metadata in addition to the event
<match **>
type splunk-http-eventcollector
server splunk.example.com:8089
fields { "is_test_log": true }
</match>

## Contributing

1. Fork it
Expand Down
2 changes: 1 addition & 1 deletion fluent-plugin-splunk-http-eventcollector.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ $:.push File.expand_path("../lib", __FILE__)

Gem::Specification.new do |gem|
gem.name = "fluent-plugin-splunk-http-eventcollector"
gem.version = "0.2.0"
gem.version = "0.4.1"
gem.authors = ["Bryce Chidester"]
gem.email = ["[email protected]"]
gem.summary = "Splunk output plugin for Fluentd"
Expand Down
69 changes: 61 additions & 8 deletions lib/fluent/plugin/out_splunk-http-eventcollector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
# http://dev.splunk.com/view/event-collector/SP-CAAAE6M
# http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector

require 'date'

module Fluent
class SplunkHTTPEventcollectorOutput < BufferedOutput

Expand All @@ -47,10 +49,13 @@ class SplunkHTTPEventcollectorOutput < BufferedOutput
config_param :index, :string, :default => 'main'
config_param :all_items, :bool, :default => false

config_param :iso8601_time, :string, :default => nil
config_param :sourcetype, :string, :default => 'fluentd'
config_param :source, :string, :default => nil
config_param :post_retry_max, :integer, :default => 5
config_param :post_retry_interval, :integer, :default => 5
config_param :nested_json, :bool, :default => false
config_param :fields, :hash, :default => {}

# TODO Find better upper limits
config_param :batch_size_limit, :integer, :default => 262144 # 65535
Expand Down Expand Up @@ -115,6 +120,12 @@ def configure(conf)

@placeholder_expander = Fluent::SplunkHTTPEventcollectorOutput.placeholder_expander(log)
@hostname = Socket.gethostname

unless @fields.empty?
@fields = inject_env_vars_into_fields
@fields = inject_files_into_fields
end

# TODO Add other robust input/syntax checks.
end # configure

Expand Down Expand Up @@ -158,7 +169,7 @@ def format(tag, time, record)
placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)

splunk_object = Hash[
"time" => time.to_i,
"time" => handle_get_time(time, placeholders),
"source" => if @source.nil? then tag.to_s else @placeholder_expander.expand(@source, placeholders) end,
"sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders),
"host" => @placeholder_expander.expand(@host.to_s, placeholders),
Expand All @@ -171,10 +182,18 @@ def format(tag, time, record)
splunk_object["event"] = convert_to_utf8(record["message"])
end

unless @fields.empty?
splunk_object["fields"] = @fields
end

json_event = splunk_object.to_json
#log.debug "Generated JSON(#{json_event.class.to_s}): #{json_event.to_s}"
#log.debug "format: returning: #{[tag, record].to_json.to_s}"
json_event
if @nested_json
json_event + "\n"
else
json_event
end
end

# By this point, fluentd has decided its buffer is full and it's time to flush
Expand All @@ -190,11 +209,15 @@ def format(tag, time, record)
def write(chunk)
log.trace "splunk-http-eventcollector(write) called"

# Break the concatenated string of JSON-formatted events into an Array
split_chunk = chunk.read.split("}{").each do |x|
# Reconstruct the opening{/closing} that #split() strips off.
x.prepend("{") unless x.start_with?("{")
x << "}" unless x.end_with?("}")
if @nested_json
split_chunk = chunk.read.split("\n")
else
# Break the concatenated string of JSON-formatted events into an Array
split_chunk = chunk.read.split("}{").each do |x|
# Reconstruct the opening{/closing} that #split() strips off.
x.prepend("{") unless x.start_with?("{")
x << "}" unless x.end_with?("}")
end
end
log.debug "Pushing #{numfmt(split_chunk.size)} events (" +
"#{numfmt(chunk.read.bytesize)} bytes) to Splunk."
Expand Down Expand Up @@ -265,7 +288,7 @@ def push_buffer(body)
next
elsif response.code.match(/^40/)
# user error
log.error "#{@splunk_uri}: #{response.code} (#{response.message})\n#{response.body}"
log.error "#{@splunk_uri}: #{response.code} (#{response.message})\nReq: #{body}\nRes: #{response.body}"
break
elsif c < @post_retry_max
# retry
Expand Down Expand Up @@ -320,5 +343,35 @@ def convert_to_utf8(input)
end
end
end

# Environment variables are passed in with the following format:
# @{ENV['NAME_OF_ENV_VAR']}
# Resolve environment-variable placeholders in the configured +fields+ hash.
#
# A field value of the exact form <tt>@{ENV['NAME_OF_ENV_VAR']}</tt> is
# replaced, in place, with the value of that environment variable; any
# other value passes through untouched.
#
# @return [Hash] @fields, with placeholder values substituted in place
# @raise [KeyError] if a referenced environment variable is not set.
#   (ENV.fetch names the missing variable in its message, unlike the
#   opaque TypeError that String#replace(nil) used to raise here.)
def inject_env_vars_into_fields
  @fields.each do |_, field_value|
    match_data = field_value.to_s.match(/^@\{ENV\['(?<env_name>.+)'\]\}$/)
    if match_data && match_data["env_name"]
      # ENV.fetch raises a descriptive KeyError when the variable is
      # unset, instead of String#replace(nil)'s bare TypeError.
      field_value.replace(ENV.fetch(match_data["env_name"]))
    end
  end
end

# Resolve file-content placeholders in the configured +fields+ hash.
#
# A field value of the exact form <tt>@{FILE['/path/to/file']}</tt> is
# replaced, in place, with the contents of that file; any other value
# passes through untouched.
#
# @return [Hash] @fields, with placeholder values substituted in place
def inject_files_into_fields
  file_ref = /^@\{FILE\['(?<file_path>.+)'\]\}$/
  @fields.each do |_, field_value|
    captured = field_value.to_s.match(file_ref)
    next unless captured && captured["file_path"]
    field_value.replace(IO.read(captured["file_path"]))
  end
end

# Compute the event timestamp (epoch seconds, Float) for the Splunk payload.
#
# When +iso8601_time+ is not configured, the timestamp fluentd emitted the
# record with is used directly. Otherwise the configured placeholder string
# is expanded against the record and parsed as an ISO-8601 time.
#
# @param emitted_at_timestamp [Integer, Float] time fluentd attached to the record
# @param placeholders [Object] prepared placeholder set for expansion
# @return [Float] epoch seconds
def handle_get_time(emitted_at_timestamp, placeholders)
  return emitted_at_timestamp.to_f if @iso8601_time.nil?

  time = @placeholder_expander.expand(@iso8601_time, placeholders)
  DateTime.iso8601(time).to_time.to_f
rescue ArgumentError => e
  # A single record with a malformed timestamp must not abort the whole
  # buffer flush; fall back to the record's emit time instead of raising.
  # (Date::Error subclasses ArgumentError, so this covers all Ruby versions.)
  log.warn "Invalid ISO-8601 time #{time.inspect} (#{e.message}); " \
           "falling back to the record's emit time."
  emitted_at_timestamp.to_f
end

end # class SplunkHTTPEventcollectorOutput
end # module Fluent
22 changes: 21 additions & 1 deletion test/plugin/test_out_splunk-http-eventcollector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_write_splitting
batch_size_limit 250
])

time = Time.parse("2010-01-02 13:14:15 UTC").to_i
time = Time.parse("2010-01-02 13:14:15 UTC").to_f
d.emit({"message" => "a" }, time)
d.emit({"message" => "b" }, time)
d.emit({"message" => "c" }, time)
Expand Down Expand Up @@ -176,4 +176,24 @@ def test_utf8
body: { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: { some: { nested: " f-8", with: [" "," ","f-8"]}}},
times: 1
end

# Verifies that (a) a static `fields` hash from the config is forwarded
# in the collector payload, and (b) a `source` placeholder is expanded
# from the record rather than falling back to the fluentd tag.
def test_write_fields
# Stub the collector endpoint so no real HTTP request is made; the
# plugin only needs a 2xx body to consider the POST successful.
stub_request(:post, "https://localhost:8089/services/collector").
with(headers: {"Authorization" => "Splunk changeme"}).
to_return(body: '{"text":"Success","code":0}')

# `fields` is a JSON hash config_param; `source` uses the record
# placeholder syntax expanded per event.
d = create_driver(CONFIG + %[
fields { "cluster": "aws" }
source ${record["source"]}
])

time = Time.parse("2010-01-02 13:14:15 UTC").to_i
d.emit({ "message" => "a message", "source" => "source-from-record"}, time)
d.run

# Exactly one POST whose body carries both the expanded source and the
# configured fields hash.
assert_requested :post, "https://localhost:8089/services/collector",
headers: {"Authorization" => "Splunk changeme"},
body: { time: time, source: "source-from-record", sourcetype: "fluentd", host: "", index: "main", event: "a message", fields: { cluster: "aws" } },
times: 1
end
end