Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions codec/fullevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import (

"github.com/elastic/apm-data/model/modelpb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

func fullEvent(t *testing.B) *modelpb.APMEvent {
return &modelpb.APMEvent{
Timestamp: timestamppb.New(time.Unix(1, 1)),
Timestamp: uint64(time.Second.Nanoseconds() + 1),
Span: &modelpb.Span{
Message: &modelpb.Message{
Body: "body",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func SetStructValues(in interface{}, values *Values, opts ...SetStructValuesOpti
switch fKind := f.Kind(); fKind {
case reflect.String:
fieldVal = reflect.ValueOf(values.Str)
case reflect.Int, reflect.Int32, reflect.Int64:
case reflect.Int, reflect.Int32, reflect.Int64, reflect.Uint64:
fieldVal = reflect.ValueOf(values.Int).Convert(f.Type())
case reflect.Float64:
fieldVal = reflect.ValueOf(values.Float).Convert(f.Type())
Expand Down
7 changes: 2 additions & 5 deletions input/elasticapm/internal/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/nullable"
"github.com/elastic/apm-data/model/modelpb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
Expand Down Expand Up @@ -248,7 +247,7 @@ func mapToErrorModel(from *errorEvent, event *modelpb.APMEvent) {
event.ParentId = from.ParentID.Val
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down Expand Up @@ -628,9 +627,7 @@ func mapToSpanModel(from *span, event *modelpb.APMEvent) {
if from.Start.IsSet() {
// event.Timestamp is initialized to the time the payload was
// received; offset that by "start" milliseconds for RUM.
event.Timestamp = timestamppb.New(event.Timestamp.AsTime().Add(
time.Duration(float64(time.Millisecond) * from.Start.Val),
))
event.Timestamp += uint64(time.Duration(float64(time.Millisecond) * from.Start.Val).Nanoseconds())
}
}

Expand Down
17 changes: 8 additions & 9 deletions input/elasticapm/internal/modeldecoder/rumv3/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
Expand All @@ -47,17 +46,17 @@ func TestResetErrorOnRelease(t *testing.T) {

func TestDecodeNestedError(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now().UTC()
now := modelpb.FromTime(time.Now())
eventBase := initializedMetadata()
eventBase.Timestamp = timestamppb.New(now)
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"e":{"id":"a-b-c","timestamp":1599996822281000,"log":{"mg":"abc"}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch modelpb.Batch
require.NoError(t, DecodeNestedError(dec, &input, &batch))
require.Len(t, batch, 1)
require.NotNil(t, batch[0].Error)
assert.Equal(t, time.Unix(1599996822, 281000000).UTC(), batch[0].Timestamp.AsTime())
assert.Equal(t, modelpb.FromTime(time.Unix(1599996822, 281000000)), batch[0].Timestamp)
assert.Empty(t, cmp.Diff(&modelpb.Error{
Id: "a-b-c",
Log: &modelpb.ErrorLog{
Expand All @@ -72,7 +71,7 @@ func TestDecodeNestedError(t *testing.T) {
dec = decoder.NewJSONDecoder(strings.NewReader(str))
batch = modelpb.Batch{}
require.NoError(t, DecodeNestedError(dec, &input, &batch))
assert.Equal(t, now, batch[0].Timestamp.AsTime())
assert.Equal(t, now, batch[0].Timestamp)

// test decode
err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch)
Expand Down Expand Up @@ -146,24 +145,24 @@ func TestDecodeMapToErrorModel(t *testing.T) {
}
var input errorEvent
var out1, out2 modelpb.APMEvent
reqTime := time.Now().Add(time.Second).UTC()
out1.Timestamp = timestamppb.New(reqTime)
reqTime := modelpb.FromTime(time.Now().Add(time.Second))
out1.Timestamp = reqTime
defaultVal := modeldecodertest.DefaultValues()
modeldecodertest.SetStructValues(&input, defaultVal)
mapToErrorModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal)

// leave event timestamp unmodified if eventTime is zero
out1.Timestamp = timestamppb.New(reqTime)
out1.Timestamp = reqTime
modeldecodertest.SetStructValues(&input, defaultVal)
mapToErrorModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal)

// reuse input model for different event
// ensure memory is not shared by reusing input model
out2.Timestamp = timestamppb.New(reqTime)
out2.Timestamp = reqTime
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToErrorModel(&input, &out2)
Expand Down
29 changes: 14 additions & 15 deletions input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
Expand All @@ -47,9 +46,9 @@ func TestResetTransactionOnRelease(t *testing.T) {

func TestDecodeNestedTransaction(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now().UTC()
now := modelpb.FromTime(time.Now())
eventBase := initializedMetadata()
eventBase.Timestamp = timestamppb.New(now)
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"x":{"n":"tr-a","d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"y":[{"n":"a","d":10,"t":"http","id":"123","s":20}],"me":[{"sa":{"ysc":{"v":5}},"y":{"t":"span_type","su":"span_subtype"}}]}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
Expand All @@ -62,7 +61,7 @@ func TestDecodeNestedTransaction(t *testing.T) {

assert.Equal(t, "request", batch[0].Transaction.Type)
// fall back to request time
assert.Equal(t, now, batch[0].Timestamp.AsTime())
assert.Equal(t, now, batch[0].Timestamp)

// Ensure nested metricsets are decoded. RUMv3 only sends
// breakdown metrics, so the Metricsets will be empty and
Expand All @@ -78,11 +77,11 @@ func TestDecodeNestedTransaction(t *testing.T) {
Subtype: "span_subtype",
SelfTime: &modelpb.AggregatedDuration{Count: 5},
}, batch[1].Span, protocmp.Transform()))
assert.Equal(t, now, batch[1].Timestamp.AsTime())
assert.Equal(t, now, batch[1].Timestamp)

// ensure nested spans are decoded
start := time.Duration(20 * 1000 * 1000)
assert.Equal(t, now.Add(start), batch[2].Timestamp.AsTime()) // add start to timestamp
start := uint64(time.Duration(20 * 1000 * 1000).Nanoseconds())
assert.Equal(t, now+start, batch[2].Timestamp) // add start to timestamp
assert.Equal(t, "100", batch[2].Transaction.Id)
assert.Equal(t, "1", batch[2].Trace.Id)
assert.Equal(t, "100", batch[2].ParentId)
Expand All @@ -93,8 +92,8 @@ func TestDecodeNestedTransaction(t *testing.T) {
})

t.Run("decode-marks", func(t *testing.T) {
now := time.Now()
eventBase := modelpb.APMEvent{Timestamp: timestamppb.New(now)}
now := modelpb.FromTime(time.Now())
eventBase := modelpb.APMEvent{Timestamp: now}
input := modeldecoder.Input{Base: &eventBase}
str := `{"x":{"d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"k":{"a":{"dc":0.1,"di":0.2,"ds":0.3,"de":0.4,"fb":0.5,"fp":0.6,"lp":0.7,"long":0.8},"nt":{"fs":0.1,"ls":0.2,"le":0.3,"cs":0.4,"ce":0.5,"qs":0.6,"rs":0.7,"re":0.8,"dl":0.9,"di":0.11,"ds":0.21,"de":0.31,"dc":0.41,"es":0.51,"ee":6,"long":0.99},"long":{"long":0.1}}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
Expand Down Expand Up @@ -215,16 +214,16 @@ func TestDecodeMapToTransactionModel(t *testing.T) {

var input transaction
var out1, out2 modelpb.APMEvent
reqTime := time.Now().Add(time.Second)
out1.Timestamp = timestamppb.New(reqTime)
reqTime := modelpb.FromTime(time.Now().Add(time.Second))
out1.Timestamp = reqTime
defaultVal := modeldecodertest.DefaultValues()
modeldecodertest.SetStructValues(&input, defaultVal)
mapToTransactionModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Transaction, exceptions, defaultVal)

// ensure memory is not shared by reusing input model
out2.Timestamp = timestamppb.New(reqTime)
out2.Timestamp = reqTime
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToTransactionModel(&input, &out2)
Expand Down Expand Up @@ -267,16 +266,16 @@ func TestDecodeMapToTransactionModel(t *testing.T) {

var input span
var out1, out2 modelpb.APMEvent
reqTime := time.Now().Add(time.Second)
out1.Timestamp = timestamppb.New(reqTime)
reqTime := modelpb.FromTime(time.Now().Add(time.Second))
out1.Timestamp = reqTime
defaultVal := modeldecodertest.DefaultValues()
modeldecodertest.SetStructValues(&input, defaultVal)
mapToSpanModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Span, exceptions, defaultVal)

// ensure memory is not shared by reusing input model
out2.Timestamp = timestamppb.New(reqTime)
out2.Timestamp = reqTime
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToSpanModel(&input, &out2)
Expand Down
19 changes: 6 additions & 13 deletions input/elasticapm/internal/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/elastic/apm-data/input/otlp"
"github.com/elastic/apm-data/model/modelpb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -470,7 +469,7 @@ func mapToErrorModel(from *errorEvent, event *modelpb.APMEvent) {
event.ParentId = from.ParentID.Val
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down Expand Up @@ -731,7 +730,7 @@ func mapToMetricsetModel(from *metricset, event *modelpb.APMEvent) bool {
event.Metricset = &modelpb.Metricset{Name: "app"}

if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
}

if len(from.Samples) > 0 {
Expand Down Expand Up @@ -1179,18 +1178,12 @@ func mapToSpanModel(from *span, event *modelpb.APMEvent) {
out.Sync = &val
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
} else if from.Start.IsSet() {
// event.Timestamp should have been initialized to the time the
// payload was received; offset that by "start" milliseconds for
// RUM.
base := time.Time{}
if event.Timestamp != nil {
base = event.Timestamp.AsTime()
}
event.Timestamp = timestamppb.New(base.Add(
time.Duration(float64(time.Millisecond) * from.Start.Val),
))
event.Timestamp += uint64(time.Duration(float64(time.Millisecond) * from.Start.Val).Nanoseconds())
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down Expand Up @@ -1411,7 +1404,7 @@ func mapToTransactionModel(from *transaction, event *modelpb.APMEvent) {
out.SpanCount.Started = &started
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down Expand Up @@ -1467,7 +1460,7 @@ func mapToLogModel(from *log, event *modelpb.APMEvent) {
mapToFAASModel(from.FAAS, event.Faas)
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down
9 changes: 4 additions & 5 deletions input/elasticapm/internal/modeldecoder/v2/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
Expand All @@ -47,18 +46,18 @@ func TestResetErrorOnRelease(t *testing.T) {

func TestDecodeNestedError(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now().UTC()
now := modelpb.FromTime(time.Now())
defaultVal := modeldecodertest.DefaultValues()
_, eventBase := initializedInputMetadata(defaultVal)
eventBase.Timestamp = timestamppb.New(now)
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"error":{"id":"a-b-c","timestamp":1599996822281000,"log":{"message":"abc"}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch modelpb.Batch
require.NoError(t, DecodeNestedError(dec, &input, &batch))
require.Len(t, batch, 1)
require.NotNil(t, batch[0].Error)
assert.Equal(t, time.Unix(1599996822, 281000000).UTC(), batch[0].Timestamp.AsTime())
assert.Equal(t, modelpb.FromTime(time.Unix(1599996822, 281000000)), batch[0].Timestamp)
assert.Empty(t, cmp.Diff(&modelpb.Error{
Id: "a-b-c",
Log: &modelpb.ErrorLog{Message: "abc"},
Expand All @@ -69,7 +68,7 @@ func TestDecodeNestedError(t *testing.T) {
batch = modelpb.Batch{}
require.NoError(t, DecodeNestedError(dec, &input, &batch))
// if no timestamp is provided, leave base event time unmodified
assert.Equal(t, now, batch[0].Timestamp.AsTime())
assert.Equal(t, now, batch[0].Timestamp)

err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch)
require.Error(t, err)
Expand Down
15 changes: 7 additions & 8 deletions input/elasticapm/internal/modeldecoder/v2/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
Expand All @@ -50,7 +49,7 @@ func TestDecodeNestedLog(t *testing.T) {
require.NoError(t, DecodeNestedLog(dec, &input, &batch))
require.Len(t, batch, 1)
assert.Equal(t, "something happened", batch[0].Message)
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String())
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String())
assert.Equal(t, "trace-id", batch[0].Trace.Id)
assert.Equal(t, "transaction-id", batch[0].Transaction.Id)
assert.Equal(t, "warn", batch[0].Log.Level)
Expand All @@ -68,13 +67,13 @@ func TestDecodeNestedLog(t *testing.T) {
})

t.Run("withoutTimestamp", func(t *testing.T) {
now := time.Now().UTC()
input := modeldecoder.Input{Base: &modelpb.APMEvent{Timestamp: timestamppb.New(now)}}
now := modelpb.FromTime(time.Now())
input := modeldecoder.Input{Base: &modelpb.APMEvent{Timestamp: now}}
str := `{"log":{"message":"something happened"}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch modelpb.Batch
require.NoError(t, DecodeNestedLog(dec, &input, &batch))
assert.Equal(t, now, batch[0].Timestamp.AsTime())
assert.Equal(t, now, batch[0].Timestamp)
})

t.Run("withError", func(t *testing.T) {
Expand All @@ -84,7 +83,7 @@ func TestDecodeNestedLog(t *testing.T) {
var batch modelpb.Batch
require.NoError(t, DecodeNestedLog(dec, &input, &batch))
require.Len(t, batch, 1)
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String())
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String())
assert.Equal(t, "trace-id", batch[0].Trace.Id)
assert.Equal(t, "transaction-id", batch[0].Transaction.Id)
assert.Equal(t, "error", batch[0].Log.Level)
Expand All @@ -111,7 +110,7 @@ func TestDecodeNestedLog(t *testing.T) {
var batch modelpb.Batch
require.NoError(t, DecodeNestedLog(dec, &input, &batch))
require.Len(t, batch, 1)
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String())
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String())
assert.Equal(t, "trace-id", batch[0].Trace.Id)
assert.Equal(t, "transaction-id", batch[0].Transaction.Id)
assert.Equal(t, "error", batch[0].Log.Level)
Expand All @@ -138,7 +137,7 @@ func TestDecodeNestedLog(t *testing.T) {
var batch modelpb.Batch
require.NoError(t, DecodeNestedLog(dec, &input, &batch))
require.Len(t, batch, 1)
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String())
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String())
assert.Equal(t, "trace-id", batch[0].Trace.Id)
assert.Equal(t, "transaction-id", batch[0].Transaction.Id)
assert.Equal(t, "error", batch[0].Log.Level)
Expand Down
Loading