Skip to content
95 changes: 88 additions & 7 deletions internal/elasticsearch/ingest/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,63 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

"gopkg.in/yaml.v3"

"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/logger"
)

// simulatePipelineRequest encodes to a JSON object whose "docs" array holds
// the pipelineDocument values it carries.
type simulatePipelineRequest struct {
Docs []pipelineDocument `json:"docs"`
}

type simulatePipelineResponse struct {
Docs []pipelineIngestedDocument `json:"docs"`
Docs []struct {
ProcessorResults []verboseProcessorResult `json:"processor_results"`
} `json:"docs"`
}

// verboseProcessorResult is one entry of a document's "processor_results"
// array in a simulate response.
type verboseProcessorResult struct {
// Processor is decoded from the "processor_type" key.
Processor string `json:"processor_type"`
// Status is the outcome of the processor. SimulatePipeline handles the
// values "success", "dropped", "skipped", "error_ignored", "error" and
// "failed"; anything else is reported as an unexpected status.
Status string `json:"status"`
// Doc is the document reported with this result; SimulatePipeline reads its
// source and its ingest metadata.
Doc pipelineDocument `json:"doc"`
// Error is wrapped into the returned error when Status is "error".
Error verboseProcessorError `json:"error"`
// Ignored holds the error that is only logged when Status is "error_ignored".
Ignored struct {
Error verboseProcessorError `json:"error"`
} `json:"ignored_error"`
}

// verboseProcessorError is the error object attached to a processor result.
// It satisfies the error interface, so it can be wrapped with %w and later
// recovered with errors.As.
type verboseProcessorError struct {
	Type      string          `json:"type"`
	Reason    string          `json:"reason"`
	RootCause json.RawMessage `json:"root_cause"`
}

// Error renders the error as "[<type>] <reason>".
func (e verboseProcessorError) Error() string {
	kind, reason := e.Type, e.Reason
	return fmt.Sprintf("[%s] %s", kind, reason)
}

type pipelineDocument struct {
Index string `json:"_index"`
Source json.RawMessage `json:"_source"`
Index string `json:"_index"`
Source json.RawMessage `json:"_source"`
Ingest verboseProcessorIngest `json:"_ingest"`
}

type pipelineIngestedDocument struct {
Doc pipelineDocument `json:"doc"`
type verboseProcessorIngest struct {
Pipeline string `json:"pipeline"`
Timestamp time.Time `json:"timestamp"`

OnFailurePipeline string `json:"on_failure_pipeline"`
OnFailureMessage string `json:"on_failure_message"`
OnFailureProcessorTag string `json:"on_failure_processor_tag"`
OnFailureProcessorType string `json:"on_failure_processor_type"`
}

// Pipeline represents a pipeline resource loaded from a file
Expand Down Expand Up @@ -90,6 +122,7 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName
r, err := api.Ingest.Simulate(bytes.NewReader(requestBody),
api.Ingest.Simulate.WithContext(ctx),
api.Ingest.Simulate.WithPipelineID(pipelineName),
api.Ingest.Simulate.WithVerbose(true),
)
if err != nil {
return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err)
Expand All @@ -111,11 +144,59 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName
return nil, fmt.Errorf("unmarshalling simulate request failed: %w", err)
}

// handleErrors returns errs without the processor errors whose reason equals
// the on_failure message carried by ingest, i.e. the errors an on_failure
// handler is already dealing with. Errors of any other kind are kept.
handleErrors := func(ingest verboseProcessorIngest, errs []error) []error {
	var kept []error
	for _, candidate := range errs {
		var procErr verboseProcessorError
		handled := errors.As(candidate, &procErr) && procErr.Reason == ingest.OnFailureMessage
		if !handled {
			kept = append(kept, candidate)
		}
	}
	return kept
}

// Walk the per-processor results of every simulated document. The entry in
// processedEvents stays nil for a document that was dropped or whose
// processing ended in an unhandled failure.
processedEvents := make([]json.RawMessage, len(response.Docs))
var errs []error
for i, doc := range response.Docs {
	var source json.RawMessage
	failed := false
	for _, result := range doc.ProcessorResults {
		if result.Doc.Ingest.OnFailureMessage != "" {
			// This processor is in an on_failure handler, filter out the handled errors
			// and assume that processing is going on.
			errs = handleErrors(result.Doc.Ingest, errs)
			failed = false
		}

		switch result.Status {
		case "success":
			// Keep last successful document.
			source = result.Doc.Source
		case "dropped":
			source = nil
		case "skipped":
			continue
		case "error_ignored":
			logger.Debugf("error ignored for processor %s: [%s] %s", result.Processor, result.Ignored.Error.Type, result.Ignored.Error.Reason)
			continue
		case "error":
			failed = true
			errs = append(errs, fmt.Errorf("error in processor %s: %w", result.Processor, result.Error))
		case "failed":
			failed = true
			errs = append(errs, fmt.Errorf("%q processor failed", result.Processor))
		default:
			errs = append(errs, fmt.Errorf("unexpected result status %q for processor %q", result.Status, result.Processor))
		}
	}

	if !failed {
		processedEvents[i] = source
	}
}

// errors.Join yields nil when no error was collected, so callers still see a
// nil error on full success.
return processedEvents, errors.Join(errs...)
}

func UninstallPipelines(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
Expand Down