diff --git a/core/exchange.go b/core/exchange.go index 3297db0bb5..697880e22d 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -6,17 +6,23 @@ import ( "fmt" "time" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" "github.com/celestiaorg/celestia-app/v6/pkg/da" libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/libs/utils" "github.com/celestiaorg/celestia-node/store" ) const concurrencyLimit = 16 +var tracer = otel.Tracer("core") + type Exchange struct { fetcher *BlockFetcher store *store.Store @@ -70,16 +76,12 @@ func (ce *Exchange) GetRangeByHeight( from *header.ExtendedHeader, to uint64, ) ([]*header.ExtendedHeader, error) { - start := time.Now() - amount := to - (from.Height() + 1) headers, err := ce.getRangeByHeight(ctx, from.Height()+1, amount) if err != nil { return nil, err } - ce.metrics.requestDurationPerHeader(ctx, time.Since(start), amount) - for _, h := range headers { err := libhead.Verify[*header.ExtendedHeader](from, h) if err != nil { @@ -165,26 +167,41 @@ func (ce *Exchange) Head( } func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height int64) (*header.ExtendedHeader, error) { + var err error + + ctx, span := tracer.Start(ctx, "exchange: getExtendedHeaderByHeight") + defer func() { + utils.SetStatusAndEnd(span, err) + }() + span.SetAttributes(attribute.Int64("height", height)) + b, err := ce.fetcher.GetSignedBlock(ctx, height) if err != nil { return nil, fmt.Errorf("fetching signed block at height %d from core: %w", height, err) } + span.AddEvent("fetched signed block from core") log.Debugw("fetched signed block from core", "height", b.Header.Height) eds, err := da.ConstructEDS(b.Data.Txs.ToSliceOfBytes(), b.Header.Version.App, -1) if err != nil { return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err) } + // create extended header eh, err := ce.construct(b.Header, b.Commit, b.ValidatorSet, eds) if err != nil { panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err)) } + span.AddEvent("exchange: constructed extended header", + trace.WithAttributes(attribute.Int("square_size", eh.DAH.SquareSize())), + ) err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.archival) if err != nil { return nil, err } + span.AddEvent("exchange: stored square") + ce.metrics.observeBlockProcessed(ctx, eh.DAH.SquareSize()) return eh, nil } diff --git a/core/exchange_metrics.go b/core/exchange_metrics.go index 4e5bf5956c..727f1c720d 100644 --- a/core/exchange_metrics.go +++ b/core/exchange_metrics.go @@ -2,24 +2,24 @@ package core import ( "context" - "time" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "github.com/celestiaorg/celestia-node/libs/utils" ) type exchangeMetrics struct { - getByHeightDuration metric.Float64Histogram + totalBlocksProcessed metric.Int64Counter } func newExchangeMetrics() (*exchangeMetrics, error) { m := new(exchangeMetrics) var err error - m.getByHeightDuration, err = meter.Float64Histogram( - "core_ex_get_by_height_request_time", - metric.WithDescription("core exchange client getByHeight request time in seconds (per single height)"), + m.totalBlocksProcessed, err = meter.Int64Counter( + "core_ex_total_blocks_processed", + metric.WithDescription("total number of blocks processed by the exchange"), ) if err != nil { return nil, err @@ -34,16 +34,16 @@ func (m *exchangeMetrics) observe(ctx context.Context, observeFn func(ctx contex } ctx = utils.ResetContextOnError(ctx) - observeFn(ctx) } -func (m *exchangeMetrics) requestDurationPerHeader(ctx context.Context, duration time.Duration, amount uint64) { +func (m *exchangeMetrics) observeBlockProcessed(ctx context.Context, edsSize int) { m.observe(ctx, func(ctx context.Context) { - if amount == 0 { - return - } - durationPerHeader := duration.Seconds() / float64(amount) - m.getByHeightDuration.Record(ctx, durationPerHeader) + m.totalBlocksProcessed.Add(ctx, 1, metric.WithAttributes(edsSizeAttribute(edsSize))) }) } + +// edsSizeAttribute creates an attribute for the EDS square size +func edsSizeAttribute(size int) attribute.KeyValue { + return attribute.Int("eds_size", size) +} diff --git a/core/fetcher.go b/core/fetcher.go index 0b1a63f1a9..8e14186339 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -10,6 +10,7 @@ import ( "github.com/cometbft/cometbft/types" "github.com/gogo/protobuf/proto" logging "github.com/ipfs/go-log/v2" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" libhead "github.com/celestiaorg/go-header" @@ -67,7 +68,7 @@ func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock if err != nil { return nil, err } - block, err := receiveBlockByHeight(stream) + block, err := f.receiveBlockByHeight(ctx, stream) if err != nil { return nil, err } @@ -97,7 +98,7 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*Signe if err != nil { return nil, err } - return receiveBlockByHeight(stream) + return f.receiveBlockByHeight(ctx, stream) } // Commit queries Core for a `Commit` from the block at @@ -220,10 +221,12 @@ func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) { return resp.SyncInfo.CatchingUp, nil } -func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) ( +func (f *BlockFetcher) receiveBlockByHeight(ctx context.Context, streamer coregrpc.BlockAPI_BlockByHeightClient) ( *SignedBlock, error, ) { + span := trace.SpanFromContext(ctx) + parts := make([]*tmproto.Part, 0) // receive the first part to get the block meta, commit, and validator set @@ -231,6 +234,8 @@ func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) ( if err != nil { return nil, err } + span.AddEvent("fetcher: received first block part") + commit, err := types.CommitFromProto(firstPart.Commit) if err != nil { return nil, err @@ -248,13 +253,17 @@ func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) ( if err != nil { return nil, err } + span.AddEvent("fetcher: received block part") + parts = append(parts, resp.BlockPart) isLast = resp.IsLast } + block, err := partsToBlock(parts) if err != nil { return nil, err } + return &SignedBlock{ Header: &block.Header, Commit: commit, diff --git a/core/listener.go b/core/listener.go index e4250ddb80..a9209d9eb5 100644 --- a/core/listener.go +++ b/core/listener.go @@ -8,19 +8,18 @@ import ( "github.com/cometbft/cometbft/types" pubsub "github.com/libp2p/go-libp2p-pubsub" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/celestiaorg/celestia-app/v6/pkg/da" libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/libs/utils" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" "github.com/celestiaorg/celestia-node/store" ) -var tracer = otel.Tracer("core/listener") - // Listener is responsible for listening to Core for // new block events and converting new Core blocks into // the main data structure used in the Celestia DA network: @@ -168,8 +167,12 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSigned } func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataSignedBlock) error { - ctx, span := tracer.Start(ctx, "handle-new-signed-block") - defer span.End() + var err error + + ctx, span := tracer.Start(ctx, "listener: handleNewSignedBlock") + defer func() { + utils.SetStatusAndEnd(span, err) + }() span.SetAttributes( attribute.Int64("height", b.Header.Height), ) @@ -184,16 +187,21 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS if err != nil { panic(fmt.Errorf("making extended header: %w", err)) } + span.AddEvent("listener: constructed extended header", + trace.WithAttributes(attribute.Int("square_size", eh.DAH.SquareSize())), + ) err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow, cl.archival) if err != nil { return fmt.Errorf("storing EDS: %w", err) } + span.AddEvent("listener: stored square") syncing, err := cl.fetcher.IsSyncing(ctx) if err != nil { return fmt.Errorf("getting sync state: %w", err) } + span.AddEvent("listener: fetched sync state") // notify network of new EDS hash only if core is already synced if !syncing {