Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# REQUIRED for all kinds
# Change summary; a 80ish characters long description of the change.
summary: allow host to be a string for otel configuration translation

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link the PR that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/11394

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/11352

32 changes: 18 additions & 14 deletions internal/pkg/otel/translate/output_elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net/url"
"reflect"
"slices"
"strings"
"time"

Expand All @@ -27,7 +28,6 @@ import (

type esToOTelOptions struct {
elasticsearch.ElasticsearchConfig `config:",inline"`
outputs.HostWorkerCfg `config:",inline"`

Index string `config:"index"`
Preset string `config:"preset"`
Expand All @@ -38,9 +38,6 @@ var defaultOptions = esToOTelOptions{

Index: "", // Dynamic routing is disabled if index is set
Preset: "custom", // default is custom if not set
HostWorkerCfg: outputs.HostWorkerCfg{
Workers: 1,
},
}

// ToOTelConfig converts a Beat config into OTel elasticsearch exporter config
Expand Down Expand Up @@ -97,13 +94,19 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
}

// Create url using host name, protocol and path
outputHosts, err := outputs.ReadHostList(output)
if err != nil {
return nil, fmt.Errorf("error reading host list: %w", err)
}
hosts := []string{}
for _, h := range escfg.Hosts {
for _, h := range outputHosts {
esURL, err := common.MakeURL(escfg.Protocol, escfg.Path, h, 9200)
if err != nil {
return nil, fmt.Errorf("cannot generate ES URL from host %w", err)
}
hosts = append(hosts, esURL)
if !slices.Contains(hosts, esURL) {
hosts = append(hosts, esURL)
}
}

otelYAMLCfg := map[string]any{
Expand All @@ -114,7 +117,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
// where it could spin as many goroutines as it liked.
// Given that batcher implementation can change and it has a history of such changes,
// let's keep max_conns_per_host setting for now and remove it once exporterhelper is stable.
"max_conns_per_host": getTotalNumWorkers(escfg), // num_workers * len(hosts) if loadbalance is true
"max_conns_per_host": getTotalNumWorkers(output), // num_workers * len(hosts) if loadbalance is true

// Retry
"retry": map[string]any{
Expand All @@ -135,7 +138,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
"queue_size": getQueueSize(logger, output),
"block_on_overflow": true,
"wait_for_result": true,
"num_consumers": getTotalNumWorkers(escfg), // num_workers * len(hosts) if loadbalance is true
"num_consumers": getTotalNumWorkers(output), // num_workers * len(hosts) if loadbalance is true
},

"mapping": map[string]any{
Expand Down Expand Up @@ -171,13 +174,14 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
return otelYAMLCfg, nil
}

func getTotalNumWorkers(escfg esToOTelOptions) int {
// calculate total workers
totalWorkers := escfg.NumWorkers()
if escfg.LoadBalance && len(escfg.Hosts) > 1 {
totalWorkers = (escfg.NumWorkers() * len(escfg.Hosts))
// getTotalNumWorkers returns the number of hosts that beats would
// have used taking into account hosts, loadbalance and worker
func getTotalNumWorkers(cfg *config.C) int {
hostList, err := outputs.ReadHostList(cfg)
if err != nil {
return 1
}
return totalWorkers
return len(hostList)
}

// log warning for unsupported config
Expand Down
47 changes: 43 additions & 4 deletions internal/pkg/otel/translate/output_elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ compression_params:
require.NoError(t, err, "error translating elasticsearch output to ES exporter config")
expOutput := newFromYamlString(t, OTelCfg)
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))

})

t.Run("test api key is encoded before mapping to es-exporter", func(t *testing.T) {
Expand Down Expand Up @@ -128,7 +127,50 @@ compression_params:
require.NoError(t, err, "error translating elasticsearch output to ES exporter config ")
expOutput := newFromYamlString(t, OTelCfg)
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
})

t.Run("test hosts can be a string", func(t *testing.T) {
beatCfg := `
hosts: "localhost:9200"
index: "some-index"
api_key: "TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA"
`

OTelCfg := `
endpoints:
- http://localhost:9200
logs_index: some-index
logs_dynamic_pipeline:
enabled: true
retry:
enabled: true
initial_interval: 1s
max_interval: 1m0s
max_retries: 3
sending_queue:
batch:
flush_timeout: 10s
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
max_conns_per_host: 1
api_key: VGlOQUdHNEJhYU1kYUgxdFJmdVU6S25SNnlFNDFSclNvd2Iwa1EwSFdvQQ==
compression: gzip
compression_params:
level: 1
`
cfg := config.MustNewConfigFrom(beatCfg)
got, err := ToOTelConfig(cfg, logger)
require.NoError(t, err, "error translating elasticsearch output to ES exporter config ")
expOutput := newFromYamlString(t, OTelCfg)
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
})

// when preset is configured, we only test worker, bulk_max_size
Expand Down Expand Up @@ -281,9 +323,7 @@ sending_queue:
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
})
}

})

}

func TestCompressionConfig(t *testing.T) {
Expand Down Expand Up @@ -345,7 +385,6 @@ compression: none
compareAndAssert(t, expOutput, confmap.NewFromStringMap(got))
})
}

}

func newFromYamlString(t *testing.T, input string) *confmap.Conf {
Expand Down