-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathextension.go
131 lines (113 loc) · 3.15 KB
/
extension.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package redisstorageextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/redisstorageextension"
import (
"context"
"errors"
"fmt"
"time"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/xextension/storage"
"go.uber.org/zap"
)
type redisStorage struct {
cfg *Config
logger *zap.Logger
client *redis.Client
}
// Ensure this storage extension implements the appropriate interface
var _ storage.Extension = (*redisStorage)(nil)
func newRedisStorage(logger *zap.Logger, config *Config) (extension.Extension, error) {
return &redisStorage{
cfg: config,
logger: logger,
}, nil
}
// Start runs cleanup if configured
func (rs *redisStorage) Start(context.Context, component.Host) error {
c := redis.NewClient(&redis.Options{
Addr: rs.cfg.Endpoint,
Password: string(rs.cfg.Password),
DB: rs.cfg.DB,
})
rs.client = c
return nil
}
// Shutdown will close any open databases
func (rs *redisStorage) Shutdown(context.Context) error {
if rs.client == nil {
return nil
}
return rs.client.Close()
}
type redisClient struct {
client *redis.Client
prefix string
expiration time.Duration
}
var _ storage.Client = redisClient{}
func (rc redisClient) Get(ctx context.Context, key string) ([]byte, error) {
b, err := rc.client.Get(ctx, rc.prefix+key).Bytes()
if errors.Is(err, redis.Nil) {
return nil, nil
}
return b, err
}
func (rc redisClient) Set(ctx context.Context, key string, value []byte) error {
_, err := rc.client.Set(ctx, rc.prefix+key, value, rc.expiration).Result()
return err
}
func (rc redisClient) Delete(ctx context.Context, key string) error {
_, err := rc.client.Del(ctx, rc.prefix+key).Result()
return err
}
func (rc redisClient) Batch(ctx context.Context, ops ...*storage.Operation) error {
p := rc.client.Pipeline()
for _, op := range ops {
switch op.Type {
case storage.Delete:
p.Del(ctx, op.Key)
case storage.Get:
p.Get(ctx, op.Key)
case storage.Set:
p.Set(ctx, op.Key, op.Value, rc.expiration)
}
}
_, err := p.Exec(ctx)
return err
}
func (rc redisClient) Close(_ context.Context) error {
return nil
}
// GetClient returns a storage client for an individual component
func (rs *redisStorage) GetClient(_ context.Context, kind component.Kind, ent component.ID, name string) (storage.Client, error) {
var rawName string
if name == "" {
rawName = fmt.Sprintf("%s_%s_%s", kindString(kind), ent.Type(), ent.Name())
} else {
rawName = fmt.Sprintf("%s_%s_%s_%s", kindString(kind), ent.Type(), ent.Name(), name)
}
return redisClient{
client: rs.client,
prefix: rawName,
expiration: rs.cfg.Expiration,
}, nil
}
func kindString(k component.Kind) string {
switch k {
case component.KindReceiver:
return "receiver"
case component.KindProcessor:
return "processor"
case component.KindExporter:
return "exporter"
case component.KindExtension:
return "extension"
case component.KindConnector:
return "connector"
default:
return "other" // not expected
}
}