1+ require 'thread'
2+ require 'logger'
3+ require 'fluent/plugin/output'
4+ require 'fluent/plugin/kafka_plugin_util'
5+ require 'waterdrop'
6+
7+ module Fluent ::Plugin
8+ class Fluent ::WaterdropOutput < Output
9+ Fluent ::Plugin . register_output ( 'waterdrop' , self )
10+ helpers :inject , :formatter , :record_accessor
11+
12+ config_param :bootstrap_servers , :string , default : 'localhost:9092' ,
13+ desc : <<-DESC
14+ Set bootstrap servers directly:
15+ <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
16+ DESC
17+
18+ config_param :default_topic , :string , default : nil , desc : <<-DESC
19+ Default output topic when record doesn't have topic field
20+ DESC
21+
22+ config_param :topic_key , :string , :default => 'topic' , :desc => "Field for kafka topic"
23+
24+ config_section :buffer do
25+ config_set_default :chunk_keys , [ "topic" ]
26+ end
27+
28+ config_section :format do
29+ config_set_default :@type , 'json'
30+ config_set_default :add_newline , false
31+ end
32+
33+ def initialize
34+ super
35+
36+ config = {
37+ 'bootstrap.servers' : @bootstrap_servers
38+ }
39+
40+ @producer = WaterDrop ::Producer . new do |conf |
41+ conf . deliver = true
42+ conf . kafka = config
43+ end
44+
45+ @formatter_proc = nil
46+ @topic_key_sym = @topic_key . to_sym
47+ end
48+
49+ def configure ( conf )
50+ super
51+
52+ formatter_conf = conf . elements ( 'format' ) . first
53+ unless formatter_conf
54+ raise Fluent ::ConfigError , "<format> section is required."
55+ end
56+ unless formatter_conf [ "@type" ]
57+ raise Fluent ::ConfigError , "format/@type is required."
58+ end
59+
60+ @formatter_proc = setup_formatter ( formatter_conf )
61+ end
62+
63+ def setup_formatter ( conf )
64+ @formatter = formatter_create ( usage : 'waterdrop-plugin' , conf : conf )
65+ @formatter . method ( :format )
66+ end
67+
68+ def write ( chunk )
69+ tag = chunk . metadata . tag
70+ topic = if @topic
71+ extract_placeholders ( @topic , chunk )
72+ else
73+ ( chunk . metadata . variables && chunk . metadata . variables [ @topic_key_sym ] ) || @default_topic || tag
74+ end
75+ begin
76+ chunk . msgpack_each do |time , record |
77+ record_buf = @formatter_proc . call ( tag , time , record )
78+ @producer . buffer ( topic : topic , payload : record_buf )
79+ end
80+
81+ @producer . flush_sync
82+ end
83+ end
84+
85+ def shutdown
86+ super
87+
88+ @producer . close
89+ end
90+ end
91+ end
0 commit comments