Commit 1f81f83

plugins model doc (#209)
1 parent 1a8a3c0 commit 1f81f83

4 files changed: +59 -12 lines changed

core/event.go

Lines changed: 1 addition & 2 deletions
```diff
@@ -161,7 +161,6 @@ func ShareTracker(from, to *Event) {
 	}
 
 	if t := from.tracker; t != nil {
-		t.Increace()
-		to.tracker = t
+		to.tracker = t.Copy()
 	}
 }
```

core/tracker.go

Lines changed: 0 additions & 4 deletions
```diff
@@ -27,10 +27,6 @@ func (d *tracker) Copy() *tracker {
 	return d
 }
 
-func (d *tracker) Increace() {
-	atomic.AddInt32(&d.duty, 1)
-}
-
 func (d *tracker) Decreace() {
 	n := atomic.AddInt32(&d.duty, -1)
 
```

docs/PLUGINS_MODEL.md

Lines changed: 52 additions & 0 deletions
# Neptunus Plugins Model

This section is for developers who want to create a new plugin.

There are nine types of plugins. Some of them work directly with channels (we call these streaming plugins), and some do not (callable or child plugins). Start by looking at the interfaces that plugins must conform to and the base structs that must be embedded - [here](../core/plugin.go) and [here](../core/base.go).

## Plugin lifecycle

Every plugin's lifecycle is `create` -> `init` -> `set channels (for streaming plugins)` -> `run/call` -> `close`, and the engine takes care of some of these stages.
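
For orientation, here is a rough sketch of the shape a streaming plugin takes. It only lists the methods and signatures spelled out in this document; the authoritative definitions live in [core/plugin.go](../core/plugin.go) and [core/base.go](../core/base.go), and streaming plugins additionally receive a `SetChannels(....)` call from the engine.

```go
// Illustrative only - not the actual interface from core/plugin.go.
type streamingPlugin interface {
	Init() error  // validate config, acquire resources
	Run()         // blocking processing loop
	Close() error // release resources; must be idempotent
}
```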

### Create

At the creation stage the engine does some of the work:
- initializes and sets the embedded base plugin (already populated with the logger, metric observer and other fields, except channels);
- if required, sets child plugins, already created and initialized;
- decodes the configuration into the plugin struct.

The configuration decoder uses the good old [mapstructure](https://github.com/mitchellh/mapstructure) with custom decode hooks - for time, duration and [datasize](https://pkg.go.dev/kythe.io/kythe/go/util/datasize#Size). See [kafka](../plugins/inputs/kafka/) as an example of datasize usage.
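
As a hedged illustration, a plugin configuration struct might look like the sketch below. The field names and tags are invented, but the mechanism - plain `mapstructure` tags, with the custom hooks decoding `time.Duration` and `datasize.Size` values - is the one described above.

```go
package myplugin // hypothetical plugin package

import (
	"time"

	"kythe.io/kythe/go/util/datasize"
)

// MyPlugin is an illustrative plugin struct; the embedded base plugin and
// any child plugins (set by the engine at creation) are omitted here.
type MyPlugin struct {
	Interval  time.Duration `mapstructure:"interval"`   // e.g. "30s", decoded by the duration hook
	BatchSize datasize.Size `mapstructure:"batch_size"` // e.g. "10MiB", decoded by the datasize hook
	Brokers   []string      `mapstructure:"brokers"`
}
```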

### Init and set channels

If the creation stage completes successfully, the engine calls the plugin's `Init() error` method - this is the place where your plugin MUST create and check all needed resources (e.g. a database connection) and validate the provided config. If something goes wrong, you MUST free all resources and return an error. If no error is returned, the plugin is considered ready for data processing.
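
A minimal sketch of such an `Init()`, assuming a hypothetical plugin that keeps a database connection via the standard `database/sql` package; the `DSN` and `db` fields and the validation rules are invented for illustration.

```go
func (p *MyPlugin) Init() error {
	if p.Interval <= 0 {
		return errors.New("interval must be positive")
	}

	db, err := sql.Open("postgres", p.DSN) // hypothetical resource
	if err != nil {
		return err
	}
	if err := db.Ping(); err != nil {
		db.Close() // free everything acquired so far before returning the error
		return err
	}

	p.db = db
	return nil
}
```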

After that, the engine will create and set up the channels. In most cases you don't need to think about it - the base plugin handles the `SetChannels(....)` call.

### Run

Then, if it is a streaming plugin, the engine calls the `Run()` method, which MUST block. If it is a callable plugin, it will be called by its parent.

Inside the `Run()` loop:
- if your plugin is an `input`, you need to write events into the `Out` channel;
- if it is a `filter`, the plugin reads incoming events from the `In` channel, does some calculations, then writes each event to `Acc` if the condition is satisfied, or to `Rej` if it is not;
- if it is a `processor`, the plugin reads incoming events from `In`, does some work, then writes each event to `Out`, or to `Drop` if the event is no longer needed;
- finally, if it is an `output`, you read events from `In`, write them to the target, and when you are done with an event, write it into the `Done` channel.

So, the basic rule here is that **any event MUST be sent to some channel in the end** because of [delivery control](DATA_MODEL.md#delivery-control).
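
To make that concrete, here is a minimal sketch of a `processor` `Run()` loop. The `In`, `Out` and `Drop` channels and the `Observe()` helper are assumed to come from the embedded base struct, and `shouldDrop` is a hypothetical per-event predicate.

```go
func (p *MyPlugin) Run() {
	for e := range p.In { // the loop ends when the engine closes the In channel
		now := time.Now()

		if p.shouldDrop(e) {
			p.Drop <- e // the event is no longer needed downstream
		} else {
			p.Out <- e // every event must end up in some channel
		}

		p.Observe(metrics.EventAccepted, time.Since(now))
	}
}
```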

Also, it is your responsibility to write plugin metrics. The base struct contains an `Observe()` func which accepts a status (Accepted, Failed or Rejected - the last one should be used in filters only) and the duration taken to process the event.

You can find some helpers in the [plugins/common/](../plugins/common/) dir, such as [Batcher](../plugins/common/batcher/), [Retryer](../plugins/common/retryer/) or [Pool](../plugins/common/pool/).

### Close

When the engine receives a signal to stop a pipeline, it calls the inputs' `Close() error` method.

If your plugin is an `input`, you need to handle this call, free all resources and break the `Run()` loop. Do not close the output channel! The engine will do this automatically.

If your plugin is a `filter`, `processor` or `output`, you must break the loop when the plugin's input channel closes. After that, the engine will call the `Close() error` method itself.

There is no guarantee that the close method will be called exactly once, so it MUST be idempotent.

If your streaming plugin uses some callable plugins, you need to close them in your `Close() error`.
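
Putting the input side together, here is a hedged sketch of how `Run()` and `Close()` might cooperate: `Close()` signals a stop channel, `Run()` breaks its loop, and `sync.Once` keeps `Close()` idempotent. The `stop` and `fetch` details are invented for illustration; in the real codebase the `Out` channel comes from the embedded base plugin.

```go
type myInput struct {
	Out chan<- *core.Event // provided by the embedded base plugin in practice

	stop     chan struct{} // created at construction time, e.g. make(chan struct{})
	stopOnce sync.Once
}

func (p *myInput) Run() {
	for {
		select {
		case <-p.stop:
			return // break the loop; do not close Out - the engine does that
		default:
			p.Out <- p.fetch() // fetch is a hypothetical source read
		}
	}
}

func (p *myInput) Close() error {
	p.stopOnce.Do(func() { close(p.stop) }) // safe to call more than once
	return nil
}
```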

plugins/processors/stats/stats.go

Lines changed: 6 additions & 6 deletions
```diff
@@ -126,26 +126,26 @@ func (p *Stats) Run() {
 		case <-clearTicker.C:
 			p.cache.dropOlderThan(p.MetricTTL)
 		case <-flushTicker.C:
-			p.Flush()
+			p.flush()
 		case e, ok := <-p.In:
 			if !ok {
-				p.Flush()
+				p.flush()
 				return
 			}
 
 			now := time.Now()
-			p.Observe(e)
+			p.observe(e)
 			if p.DropOrigin {
 				p.Drop <- e
 			} else {
 				p.Out <- e
 			}
-			p.BaseProcessor.Observe(metrics.EventAccepted, time.Since(now))
+			p.Observe(metrics.EventAccepted, time.Since(now))
 		}
 	}
 }
 
-func (p *Stats) Flush() {
+func (p *Stats) flush() {
 	now := time.Now()
 
 	p.cache.flush(p.Out, func(m *metric, ch chan<- *core.Event) {
@@ -210,7 +210,7 @@ func (p *Stats) Flush() {
 	})
 }
 
-func (p *Stats) Observe(e *core.Event) {
+func (p *Stats) observe(e *core.Event) {
 	// it is okay if multiple stats will store one label set
 	// because there is no race condition
 	labels, ok := p.labelsFunc(e)
```
