Skip to content

Commit f4d599c

Browse files
committed
common sql connector
1 parent 4d44a2f commit f4d599c

File tree

8 files changed

+140
-221
lines changed

8 files changed

+140
-221
lines changed

plugins/common/sql/connector.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package sql
2+
3+
import (
4+
"errors"
5+
"time"
6+
7+
"github.com/jmoiron/sqlx"
8+
9+
"github.com/gekatateam/neptunus/plugins/common/tls"
10+
)
11+
12+
type Connector struct {
13+
Driver string `mapstructure:"driver"`
14+
Dsn string `mapstructure:"dsn"`
15+
Username string `mapstructure:"username"`
16+
Password string `mapstructure:"password"`
17+
ConnsMaxIdleTime time.Duration `mapstructure:"conns_max_idle_time"`
18+
ConnsMaxLifetime time.Duration `mapstructure:"conns_max_life_time"`
19+
ConnsMaxOpen int `mapstructure:"conns_max_open"`
20+
ConnsMaxIdle int `mapstructure:"conns_max_idle"`
21+
QueryTimeout time.Duration `mapstructure:"query_timeout"`
22+
*tls.TLSClientConfig `mapstructure:",squash"`
23+
}
24+
25+
func (c *Connector) Init() (*sqlx.DB, error) {
26+
if len(c.Dsn) == 0 {
27+
return nil, errors.New("dsn required")
28+
}
29+
30+
if len(c.Driver) == 0 {
31+
return nil, errors.New("driver required")
32+
}
33+
34+
tlsConfig, err := c.TLSClientConfig.Config()
35+
if err != nil {
36+
return nil, err
37+
}
38+
39+
println(tlsConfig == nil)
40+
41+
db, err := OpenDB(c.Driver, c.Dsn, c.Username, c.Password, tlsConfig)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
if c.QueryTimeout <= 0 {
47+
return nil, errors.New("queryTimeout cannot be less or equal to zero")
48+
}
49+
50+
db.DB.SetConnMaxIdleTime(c.ConnsMaxIdleTime)
51+
db.DB.SetConnMaxLifetime(c.ConnsMaxLifetime)
52+
db.DB.SetMaxIdleConns(c.ConnsMaxIdle)
53+
db.DB.SetMaxOpenConns(c.ConnsMaxOpen)
54+
55+
if err := db.Ping(); err != nil {
56+
db.Close()
57+
return nil, err
58+
}
59+
60+
return db, nil
61+
}

plugins/common/sql/sql.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ type QueryInfo struct {
120120
File string `mapstructure:"file"`
121121
}
122122

