Skip to content

Commit 8023a89

Browse files
[otel config translate] allow hosts to be string not just slice (#11394) (#11430)
* [otel config translate] allow hosts to be string not just slice * update changelog to bugfix (cherry picked from commit 70ef801) Co-authored-by: Lee E Hinman <[email protected]>
1 parent 55c694b commit 8023a89

File tree

3 files changed

+107
-18
lines changed

3 files changed

+107
-18
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: bug-fix
13+
14+
# REQUIRED for all kinds
15+
# Change summary; a 80ish characters long description of the change.
16+
summary: allow host to be a string for otel configuration translation
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodate a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: elastic-agent
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link the PR that added the changeset.
36+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
pr: https://github.com/elastic/elastic-agent/pull/11394
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
45+
issue: https://github.com/elastic/elastic-agent/issues/11352
46+

internal/pkg/otel/translate/output_elasticsearch.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"net/url"
1212
"reflect"
13+
"slices"
1314
"strings"
1415
"time"
1516

@@ -28,7 +29,6 @@ import (
2829

2930
type esToOTelOptions struct {
3031
elasticsearch.ElasticsearchConfig `config:",inline"`
31-
outputs.HostWorkerCfg `config:",inline"`
3232

3333
Index string `config:"index"`
3434
Preset string `config:"preset"`
@@ -39,9 +39,6 @@ var defaultOptions = esToOTelOptions{
3939

4040
Index: "", // Dynamic routing is disabled if index is set
4141
Preset: "custom", // default is custom if not set
42-
HostWorkerCfg: outputs.HostWorkerCfg{
43-
Workers: 1,
44-
},
4542
}
4643

4744
// ToOTelConfig converts a Beat config into OTel elasticsearch exporter config
@@ -98,13 +95,19 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
9895
}
9996

10097
// Create url using host name, protocol and path
98+
outputHosts, err := outputs.ReadHostList(output)
99+
if err != nil {
100+
return nil, fmt.Errorf("error reading host list: %w", err)
101+
}
101102
hosts := []string{}
102-
for _, h := range escfg.Hosts {
103+
for _, h := range outputHosts {
103104
esURL, err := common.MakeURL(escfg.Protocol, escfg.Path, h, 9200)
104105
if err != nil {
105106
return nil, fmt.Errorf("cannot generate ES URL from host %w", err)
106107
}
107-
hosts = append(hosts, esURL)
108+
if !slices.Contains(hosts, esURL) {
109+
hosts = append(hosts, esURL)
110+
}
108111
}
109112

110113
otelYAMLCfg := map[string]any{
@@ -115,7 +118,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
115118
// where it could spin as many goroutines as it liked.
116119
// Given that batcher implementation can change and it has a history of such changes,
117120
// let's keep max_conns_per_host setting for now and remove it once exporterhelper is stable.
118-
"max_conns_per_host": getTotalNumWorkers(escfg), // num_workers * len(hosts) if loadbalance is true
121+
"max_conns_per_host": getTotalNumWorkers(output), // num_workers * len(hosts) if loadbalance is true
119122

120123
// Retry
121124
"retry": map[string]any{
@@ -136,7 +139,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
136139
"queue_size": getQueueSize(logger, output),
137140
"block_on_overflow": true,
138141
"wait_for_result": true,
139-
"num_consumers": getTotalNumWorkers(escfg), // num_workers * len(hosts) if loadbalance is true
142+
"num_consumers": getTotalNumWorkers(output), // num_workers * len(hosts) if loadbalance is true
140143
},
141144

142145
"mapping": map[string]any{
@@ -172,13 +175,14 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
172175
return otelYAMLCfg, nil
173176
}
174177

175-
func getTotalNumWorkers(escfg esToOTelOptions) int {
176-
// calculate total workers
177-
totalWorkers := escfg.NumWorkers()
178-
if escfg.LoadBalance && len(escfg.Hosts) > 1 {
179-
totalWorkers = (escfg.NumWorkers() * len(escfg.Hosts))
178+
// getTotalNumWorkers returns the number of hosts that beats would
179+
// have used taking into account hosts, loadbalance and worker
180+
func getTotalNumWorkers(cfg *config.C) int {
181+
hostList, err := outputs.ReadHostList(cfg)
182+
if err != nil {
183+
return 1
180184
}
181-
return totalWorkers
185+
return len(hostList)
182186
}
183187

184188
// log warning for unsupported config

internal/pkg/otel/translate/output_elasticsearch_test.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ compression_params:
8282
require.NoError(t, err, "error translating elasticsearch output to ES exporter config")
8383
expOutput := newFromYamlString(t, OTelCfg)
8484
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
85-
8685
})
8786

8887
t.Run("test api key is encoded before mapping to es-exporter", func(t *testing.T) {
@@ -128,7 +127,50 @@ compression_params:
128127
require.NoError(t, err, "error translating elasticsearch output to ES exporter config ")
129128
expOutput := newFromYamlString(t, OTelCfg)
130129
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
130+
})
131+
132+
t.Run("test hosts can be a string", func(t *testing.T) {
133+
beatCfg := `
134+
hosts: "localhost:9200"
135+
index: "some-index"
136+
api_key: "TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA"
137+
`
131138

139+
OTelCfg := `
140+
endpoints:
141+
- http://localhost:9200
142+
logs_index: some-index
143+
logs_dynamic_pipeline:
144+
enabled: true
145+
retry:
146+
enabled: true
147+
initial_interval: 1s
148+
max_interval: 1m0s
149+
max_retries: 3
150+
sending_queue:
151+
batch:
152+
flush_timeout: 10s
153+
max_size: 1600
154+
min_size: 0
155+
sizer: items
156+
block_on_overflow: true
157+
enabled: true
158+
num_consumers: 1
159+
queue_size: 3200
160+
wait_for_result: true
161+
mapping:
162+
mode: bodymap
163+
max_conns_per_host: 1
164+
api_key: VGlOQUdHNEJhYU1kYUgxdFJmdVU6S25SNnlFNDFSclNvd2Iwa1EwSFdvQQ==
165+
compression: gzip
166+
compression_params:
167+
level: 1
168+
`
169+
cfg := config.MustNewConfigFrom(beatCfg)
170+
got, err := ToOTelConfig(cfg, logger)
171+
require.NoError(t, err, "error translating elasticsearch output to ES exporter config ")
172+
expOutput := newFromYamlString(t, OTelCfg)
173+
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
132174
})
133175

134176
// when preset is configured, we only test worker, bulk_max_size
@@ -281,9 +323,7 @@ sending_queue:
281323
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
282324
})
283325
}
284-
285326
})
286-
287327
}
288328

289329
func TestCompressionConfig(t *testing.T) {
@@ -345,7 +385,6 @@ compression: none
345385
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
346386
})
347387
}
348-
349388
}
350389

351390
func newFromYamlString(t *testing.T, input string) *confmap.Conf {

0 commit comments

Comments
 (0)