Skip to content

Commit d2907e2

Browse files
authored
Set pipeline name automatically in transform definitions (#2973)
This PR adds support to set automatically the pipeline name used in transforms adding the current package version as a prefix. In order to achieve that elasticsearch/transform/*/transform.yml is managed by elastic-package as a template file (Golang text template). And it is allowed to use a function in the template named "ingestPipelineName" that adds the current version of the package to the pipeline.
1 parent f6debed commit d2907e2

37 files changed

+16757
-10
lines changed

internal/builder/packages.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,11 @@ func BuildPackage(ctx context.Context, options BuildOptions) (string, error) {
218218
logger.Debugf("Linked file included (path: %s)", l.TargetFilePath(destinationDir))
219219
}
220220

221+
err = resolveTransformDefinitions(destinationDir)
222+
if err != nil {
223+
return "", fmt.Errorf("resolving transform manifests failed: %w", err)
224+
}
225+
221226
if options.CreateZip {
222227
return buildZippedPackage(ctx, options, destinationDir)
223228
}

internal/builder/transform.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package builder
6+
7+
import (
8+
"fmt"
9+
"os"
10+
"path/filepath"
11+
12+
"github.com/elastic/elastic-package/internal/packages"
13+
)
14+
15+
// resolveTransformDefinitions processes all transform definition files in the given destination directory.
16+
// It reads each file, applies templating to set the final ingest pipeline name, and writes the processed
17+
// content back to the same file.
18+
func resolveTransformDefinitions(destinationDir string) error {
19+
files, err := filepath.Glob(filepath.Join(destinationDir, "elasticsearch", "transform", "*", "transform.yml"))
20+
if err != nil {
21+
return fmt.Errorf("failed matching files with transform definitions: %w", err)
22+
}
23+
24+
for _, file := range files {
25+
stat, err := os.Stat(file)
26+
if err != nil {
27+
return fmt.Errorf("stat failed for transform definition file %q: %w", file, err)
28+
}
29+
contents, _, err := packages.ReadTransformDefinitionFile(file, destinationDir)
30+
if err != nil {
31+
return fmt.Errorf("failed reading transform definition file %q: %w", file, err)
32+
}
33+
34+
err = os.WriteFile(file, contents, stat.Mode())
35+
if err != nil {
36+
return err
37+
}
38+
}
39+
return nil
40+
}

internal/packages/packages.go

Lines changed: 102 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package packages
66

77
import (
88
"archive/zip"
9+
"bytes"
910
"encoding/json"
1011
"errors"
1112
"fmt"
@@ -14,6 +15,7 @@ import (
1415
"path/filepath"
1516
"slices"
1617
"strings"
18+
"text/template"
1719

1820
yamlv3 "gopkg.in/yaml.v3"
1921

@@ -243,6 +245,9 @@ type TransformDefinition struct {
243245
Source struct {
244246
Index []string `config:"index" yaml:"index"`
245247
} `config:"source" yaml:"source"`
248+
Dest struct {
249+
Pipeline string `config:"pipeline" yaml:"pipeline"`
250+
} `config:"dest" yaml:"dest"`
246251
Meta struct {
247252
FleetTransformVersion string `config:"fleet_transform_version" yaml:"fleet_transform_version"`
248253
} `config:"_meta" yaml:"_meta"`
@@ -414,6 +419,101 @@ func ReadPackageManifest(path string) (*PackageManifest, error) {
414419
return &m, nil
415420
}
416421

422+
// ReadTransformDefinitionFile reads and parses the transform definition (elasticsearch/transform/<name>/transform.yml)
423+
// file for the given transform. It also applies templating to the file, allowing to set the final ingest pipeline name
424+
// by adding the package version defined in the package manifest.
425+
// It fails if the referenced destination pipeline doesn't exist.
426+
func ReadTransformDefinitionFile(transformPath, packageRootPath string) ([]byte, TransformDefinition, error) {
427+
manifest, err := ReadPackageManifestFromPackageRoot(packageRootPath)
428+
if err != nil {
429+
return nil, TransformDefinition{}, fmt.Errorf("could not read package manifest: %w", err)
430+
}
431+
432+
if manifest.Version == "" {
433+
return nil, TransformDefinition{}, fmt.Errorf("package version is not defined in the package manifest")
434+
}
435+
436+
t, err := template.New(filepath.Base(transformPath)).Funcs(template.FuncMap{
437+
"ingestPipelineName": func(pipelineName string) (string, error) {
438+
if pipelineName == "" {
439+
return "", fmt.Errorf("ingest pipeline name is empty")
440+
}
441+
return fmt.Sprintf("%s-%s", manifest.Version, pipelineName), nil
442+
},
443+
}).ParseFiles(transformPath)
444+
if err != nil {
445+
return nil, TransformDefinition{}, fmt.Errorf("parsing transform template failed (path: %s): %w", transformPath, err)
446+
}
447+
448+
var rendered bytes.Buffer
449+
err = t.Execute(&rendered, nil)
450+
if err != nil {
451+
return nil, TransformDefinition{}, fmt.Errorf("executing template failed: %w", err)
452+
}
453+
cfg, err := yaml.NewConfig(rendered.Bytes(), ucfg.PathSep("."))
454+
if err != nil {
455+
return nil, TransformDefinition{}, fmt.Errorf("reading file failed (path: %s): %w", transformPath, err)
456+
}
457+
458+
var definition TransformDefinition
459+
err = cfg.Unpack(&definition)
460+
if err != nil {
461+
return nil, TransformDefinition{}, fmt.Errorf("failed to parse transform file \"%s\": %w", transformPath, err)
462+
}
463+
464+
if definition.Dest.Pipeline == "" {
465+
return rendered.Bytes(), definition, nil
466+
}
467+
468+
// Is it using the Ingest pipeline defined in the package (elasticsearch/ingest_pipeline/<version>-<pipeline>.yml)?
469+
// <version>-<pipeline>.yml
470+
// example: 0.1.0-pipeline_extract_metadata
471+
472+
pipelineFileName := fmt.Sprintf("%s.yml", strings.TrimPrefix(definition.Dest.Pipeline, manifest.Version+"-"))
473+
_, err = os.Stat(filepath.Join(packageRootPath, "elasticsearch", "ingest_pipeline", pipelineFileName))
474+
if err != nil && !errors.Is(err, os.ErrNotExist) {
475+
return nil, TransformDefinition{}, fmt.Errorf("checking for destination ingest pipeline file %s: %w", pipelineFileName, err)
476+
}
477+
if err == nil {
478+
return rendered.Bytes(), definition, nil
479+
}
480+
481+
// Is it using the Ingest pipeline from any data stream (data_stream/*/elasticsearch/pipeline/*.yml)?
482+
// <data_stream>-<version>-<data_stream_pipeline>.yml
483+
// example: metrics-aws_billing.cur-0.1.0-pipeline_extract_metadata
484+
dataStreamPaths, err := filepath.Glob(filepath.Join(packageRootPath, "data_stream", "*"))
485+
if err != nil {
486+
return nil, TransformDefinition{}, fmt.Errorf("error finding data streams: %w", err)
487+
}
488+
489+
for _, dataStreamPath := range dataStreamPaths {
490+
matched, err := filepath.Glob(filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline", "*.yml"))
491+
if err != nil {
492+
return nil, TransformDefinition{}, fmt.Errorf("error finding ingest pipelines in data stream %s: %w", dataStreamPath, err)
493+
}
494+
dataStreamName := filepath.Base(dataStreamPath)
495+
for _, pipelinePath := range matched {
496+
dataStreamPipelineName := strings.TrimSuffix(filepath.Base(pipelinePath), filepath.Ext(pipelinePath))
497+
expectedSuffix := fmt.Sprintf("-%s.%s-%s-%s.yml", manifest.Name, dataStreamName, manifest.Version, dataStreamPipelineName)
498+
if strings.HasSuffix(pipelineFileName, expectedSuffix) {
499+
return rendered.Bytes(), definition, nil
500+
}
501+
}
502+
}
503+
pipelinePaths, err := filepath.Glob(filepath.Join(packageRootPath, "data_stream", "*", "elasticsearch", "ingest_pipeline", "*.yml"))
504+
if err != nil {
505+
return nil, TransformDefinition{}, fmt.Errorf("error finding ingest pipelines in data streams: %w", err)
506+
}
507+
for _, pipelinePath := range pipelinePaths {
508+
dataStreamPipelineName := strings.TrimSuffix(filepath.Base(pipelinePath), filepath.Ext(pipelinePath))
509+
if strings.HasSuffix(pipelineFileName, fmt.Sprintf("-%s-%s.yml", manifest.Version, dataStreamPipelineName)) {
510+
return rendered.Bytes(), definition, nil
511+
}
512+
}
513+
514+
return nil, TransformDefinition{}, fmt.Errorf("destination ingest pipeline file %s not found: incorrect version used in pipeline or unknown pipeline", pipelineFileName)
515+
}
516+
417517
// ReadTransformsFromPackageRoot looks for transforms in the given package root.
418518
func ReadTransformsFromPackageRoot(packageRoot string) ([]Transform, error) {
419519
files, err := filepath.Glob(filepath.Join(packageRoot, "elasticsearch", "transform", "*", "transform.yml"))
@@ -423,15 +523,9 @@ func ReadTransformsFromPackageRoot(packageRoot string) ([]Transform, error) {
423523

424524
var transforms []Transform
425525
for _, file := range files {
426-
cfg, err := yaml.NewConfigWithFile(file, ucfg.PathSep("."))
427-
if err != nil {
428-
return nil, fmt.Errorf("reading file failed (path: %s): %w", file, err)
429-
}
430-
431-
var definition TransformDefinition
432-
err = cfg.Unpack(&definition)
526+
_, definition, err := ReadTransformDefinitionFile(file, packageRoot)
433527
if err != nil {
434-
return nil, fmt.Errorf("failed to parse transform file \"%s\": %w", file, err)
528+
return nil, fmt.Errorf("failed reading transform definition file %q: %w", file, err)
435529
}
436530

437531
transforms = append(transforms, Transform{

internal/packages/packages_test.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package packages
66

77
import (
88
"encoding/json"
9+
"os"
10+
"path/filepath"
911
"testing"
1012

1113
"github.com/stretchr/testify/assert"
@@ -94,3 +96,190 @@ func TestDataStreamManifest_IndexTemplateName(t *testing.T) {
9496
})
9597
}
9698
}
99+
100+
func TestReadTransformDefinitionFile(t *testing.T) {
101+
t.Parallel()
102+
103+
cases := map[string]struct {
104+
packageManifest string
105+
transformManifest string
106+
createIngestPipelineFile bool
107+
createIngestPipelineFileDatastream bool
108+
ingestPipelineName string
109+
expectedError bool
110+
expectedErrorMessage string
111+
expectedTransform string
112+
}{
113+
"valid transform manifest with package version": {
114+
packageManifest: `
115+
name: test-package
116+
version: 1.2.3
117+
`,
118+
createIngestPipelineFile: true,
119+
createIngestPipelineFileDatastream: false,
120+
ingestPipelineName: "my-pipeline",
121+
transformManifest: `
122+
source:
123+
index: "logs-package.dataset"
124+
dest:
125+
index: "logs-package_latest-index-1"
126+
pipeline: "{{ ingestPipelineName "my-pipeline" }}"
127+
latest:
128+
unique_key:
129+
- event.dataset
130+
`,
131+
expectedTransform: `
132+
source:
133+
index: "logs-package.dataset"
134+
dest:
135+
index: "logs-package_latest-index-1"
136+
pipeline: "1.2.3-my-pipeline"
137+
latest:
138+
unique_key:
139+
- event.dataset
140+
`,
141+
expectedError: false,
142+
},
143+
"invalid transform manifest without package version": {
144+
packageManifest: `
145+
name: test-package
146+
`,
147+
createIngestPipelineFile: false,
148+
createIngestPipelineFileDatastream: false,
149+
ingestPipelineName: "my-pipeline",
150+
transformManifest: `
151+
source:
152+
index: "logs-package.dataset"
153+
dest:
154+
index: "logs-package_latest-index-1"
155+
pipeline: "{{ ingestPipelineName "my-pipeline" }}"
156+
latest:
157+
unique_key:
158+
- event.dataset
159+
`,
160+
expectedError: true,
161+
expectedErrorMessage: "package version is not defined in the package manifest",
162+
},
163+
"ingest_pipeline not exists": {
164+
packageManifest: `
165+
name: test-package
166+
version: 1.2.3
167+
`,
168+
createIngestPipelineFile: false,
169+
createIngestPipelineFileDatastream: false,
170+
ingestPipelineName: "my-pipeline",
171+
transformManifest: `
172+
source:
173+
index: "logs-package.dataset"
174+
dest:
175+
index: "logs-package_latest-index-1"
176+
pipeline: "{{ ingestPipelineName "my-pipeline" }}"
177+
latest:
178+
unique_key:
179+
- event.dataset
180+
`,
181+
expectedError: true,
182+
expectedErrorMessage: "destination ingest pipeline file my-pipeline.yml not found: incorrect version used in pipeline or unknown pipeline",
183+
},
184+
"ingest_pipeline name empty": {
185+
packageManifest: `
186+
name: test-package
187+
version: 1.2.3
188+
`,
189+
createIngestPipelineFile: false,
190+
createIngestPipelineFileDatastream: false,
191+
ingestPipelineName: "my-pipeline",
192+
transformManifest: `
193+
source:
194+
index: "logs-package.dataset"
195+
dest:
196+
index: "logs-package_latest-index-1"
197+
pipeline: "{{ ingestPipelineName "" }}"
198+
latest:
199+
unique_key:
200+
- event.dataset
201+
`,
202+
expectedError: true,
203+
expectedErrorMessage: "error calling ingestPipelineName: ingest pipeline name is empty",
204+
},
205+
"ingest_pipeline exists on data stream": {
206+
packageManifest: `
207+
name: test-package
208+
version: 1.2.3
209+
`,
210+
createIngestPipelineFile: false,
211+
createIngestPipelineFileDatastream: true,
212+
ingestPipelineName: "my-pipeline",
213+
transformManifest: `
214+
source:
215+
index: "logs-package.dataset"
216+
dest:
217+
index: "logs-package_latest-index-1"
218+
pipeline: "logs-test_package.test-{{ ingestPipelineName "my-pipeline" }}"
219+
latest:
220+
unique_key:
221+
- event.dataset
222+
`,
223+
expectedError: false,
224+
expectedTransform: `
225+
source:
226+
index: "logs-package.dataset"
227+
dest:
228+
index: "logs-package_latest-index-1"
229+
pipeline: "logs-test_package.test-1.2.3-my-pipeline"
230+
latest:
231+
unique_key:
232+
- event.dataset
233+
`,
234+
},
235+
}
236+
237+
for name, tc := range cases {
238+
t.Run(name, func(t *testing.T) {
239+
// Setup temporary directory for the package
240+
packageDir := t.TempDir()
241+
packageManifestPath := filepath.Join(packageDir, "manifest.yml")
242+
err := os.WriteFile(packageManifestPath, []byte(tc.packageManifest), 0644)
243+
require.NoError(t, err)
244+
245+
// Optionally create an ingest pipeline file
246+
if tc.createIngestPipelineFile {
247+
ingestPipelineDir := filepath.Join(packageDir, "elasticsearch", "ingest_pipeline")
248+
err = os.MkdirAll(ingestPipelineDir, 0755)
249+
require.NoError(t, err)
250+
ingestPipelinePath := filepath.Join(ingestPipelineDir, tc.ingestPipelineName+".yml")
251+
err = os.WriteFile(ingestPipelinePath, []byte(`---\nprocessors: {}\n`), 0644)
252+
require.NoError(t, err)
253+
}
254+
255+
if tc.createIngestPipelineFileDatastream {
256+
ingestPipelineDir := filepath.Join(packageDir, "data_stream", "test", "elasticsearch", "ingest_pipeline")
257+
err = os.MkdirAll(ingestPipelineDir, 0755)
258+
require.NoError(t, err)
259+
ingestPipelinePath := filepath.Join(ingestPipelineDir, tc.ingestPipelineName+".yml")
260+
err = os.WriteFile(ingestPipelinePath, []byte(`---\nprocessors: {}\n`), 0644)
261+
require.NoError(t, err)
262+
}
263+
264+
// Setup temporary directory for the transform
265+
transformDir := filepath.Join(packageDir, "elasticsearch", "transform", "latest")
266+
err = os.MkdirAll(transformDir, 0755)
267+
require.NoError(t, err)
268+
transformManifestPath := filepath.Join(transformDir, "transform.yml")
269+
err = os.WriteFile(transformManifestPath, []byte(tc.transformManifest), 0644)
270+
require.NoError(t, err)
271+
272+
// Call the function under test
273+
contents, _, err := ReadTransformDefinitionFile(transformManifestPath, packageDir)
274+
if tc.expectedError {
275+
require.Error(t, err)
276+
assert.ErrorContains(t, err, tc.expectedErrorMessage)
277+
} else {
278+
require.NoError(t, err)
279+
require.NotEmpty(t, contents)
280+
281+
assert.Equal(t, tc.expectedTransform, string(contents))
282+
}
283+
})
284+
}
285+
}

0 commit comments

Comments
 (0)