diff --git a/cmd/otelbench/chdump/ingester.go b/cmd/otelbench/chdump/ingester.go
new file mode 100644
index 00000000..c3aafbd8
--- /dev/null
+++ b/cmd/otelbench/chdump/ingester.go
@@ -0,0 +1,64 @@
+package chdump
+
+import (
+	"context"
+
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/pdata/plog/plogotlp"
+	"golang.org/x/sync/errgroup"
+)
+
+// IngestLogs reads logs from a dump and exports them to the collector.
+type IngestLogs struct {
+	// Workers is the number of concurrent export workers.
+	//
+	// Values below 1 are treated as 1.
+	Workers int
+}
+
+// Run passes tr to Consume, converts every *Logs handed to the OnLogs callback
+// into an OTLP batch and exports the batches through client from
+// max(lg.Workers, 1) goroutines.
+//
+// Run returns the first non-nil error returned by any of these goroutines.
+func (lg IngestLogs) Run(ctx context.Context, client plogotlp.GRPCClient, tr TableReader) error {
+	workers := max(lg.Workers, 1)
+	batches := make(chan plog.Logs, workers)
+
+	grp, grpCtx := errgroup.WithContext(ctx)
+
+	// Producer: converts consumed logs to OTLP and queues them for export.
+	// Closing the channel on exit lets the export workers finish.
+	grp.Go(func() error {
+		defer close(batches)
+
+		enqueue := func(t *Logs) error {
+			batch := plog.NewLogs()
+			t.ToOTLP(batch)
+
+			select {
+			case <-grpCtx.Done():
+				return grpCtx.Err()
+			case batches <- batch:
+				return nil
+			}
+		}
+		return Consume(tr, ConsumeOptions{OnLogs: enqueue})
+	})
+
+	// Consumers: export queued batches until the channel is closed or an
+	// export fails.
+	export := func() error {
+		for batch := range batches {
+			req := plogotlp.NewExportRequestFromLogs(batch)
+			if _, err := client.Export(grpCtx, req); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+	for range workers {
+		grp.Go(export)
+	}
+
+	return grp.Wait()
+}
diff --git a/cmd/otelbench/chdump/logs.go b/cmd/otelbench/chdump/logs.go
index 18295265..ace93d46 100644
--- a/cmd/otelbench/chdump/logs.go
+++ b/cmd/otelbench/chdump/logs.go
@@ -5,6 +5,8 @@ import (
 	"slices"
 
 	"github.com/ClickHouse/ch-go/proto"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/plog"
 
 	"github.com/go-faster/oteldb/internal/otelstorage"
 )
@@ -75,6 +77,81 @@ func (c *Logs) Reset() {
 	}
 }
 
+// ToOTLP appends data from [Logs] to given batch.
+func (c *Logs) ToOTLP(batch plog.Logs) {
+	resources := batch.ResourceLogs()
+
+	// Index the resources already present in batch by attribute hash, so that
+	// rows sharing a resource are grouped under a single ResourceLogs.
+	byHash := make(map[otelstorage.Hash]plog.ResourceLogs, resources.Len())
+	for i := 0; i < resources.Len(); i++ {
+		rl := resources.At(i)
+		byHash[otelstorage.Attrs(rl.Resource().Attributes()).Hash()] = rl
+	}
+
+	// resourceFor returns the ResourceLogs for attrs, appending one if needed.
+	resourceFor := func(attrs otelstorage.Attrs) plog.ResourceLogs {
+		key := attrs.Hash()
+		if rl, ok := byHash[key]; ok {
+			return rl
+		}
+		rl := resources.AppendEmpty()
+		attrs.AsMap().CopyTo(rl.Resource().Attributes())
+		byHash[key] = rl
+		return rl
+	}
+
+	// scopeFor returns the matching ScopeLogs of rl, appending one if needed.
+	scopeFor := func(rl plog.ResourceLogs, attrs otelstorage.Attrs, name, version string) plog.ScopeLogs {
+		scopes := rl.ScopeLogs()
+		want := attrs.Hash()
+		for i := 0; i < scopes.Len(); i++ {
+			sl := scopes.At(i)
+			s := sl.Scope()
+			if s.Name() != name || s.Version() != version {
+				continue
+			}
+			if otelstorage.Attrs(s.Attributes()).Hash() == want {
+				return sl
+			}
+		}
+		sl := scopes.AppendEmpty()
+		s := sl.Scope()
+		s.SetName(name)
+		s.SetVersion(version)
+		attrs.AsMap().CopyTo(s.Attributes())
+		return sl
+	}
+
+	// Every row becomes one log record under its resource and scope.
+	for row := range c.Body.Rows() {
+		var (
+			ts           = c.Timestamp.Row(row)
+			sevText      = c.SeverityText.Row(row)
+			sevNumber    = c.SeverityNumber.Row(row)
+			flags        = c.TraceFlags.Row(row)
+			traceID      = c.TraceID.Row(row)
+			spanID       = c.SpanID.Row(row)
+			body         = c.Body.Row(row)
+			attrs        = c.Attributes.Row(row)
+			resource     = c.Resource.Row(row)
+			scope        = c.Scope.Row(row)
+			scopeName    = c.ScopeName.Row(row)
+			scopeVersion = c.ScopeVersion.Row(row)
+		)
+
+		rec := scopeFor(resourceFor(resource), scope, scopeName, scopeVersion).LogRecords().AppendEmpty()
+		rec.SetTimestamp(otelstorage.NewTimestampFromTime(ts))
+		rec.SetSeverityText(sevText)
+		rec.SetSeverityNumber(plog.SeverityNumber(sevNumber))
+		rec.SetFlags(plog.LogRecordFlags(flags))
+		rec.SetTraceID(pcommon.TraceID(traceID))
+		rec.SetSpanID(pcommon.SpanID(spanID))
+		rec.Body().SetStr(body)
+		attrs.CopyTo(rec.Attributes())
+	}
+}
+
 func (c *Logs) columns() iter.Seq[proto.ResultColumn] {
 	return func(yield func(proto.ResultColumn) bool) {
 		for _, col := range []proto.ResultColumn{