123-
func (q *QueryInfo) Init() error {
123+
func (q *QueryInfo) Init(required bool) error {
124+
if required && len(q.File) == 0 && len(q.Query) == 0 {
125+
return errors.New("query or file required")
126+
}
127+
124128
if len(q.File) > 0 {
125129
rawQuery, err := os.ReadFile(q.File)
126130
if err != nil {

plugins/inputs/sql/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ Drivers use plugin TLS configuration.
4343
wait_for_delivery = true
4444

4545
# queries execution timeout
46-
timeout = "30s"
46+
query_timeout = "30s"
4747

4848
# database connection params - https://pkg.go.dev/database/sql#DB.SetConnMaxIdleTime
4949
conns_max_idle_time = "10m"

plugins/inputs/sql/sql.go

Lines changed: 26 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package sql
33
import (
44
"context"
55
"database/sql"
6-
"errors"
76
"fmt"
87
"sync"
98
"time"
@@ -38,19 +37,11 @@ var txIsolationLevels = map[string]sql.IsolationLevel{
3837
}
3938

4039
type Sql struct {
41-
*core.BaseInput `mapstructure:"-"`
42-
EnableMetrics bool `mapstructure:"enable_metrics"`
43-
Driver string `mapstructure:"driver"`
44-
Dsn string `mapstructure:"dsn"`
45-
Username string `mapstructure:"username"`
46-
Password string `mapstructure:"password"`
47-
ConnsMaxIdleTime time.Duration `mapstructure:"conns_max_idle_time"`
48-
ConnsMaxLifetime time.Duration `mapstructure:"conns_max_life_time"`
49-
ConnsMaxOpen int `mapstructure:"conns_max_open"`
50-
ConnsMaxIdle int `mapstructure:"conns_max_idle"`
51-
Timeout time.Duration `mapstructure:"timeout"`
52-
Interval time.Duration `mapstructure:"interval"`
53-
WaitForDelivery bool `mapstructure:"wait_for_delivery"`
40+
*core.BaseInput `mapstructure:"-"`
41+
*csql.Connector `mapstructure:",squash"`
42+
EnableMetrics bool `mapstructure:"enable_metrics"`
43+
Interval time.Duration `mapstructure:"interval"`
44+
WaitForDelivery bool `mapstructure:"wait_for_delivery"`
5445

5546
Transactional bool `mapstructure:"transactional"`
5647
IsolationLevel string `mapstructure:"isolation_level"`
@@ -62,8 +53,7 @@ type Sql struct {
6253
KeepValues KeepValues `mapstructure:"keep_values"`
6354
LabelColumns map[string]string `mapstructure:"labelcolumns"`
6455

65-
*ider.Ider `mapstructure:",squash"`
66-
*tls.TLSClientConfig `mapstructure:",squash"`
56+
*ider.Ider `mapstructure:",squash"`
6757

6858
keepIndex map[string]int
6959
keepValues map[string]any
@@ -83,22 +73,6 @@ type KeepValues struct {
8373
}
8474

8575
func (i *Sql) Init() error {
86-
if len(i.Dsn) == 0 {
87-
return errors.New("dsn required")
88-
}
89-
90-
if len(i.Driver) == 0 {
91-
return errors.New("driver required")
92-
}
93-
94-
if len(i.OnPoll.File) == 0 && len(i.OnPoll.Query) == 0 {
95-
return errors.New("onPoll.query or onPoll.file required")
96-
}
97-
98-
if err := i.Ider.Init(); err != nil {
99-
return err
100-
}
101-
10276
if i.Transactional {
10377
var ok bool
10478
if i.txLevel, ok = txIsolationLevels[i.IsolationLevel]; !ok {
@@ -129,42 +103,31 @@ func (i *Sql) Init() error {
129103
i.keepIndex[v] = keepAll
130104
}
131105

132-
if err := i.OnInit.Init(); err != nil {
106+
if err := i.OnInit.Init(false); err != nil {
133107
return fmt.Errorf("onInit: %w", err)
134108
}
135109

136-
if err := i.OnPoll.Init(); err != nil {
110+
if err := i.OnPoll.Init(true); err != nil {
137111
return fmt.Errorf("onPoll: %w", err)
138112
}
139113

140-
if err := i.OnDone.Init(); err != nil {
114+
if err := i.OnDone.Init(false); err != nil {
141115
return fmt.Errorf("onDone: %w", err)
142116
}
143117

144-
tlsConfig, err := i.TLSClientConfig.Config()
145-
if err != nil {
118+
if err := i.Ider.Init(); err != nil {
146119
return err
147120
}
148121

149-
db, err := csql.OpenDB(i.Driver, i.Dsn, i.Username, i.Password, tlsConfig)
122+
db, err := i.Connector.Init()
150123
if err != nil {
151124
return err
152125
}
153-
154-
db.DB.SetConnMaxIdleTime(i.ConnsMaxIdleTime)
155-
db.DB.SetConnMaxLifetime(i.ConnsMaxLifetime)
156-
db.DB.SetMaxIdleConns(i.ConnsMaxIdle)
157-
db.DB.SetMaxOpenConns(i.ConnsMaxOpen)
158-
159-
if err := db.Ping(); err != nil {
160-
defer db.Close()
161-
return err
162-
}
163126
i.db = db
164127

165128
if len(i.OnInit.Query) > 0 {
166129
if err := i.init(); err != nil {
167-
defer i.db.Close()
130+
i.db.Close()
168131
return fmt.Errorf("onInit query failed: %w", err)
169132
}
170133
}
@@ -216,7 +179,7 @@ func (i *Sql) Run() {
216179
}
217180

218181
func (i *Sql) init() error {
219-
ctx, cancel := context.WithTimeout(context.Background(), i.Timeout)
182+
ctx, cancel := context.WithTimeout(context.Background(), i.QueryTimeout)
220183
defer cancel()
221184

222185
rows, err := i.db.QueryContext(ctx, i.OnInit.Query)
@@ -244,7 +207,7 @@ func (i *Sql) init() error {
244207

245208
func (i *Sql) poll() {
246209
now := time.Now()
247-
ctx, cancel := context.WithTimeout(context.Background(), i.Timeout)
210+
ctx, cancel := context.WithTimeout(context.Background(), i.QueryTimeout)
248211
defer cancel()
249212

250213
var querier sqlx.ExtContext = i.db
@@ -393,18 +356,19 @@ func (i *Sql) keepColumns(from, to map[string]any, first, last bool) {
393356
func init() {
394357
plugins.AddInput("sql", func() core.Input {
395358
return &Sql{
396-
ConnsMaxIdleTime: 10 * time.Minute,
397-
ConnsMaxLifetime: 10 * time.Minute,
398-
ConnsMaxOpen: 2,
399-
ConnsMaxIdle: 1,
400-
Transactional: false,
401-
IsolationLevel: "Default",
402-
Timeout: 30 * time.Second,
403-
Interval: 0,
404-
WaitForDelivery: true,
405-
359+
Connector: &csql.Connector{
360+
ConnsMaxIdleTime: 10 * time.Minute,
361+
ConnsMaxLifetime: 10 * time.Minute,
362+
ConnsMaxOpen: 2,
363+
ConnsMaxIdle: 1,
364+
QueryTimeout: 30 * time.Second,
365+
TLSClientConfig: &tls.TLSClientConfig{},
366+
},
367+
Transactional: false,
368+
IsolationLevel: "Default",
369+
Interval: 0,
370+
WaitForDelivery: true,
406371
Ider: &ider.Ider{},
407-
TLSClientConfig: &tls.TLSClientConfig{},
408372
}
409373
})
410374
}

plugins/lookups/sql/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ And transforms query result into map of maps:
5454
password = ""
5555

5656
# queries execution timeout
57-
timeout = "30s"
57+
query_timeout = "30s"
5858

5959
# plugin mode, "vertical" or "horizontal"
6060
mode = "vertical"

plugins/lookups/sql/sql.go

Lines changed: 15 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -24,40 +24,16 @@ const (
2424

2525
type Sql struct {
2626
*core.BaseLookup `mapstructure:"-"`
27-
EnableMetrics bool `mapstructure:"enable_metrics"`
28-
Driver string `mapstructure:"driver"`
29-
Dsn string `mapstructure:"dsn"`
30-
Username string `mapstructure:"username"`
31-
Password string `mapstructure:"password"`
32-
ConnsMaxIdleTime time.Duration `mapstructure:"conns_max_idle_time"`
33-
ConnsMaxLifetime time.Duration `mapstructure:"conns_max_life_time"`
34-
ConnsMaxOpen int `mapstructure:"conns_max_open"`
35-
ConnsMaxIdle int `mapstructure:"conns_max_idle"`
36-
Timeout time.Duration `mapstructure:"timeout"`
27+
*csql.Connector `mapstructure:",squash"`
3728
OnUpdate csql.QueryInfo `mapstructure:"on_update"`
38-
39-
Mode string `mapstructure:"mode"`
40-
KeyColumn string `mapstructure:"key_column"`
41-
42-
*tls.TLSClientConfig `mapstructure:",squash"`
29+
Mode string `mapstructure:"mode"`
30+
KeyColumn string `mapstructure:"key_column"`
4331

4432
db *sqlx.DB
4533
}
4634

4735
func (l *Sql) Init() error {
48-
if len(l.Dsn) == 0 {
49-
return errors.New("dsn required")
50-
}
51-
52-
if len(l.Driver) == 0 {
53-
return errors.New("driver required")
54-
}
55-
56-
if len(l.OnUpdate.File) == 0 && len(l.OnUpdate.Query) == 0 {
57-
return errors.New("onUpdate.query or onUpdate.file required")
58-
}
59-
60-
if err := l.OnUpdate.Init(); err != nil {
36+
if err := l.OnUpdate.Init(true); err != nil {
6137
return fmt.Errorf("onUpdate: %w", err)
6238
}
6339

@@ -71,27 +47,12 @@ func (l *Sql) Init() error {
7147
return fmt.Errorf("unknown mode: %v", l.Mode)
7248
}
7349

74-
tlsConfig, err := l.TLSClientConfig.Config()
50+
db, err := l.Connector.Init()
7551
if err != nil {
7652
return err
7753
}
7854

79-
db, err := csql.OpenDB(l.Driver, l.Dsn, l.Username, l.Password, tlsConfig)
80-
if err != nil {
81-
return err
82-
}
83-
84-
db.DB.SetConnMaxIdleTime(l.ConnsMaxIdleTime)
85-
db.DB.SetConnMaxLifetime(l.ConnsMaxLifetime)
86-
db.DB.SetMaxIdleConns(l.ConnsMaxIdle)
87-
db.DB.SetMaxOpenConns(l.ConnsMaxOpen)
88-
89-
if err := db.Ping(); err != nil {
90-
defer db.Close()
91-
return err
92-
}
9355
l.db = db
94-
9556
return nil
9657
}
9758

@@ -100,7 +61,7 @@ func (l *Sql) Close() error {
10061
}
10162

10263
func (l *Sql) Update() (any, error) {
103-
ctx, cancel := context.WithTimeout(context.Background(), l.Timeout)
64+
ctx, cancel := context.WithTimeout(context.Background(), l.QueryTimeout)
10465
defer cancel()
10566

10667
rows, err := l.db.QueryContext(ctx, l.OnUpdate.Query)
@@ -164,13 +125,15 @@ func init() {
164125
plugins.AddLookup("sql", func() core.Lookup {
165126
return &lookup.Lookup{
166127
LazyLookup: &Sql{
167-
ConnsMaxIdleTime: 10 * time.Minute,
168-
ConnsMaxLifetime: 10 * time.Minute,
169-
ConnsMaxOpen: 2,
170-
ConnsMaxIdle: 1,
171-
Timeout: 30 * time.Second,
172-
Mode: ModeVertical,
173-
TLSClientConfig: &tls.TLSClientConfig{},
128+
Connector: &csql.Connector{
129+
ConnsMaxIdleTime: 10 * time.Minute,
130+
ConnsMaxLifetime: 10 * time.Minute,
131+
ConnsMaxOpen: 2,
132+
ConnsMaxIdle: 1,
133+
QueryTimeout: 30 * time.Second,
134+
TLSClientConfig: &tls.TLSClientConfig{},
135+
},
136+
Mode: ModeVertical,
174137
},
175138
Interval: 30 * time.Second,
176139
}

0 commit comments

Comments
 (0)