Skip to content

Support Fluent 1.x #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
version: 2.1

orbs:
ruby-orbs: sue445/ruby-orbs@volatile

jobs:
test:
docker:
- image: cimg/ruby:3.0
steps:
- checkout
- ruby-orbs/bundle-install:
gemspec_name: fluent-plugin-time-series-counter
with_gemfile_lock: false
- run: bundle exec rake test
workflows:
test:
jobs:
- test
9 changes: 9 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
require "bundler/gem_tasks"
require 'rake/testtask'

Rake::Task[:release].clear

Rake::TestTask.new(:test) do |t|
t.libs << 'lib' << 'test'
t.pattern = 'test/**/test_*.rb'
end

task default: :test
6 changes: 4 additions & 2 deletions fluent-plugin-time-series-counter.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Gem::Specification.new do |spec|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]

spec.add_development_dependency "bundler", "~> 1.7"
spec.add_development_dependency "rake", "~> 10.0"
spec.add_dependency 'fluentd', '>= 1', '< 2'

spec.add_development_dependency 'rake'
spec.add_development_dependency 'test-unit'
end
171 changes: 80 additions & 91 deletions lib/fluent/plugin/out_time_series_counter.rb
Original file line number Diff line number Diff line change
@@ -1,113 +1,102 @@
module Fluent
class TimeSeriesCounter < Fluent::BufferedOutput
Fluent::Plugin.register_output('time_series_counter', self)

unless method_defined?(:log)
define_method('log') { $log }
require 'fluent/plugin/output'

class Fluent::Plugin::TimeSeriesCounter < Fluent::Plugin::Output
Fluent::Plugin.register_output('time_series_counter', self)
helpers :event_emitter

config_param :tag, :string, default: "tsc"
config_param :count_keys, :array, value_type: :string, alias: :count_key
config_param :count_key_delimiter, :string, default: ":"
config_param :count_value_name, :string, default: "count"
config_param :unit, :array, value_type: :string
config_param :uniq_key, :string, default: "tsc_key"
config_param :unit_key, :string, default: "tsc_unit"
config_param :time_key, :string, default: "tsc_time"

def configure(conf)
super

@units = unit.inject({}) do |hash, i|
hash[i] = true
hash
end
end

config_param :tag, :string, :default => "tsc"
config_param :count_key, :string, :default => nil
config_param :count_key_delimiter, :string, :default => ":"
config_param :count_value_name, :string, :default => "count"
config_param :unit, :string, :default => nil
config_param :uniq_key, :string, :default => "tsc_key"
config_param :unit_key, :string, :default => "tsc_unit"
config_param :time_key, :string, :default => "tsc_time"

def initialize
super
end
def formatted_to_msgpack_binary?
true
end

def configure(conf)
super

if !count_key
raise ConfigError, "out_time_series_counter: required 'count_key' parameter."
end
def format(tag, time, record)
[tag, time, record].to_msgpack
end

if !unit
raise ConfigError, "out_time_series_counter: required 'unit' parameter."
def write(chunk)
stats = {}

chunk.msgpack_each do |tag, time, record|
skip = false

@count_keys.each do |k|
# skip record if a record does not have requried count_keys
skip = true unless record[k]
end
next if skip

@count_keys = count_key.split(/\s*,\s*/).sort
@unit = unit.split(/\s*,\s*/).inject({}) do |hash, i|
hash[i] = true
hash
if @units['min']
count(stats, record, time, "min")
end
end

def format(tag, time, record)
[tag, time, record].to_msgpack
end
if @units['hour']
count(stats, record, time, "hour")
end

def write(chunk)
stats = {}
chunk.msgpack_each do |tag, time, record|
skip = false
next unless time
@count_keys.each do |k|
# skip record if a record does not have requried count_keys
skip = true unless record[k]
end
next if skip

if @unit['min']
count(stats, record, time, "min")
end

if @unit['hour']
count(stats, record, time, "hour")
end

if @unit['day']
count(stats, record, time, "day")
end
if @units['day']
count(stats, record, time, "day")
end
end
output_stats(stats)
end

output_stats(stats)
private
def create_uniq_key(record, unit, time)
uniq_key = []
@count_keys.each do |k|
uniq_key << record[k]
end
uniq_key << time.to_s
uniq_key << unit
uniq_key.join(@count_key_delimiter)
end

private
def create_uniq_key(record, unit, time)
uniq_key = []
@count_keys.each do |k|
uniq_key << record[k]
end
uniq_key << time.to_s
uniq_key << unit
uniq_key.join(@count_key_delimiter)
def count(stats, record, time, unit)
case unit
when "min"
unit_time = time - (time % 60)
when "hour"
unit_time = time - (time % 3600)
when "day"
unit_time = time - (time % 86400)
else
return
end

def count(stats, record, time, unit)
unix_time = 0
case unit
when "min"
unit_time = time - (time % 60)
when "hour"
unit_time = time - (time % 3600)
when "day"
unit_time = time - (time % 86400)
else
return
end
tsc_key = create_uniq_key(record, unit, unit_time)
unless stats[tsc_key]
stats[tsc_key] = {@count_value_name => 0} unless stats[tsc_key]
@count_keys.each do |k|
stats[tsc_key][k] = record[k]
end
stats[tsc_key][@unit_key] = unit
stats[tsc_key][@time_key] = unit_time
tsc_key = create_uniq_key(record, unit, unit_time)
unless stats[tsc_key]
stats[tsc_key] = {@count_value_name => 0} unless stats[tsc_key]
@count_keys.each do |k|
stats[tsc_key][k] = record[k]
end
stats[tsc_key][@count_value_name] += 1
stats[tsc_key][@unit_key] = unit
stats[tsc_key][@time_key] = unit_time
end
stats[tsc_key][@count_value_name] += 1
end

