Skip to content

Commit df5d2ac

Browse files
committed
[otel config translate] allow hosts to be string not just slice
1 parent 7ffacf3 commit df5d2ac

File tree

3 files changed

+107
-18
lines changed

3 files changed

+107
-18
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: enhancement
13+
14+
# REQUIRED for all kinds
15+
# Change summary; a 80ish characters long description of the change.
16+
summary: allow host to be a string for otel configuration translation
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodate a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: elastic-agent
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link the PR that added the changeset.
36+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
pr: https://github.com/elastic/elastic-agent/pull/11394
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
45+
issue: https://github.com/elastic/elastic-agent/issues/11352
46+

internal/pkg/otel/translate/output_elasticsearch.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"net/url"
1212
"reflect"
13+
"slices"
1314
"strings"
1415
"time"
1516

@@ -27,7 +28,6 @@ import (
2728

2829
type esToOTelOptions struct {
2930
elasticsearch.ElasticsearchConfig `config:",inline"`
30-
outputs.HostWorkerCfg `config:",inline"`
3131

3232
Index string `config:"index"`
3333
Preset string `config:"preset"`
@@ -38,9 +38,6 @@ var defaultOptions = esToOTelOptions{
3838

3939
Index: "", // Dynamic routing is disabled if index is set
4040
Preset: "custom", // default is custom if not set
41-
HostWorkerCfg: outputs.HostWorkerCfg{
42-
Workers: 1,
43-
},
4441
}
4542

4643
// ToOTelConfig converts a Beat config into OTel elasticsearch exporter config
@@ -97,13 +94,19 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
9794
}
9895

9996
// Create url using host name, protocol and path
97+
outputHosts, err := outputs.ReadHostList(output)
98+
if err != nil {
99+
return nil, fmt.Errorf("error reading host list: %w", err)
100+
}
100101
hosts := []string{}
101-
for _, h := range escfg.Hosts {
102+
for _, h := range outputHosts {
102103
esURL, err := common.MakeURL(escfg.Protocol, escfg.Path, h, 9200)
103104
if err != nil {
104105
return nil, fmt.Errorf("cannot generate ES URL from host %w", err)
105106
}
106-
hosts = append(hosts, esURL)
107+
if !slices.Contains(hosts, esURL) {
108+
hosts = append(hosts, esURL)
109+
}
107110
}
108111

109112
otelYAMLCfg := map[string]any{
@@ -114,7 +117,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
114117
// where it could spin as many goroutines as it liked.
115118
// Given that batcher implementation can change and it has a history of such changes,
116119
// let's keep max_conns_per_host setting for now and remove it once exporterhelper is stable.
117-
"max_conns_per_host": getTotalNumWorkers(escfg), // num_workers * len(hosts) if loadbalance is true
120+
"max_conns_per_host": getTotalNumWorkers(output), // num_workers * len(hosts) if loadbalance is true
118121

119122
// Retry
120123
"retry": map[string]any{
@@ -135,7 +138,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
135138
"queue_size": getQueueSize(logger, output),
136139
"block_on_overflow": true,
137140
"wait_for_result": true,
138-
"num_consumers": getTotalNumWorkers(escfg), // num_workers * len(hosts) if loadbalance is true
141+
"num_consumers": getTotalNumWorkers(output), // num_workers * len(hosts) if loadbalance is true
139142
},
140143

141144
"mapping": map[string]any{
@@ -171,13 +174,14 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
171174
return otelYAMLCfg, nil
172175
}
173176

174-
func getTotalNumWorkers(escfg esToOTelOptions) int {
175-
// calculate total workers
176-
totalWorkers := escfg.NumWorkers()
177-
if escfg.LoadBalance && len(escfg.Hosts) > 1 {
178-
totalWorkers = (escfg.NumWorkers() * len(escfg.Hosts))
177+
// getTotalNumWorkers returns the number of hosts that beats would
178+
// have used taking into account hosts, loadbalance and worker
179+
func getTotalNumWorkers(cfg *config.C) int {
180+
hostList, err := outputs.ReadHostList(cfg)
181+
if err != nil {
182+
return 1
179183
}
180-
return totalWorkers
184+
return len(hostList)
181185
}
182186

183187
// log warning for unsupported config

internal/pkg/otel/translate/output_elasticsearch_test.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ compression_params:
8282
require.NoError(t, err, "error translating elasticsearch output to ES exporter config")
8383
expOutput := newFromYamlString(t, OTelCfg)
8484
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
85-
8685
})
8786

8887
t.Run("test api key is encoded before mapping to es-exporter", func(t *testing.T) {
@@ -128,7 +127,50 @@ compression_params:
128127
require.NoError(t, err, "error translating elasticsearch output to ES exporter config ")
129128
expOutput := newFromYamlString(t, OTelCfg)
130129
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
130+
})
131+
132+
t.Run("test hosts can be a string", func(t *testing.T) {
133+
beatCfg := `
134+
hosts: "localhost:9200"
135+
index: "some-index"
136+
api_key: "TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA"
137+
`
131138

139+
OTelCfg := `
140+
endpoints:
141+
- http://localhost:9200
142+
logs_index: some-index
143+
logs_dynamic_pipeline:
144+
enabled: true
145+
retry:
146+
enabled: true
147+
initial_interval: 1s
148+
max_interval: 1m0s
149+
max_retries: 3
150+
sending_queue:
151+
batch:
152+
flush_timeout: 10s
153+
max_size: 1600
154+
min_size: 0
155+
sizer: items
156+
block_on_overflow: true
157+
enabled: true
158+
num_consumers: 1
159+
queue_size: 3200
160+
wait_for_result: true
161+
mapping:
162+
mode: bodymap
163+
max_conns_per_host: 1
164+
api_key: VGlOQUdHNEJhYU1kYUgxdFJmdVU6S25SNnlFNDFSclNvd2Iwa1EwSFdvQQ==
165+
compression: gzip
166+
compression_params:
167+
level: 1
168+
`
169+
cfg := config.MustNewConfigFrom(beatCfg)
170+
got, err := ToOTelConfig(cfg, logger)
171+
require.NoError(t, err, "error translating elasticsearch output to ES exporter config ")
172+
expOutput := newFromYamlString(t, OTelCfg)
173+
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
132174
})
133175

134176
// when preset is configured, we only test worker, bulk_max_size
@@ -281,9 +323,7 @@ sending_queue:
281323
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
282324
})
283325
}
284-
285326
})
286-
287327
}
288328

289329
func TestCompressionConfig(t *testing.T) {
@@ -345,7 +385,6 @@ compression: none
345385
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
346386
})
347387
}
348-
349388
}
350389

351390
func newFromYamlString(t *testing.T, input string) *confmap.Conf {

0 commit comments

Comments
 (0)