diff --git a/input/elasticapm/internal/decoder/stream_decoder.go b/input/elasticapm/internal/decoder/stream_decoder.go index ddcf460a..0c47291f 100644 --- a/input/elasticapm/internal/decoder/stream_decoder.go +++ b/input/elasticapm/internal/decoder/stream_decoder.go @@ -90,6 +90,8 @@ func (dec *NDJSONStreamDecoder) resetLatestLineReader() { func (dec *NDJSONStreamDecoder) IsEOF() bool { return dec.isEOF } // LatestLine returns the latest line read as []byte +// +// Deprecated func (dec *NDJSONStreamDecoder) LatestLine() []byte { return dec.latestLine } // JSONDecodeError is a custom error that can occur during JSON decoding diff --git a/input/elasticapm/processor.go b/input/elasticapm/processor.go index 59d903b5..eac6744b 100644 --- a/input/elasticapm/processor.go +++ b/input/elasticapm/processor.go @@ -119,8 +119,7 @@ func (p *Processor) readMetadata(reader *streamReader, out *modelpb.APMEvent) er return errEmptyBody } return &InvalidInputError{ - Message: "EOF while reading metadata", - Document: string(reader.LatestLine()), + Message: "EOF while reading metadata", } } return reader.wrapError(err) @@ -136,8 +135,7 @@ func (p *Processor) readMetadata(reader *streamReader, out *modelpb.APMEvent) er } default: return &InvalidInputError{ - Message: fmt.Sprintf("%q or %q required", v2MetadataKey, rumv3MetadataKey), - Document: string(reader.LatestLine()), + Message: fmt.Sprintf("%q or %q required", v2MetadataKey, rumv3MetadataKey), } } return nil @@ -223,8 +221,7 @@ func (p *Processor) readBatch( } if err != nil && err != io.EOF { result.addError(&InvalidInputError{ - Message: err.Error(), - Document: string(reader.LatestLine()), + Message: err.Error(), }) } } @@ -274,8 +271,7 @@ func (p *Processor) HandleStream( return fmt.Errorf("cannot read metadata in stream: %w", err) } return &InvalidInputError{ - Message: err.Error(), - Document: string(sr.LatestLine()), + Message: err.Error(), } } @@ -351,8 +347,7 @@ type streamReader struct { func (sr *streamReader) wrapError(err error) error { if _, ok := err.(decoder.JSONDecodeError); ok { return &InvalidInputError{ - Message: err.Error(), - Document: string(sr.LatestLine()), + Message: err.Error(), } } @@ -364,7 +359,6 @@ func (sr *streamReader) wrapError(err error) error { return &InvalidInputError{ TooLarge: true, Message: "event exceeded the permitted size", - Document: string(sr.LatestLine()), } } return err diff --git a/input/elasticapm/processor_test.go b/input/elasticapm/processor_test.go index fd343e53..ce7b0cb7 100644 --- a/input/elasticapm/processor_test.go +++ b/input/elasticapm/processor_test.go @@ -135,8 +135,7 @@ func TestHandleStreamErrors(t *testing.T) { invalid: 1, errors: []error{ &InvalidInputError{ - Message: `decode error: data read error: v2.transactionRoot.Transaction: v2.transaction.ID: ReadString: expects " or n,`, - Document: invalidEvent, + Message: `decode error: data read error: v2.transactionRoot.Transaction: v2.transaction.ID: ReadString: expects " or n,`, }, }, }, { @@ -145,30 +144,26 @@ func TestHandleStreamErrors(t *testing.T) { invalid: 1, errors: []error{ &InvalidInputError{ - Message: `did not recognize object type: "invalid-json"`, - Document: invalidJSONEvent, + Message: `did not recognize object type: "invalid-json"`, }, }, }, { name: "InvalidJSONMetadata", payload: invalidJSONMetadata + "\n", err: &InvalidInputError{ - Message: "decode error: data read error: v2.metadataRoot.Metadata: v2.metadata.readFieldHash: expect :,", - Document: invalidJSONMetadata, + Message: "decode error: data read error: v2.metadataRoot.Metadata: v2.metadata.readFieldHash: expect :,", }, }, { name: "InvalidMetadata", payload: invalidMetadata + "\n", err: &InvalidInputError{ - Message: "validation error: 'metadata' required", - Document: invalidMetadata, + Message: "validation error: 'metadata' required", }, }, { name: "InvalidMetadata2", payload: invalidMetadata2 + "\n", err: fmt.Errorf("cannot read metadata in stream: %w", &InvalidInputError{ - Message: `"metadata" or "m" required`, - Document: invalidMetadata2, + Message: `"metadata" or "m" required`, }), }, { name: "UnrecognizedEvent", @@ -176,8 +171,7 @@ func TestHandleStreamErrors(t *testing.T) { invalid: 1, errors: []error{ &InvalidInputError{ - Message: `did not recognize object type: "tennis-court"`, - Document: invalidEventType, + Message: `did not recognize object type: "tennis-court"`, }, }, }, { @@ -190,7 +184,6 @@ func TestHandleStreamErrors(t *testing.T) { &InvalidInputError{ TooLarge: true, Message: "event exceeded the permitted size", - Document: tooLargeEvent[:len(validMetadata)+1], }, }, }} { diff --git a/input/elasticapm/result.go b/input/elasticapm/result.go index 191d2d2d..e982a7f1 100644 --- a/input/elasticapm/result.go +++ b/input/elasticapm/result.go @@ -72,7 +72,8 @@ func (r *Result) addError(err error) { } type InvalidInputError struct { - Message string + Message string + // Deprecated Document string TooLarge bool }