def output_stats(stats)
stats.each do |k, v|
v[@uniq_key] = k
Fluent::Engine.emit("#{@tag}", Fluent::Engine.now, v)
end
def output_stats(stats)
stats.each do |k, v|
v[@uniq_key] = k
router.emit(@tag, Fluent::Engine.now, v)
end
end
end
8 changes: 8 additions & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
$LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
require "test-unit"
require "fluent/test"
require "fluent/test/driver/output"
require "fluent/test/helpers"

Test::Unit::TestCase.include(Fluent::Test::Helpers)
Test::Unit::TestCase.extend(Fluent::Test::Helpers)
151 changes: 151 additions & 0 deletions test/plugin/test_out_time_series_counter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
require 'helper'

require 'fluent/plugin/out_time_series_counter'

class TimeSeriesCounterTest < Test::Unit::TestCase
include Fluent::Test::Helpers

def setup
Fluent::Test.setup # Setup test for Fluentd (Required)
end

CONFIG = %[
count_key id
unit hour
]

def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::TimeSeriesCounter).configure(conf)
end

sub_test_case 'configuration' do
test 'basic configuration' do
d = create_driver
assert_equal ['id'], d.instance.count_keys
assert_equal ['hour'], d.instance.unit
end

test 'invalid configration' do
assert_raise(Fluent::ConfigError) {
create_driver(
<<~EOS
count_key id
EOS
)
}

assert_raise(Fluent::ConfigError) {
create_driver(
<<~EOS
unit hour
EOS
)
}
end
end

sub_test_case 'count' do
test 'count per count_key' do
d = create_driver
d.run(default_tag: 'test') do
d.feed({'id' => 1})
d.feed({'id' => 1})
d.feed({'id' => 2})
end

# result == uniq(count_key)
assert_equal(2, d.events.size)

results = d.events.flat_map{|_key, _time, result| result }

result = results.find{ |r| r["id"] == 1 }
assert_equal(2, result["count"])

result = results.find{ |r| r["id"] == 2 }
assert_equal(1, result["count"])
end

test 'count per unit(hour)' do
d = create_driver

d.run(default_tag: 'test') do
d.feed(event_time('2022-09-01 10:00:00 UTC'), {'id' => 1})
d.feed(event_time('2022-09-01 10:00:00 UTC'), {'id' => 1})

d.feed(event_time('2022-09-01 11:00:00 UTC'), {'id' => 1})
end

# result == uniq(count_key)
assert_equal(2, d.events.size)

results = d.events.flat_map{|_key, _time, result| result }

# Time.at(1662026400) => 2022-09-01 10:00:00 UTC
result = results.find{ |r| r["tsc_time"] == 1662026400 }
assert_equal(2, result["count"])

# Time.at(1662030000) => 2022-09-01 11:00:00 UTC
result = results.find{ |r| r["tsc_time"] == 1662030000 }
assert_equal(1, result["count"])
end

test 'count only feeds with all keys' do
d = create_driver(
<<~EOS
count_key a,b,c
unit hour
EOS
)

d.run(default_tag: 'test') do
# no count
d.feed({a: 1})
d.feed({a: 1, b: 2})
d.feed({a: 1, b: 2})

# count
d.feed({a: 1, b: 2, c: 3})
d.feed({a: 1, b: 2, c: 3})
d.feed({a: 1, b: 2, c: 3})
end

assert_equal(1, d.events.size)

_key, _time, record = d.events.first

assert_equal(3, record['count'])
end

test 'count per units' do
d = create_driver(
<<~EOS
count_key id
unit min,hour,day
EOS
)

d.run(default_tag: 'test') do
d.feed(event_time('2022-09-01 10:00:00 UTC'), {'id' => 1})
d.feed(event_time('2022-09-01 10:01:00 UTC'), {'id' => 1})

d.feed(event_time('2022-09-01 11:00:00 UTC'), {'id' => 1})

d.feed(event_time('2022-09-02 10:00:00 UTC'), {'id' => 1})
d.feed(event_time('2022-09-02 10:00:00 UTC'), {'id' => 1})
d.feed(event_time('2022-09-02 10:00:00 UTC'), {'id' => 1})

d.feed(event_time('2022-09-01 10:00:00 UTC'), {'id' => 2})
end

results = d.events.map{|_,_,r| r}

assert_equal(4, results.count{|r| r['id'] == 1 && r['tsc_unit'] == 'min'})
assert_equal(3, results.count{|r| r['id'] == 1 && r['tsc_unit'] == 'hour'})
assert_equal(2, results.count{|r| r['id'] == 1 && r['tsc_unit'] == 'day'})

assert_equal(1, results.count{|r| r['id'] == 2 && r['tsc_unit'] == 'min'})
assert_equal(1, results.count{|r| r['id'] == 2 && r['tsc_unit'] == 'hour'})
assert_equal(1, results.count{|r| r['id'] == 2 && r['tsc_unit'] == 'day'})
end
end
end