Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions input/elasticapm/internal/decoder/stream_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func (dec *NDJSONStreamDecoder) resetLatestLineReader() {
func (dec *NDJSONStreamDecoder) IsEOF() bool { return dec.isEOF }

// LatestLine returns the latest line read as []byte
//
// Deprecated
func (dec *NDJSONStreamDecoder) LatestLine() []byte { return dec.latestLine }

// JSONDecodeError is a custom error that can occur during JSON decoding
Expand Down
16 changes: 5 additions & 11 deletions input/elasticapm/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ func (p *Processor) readMetadata(reader *streamReader, out *modelpb.APMEvent) er
return errEmptyBody
}
return &InvalidInputError{
Message: "EOF while reading metadata",
Document: string(reader.LatestLine()),
Message: "EOF while reading metadata",
}
}
return reader.wrapError(err)
Expand All @@ -136,8 +135,7 @@ func (p *Processor) readMetadata(reader *streamReader, out *modelpb.APMEvent) er
}
default:
return &InvalidInputError{
Message: fmt.Sprintf("%q or %q required", v2MetadataKey, rumv3MetadataKey),
Document: string(reader.LatestLine()),
Message: fmt.Sprintf("%q or %q required", v2MetadataKey, rumv3MetadataKey),
}
}
return nil
Expand Down Expand Up @@ -223,8 +221,7 @@ func (p *Processor) readBatch(
}
if err != nil && err != io.EOF {
result.addError(&InvalidInputError{
Message: err.Error(),
Document: string(reader.LatestLine()),
Message: err.Error(),
})
}
}
Expand Down Expand Up @@ -274,8 +271,7 @@ func (p *Processor) HandleStream(
return fmt.Errorf("cannot read metadata in stream: %w", err)
}
return &InvalidInputError{
Message: err.Error(),
Document: string(sr.LatestLine()),
Message: err.Error(),
}
}

Expand Down Expand Up @@ -351,8 +347,7 @@ type streamReader struct {
func (sr *streamReader) wrapError(err error) error {
if _, ok := err.(decoder.JSONDecodeError); ok {
return &InvalidInputError{
Message: err.Error(),
Document: string(sr.LatestLine()),
Message: err.Error(),
}
}

Expand All @@ -364,7 +359,6 @@ func (sr *streamReader) wrapError(err error) error {
return &InvalidInputError{
TooLarge: true,
Message: "event exceeded the permitted size",
Document: string(sr.LatestLine()),
}
}
return err
Expand Down
19 changes: 6 additions & 13 deletions input/elasticapm/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ func TestHandleStreamErrors(t *testing.T) {
invalid: 1,
errors: []error{
&InvalidInputError{
Message: `decode error: data read error: v2.transactionRoot.Transaction: v2.transaction.ID: ReadString: expects " or n,`,
Document: invalidEvent,
Message: `decode error: data read error: v2.transactionRoot.Transaction: v2.transaction.ID: ReadString: expects " or n,`,
},
},
}, {
Expand All @@ -145,39 +144,34 @@ func TestHandleStreamErrors(t *testing.T) {
invalid: 1,
errors: []error{
&InvalidInputError{
Message: `did not recognize object type: "invalid-json"`,
Document: invalidJSONEvent,
Message: `did not recognize object type: "invalid-json"`,
},
},
}, {
name: "InvalidJSONMetadata",
payload: invalidJSONMetadata + "\n",
err: &InvalidInputError{
Message: "decode error: data read error: v2.metadataRoot.Metadata: v2.metadata.readFieldHash: expect :,",
Document: invalidJSONMetadata,
Message: "decode error: data read error: v2.metadataRoot.Metadata: v2.metadata.readFieldHash: expect :,",
},
}, {
name: "InvalidMetadata",
payload: invalidMetadata + "\n",
err: &InvalidInputError{
Message: "validation error: 'metadata' required",
Document: invalidMetadata,
Message: "validation error: 'metadata' required",
},
}, {
name: "InvalidMetadata2",
payload: invalidMetadata2 + "\n",
err: fmt.Errorf("cannot read metadata in stream: %w", &InvalidInputError{
Message: `"metadata" or "m" required`,
Document: invalidMetadata2,
Message: `"metadata" or "m" required`,
}),
}, {
name: "UnrecognizedEvent",
payload: validMetadata + "\n" + invalidEventType + "\n",
invalid: 1,
errors: []error{
&InvalidInputError{
Message: `did not recognize object type: "tennis-court"`,
Document: invalidEventType,
Message: `did not recognize object type: "tennis-court"`,
},
},
}, {
Expand All @@ -190,7 +184,6 @@ func TestHandleStreamErrors(t *testing.T) {
&InvalidInputError{
TooLarge: true,
Message: "event exceeded the permitted size",
Document: tooLargeEvent[:len(validMetadata)+1],
},
},
}} {
Expand Down
3 changes: 2 additions & 1 deletion input/elasticapm/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func (r *Result) addError(err error) {
}

type InvalidInputError struct {
Message string
Message string
// Deprecated
Document string
TooLarge bool
}
Expand Down