Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ import (
"fmt"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

"github.com/celestiaorg/celestia-app/v6/pkg/da"
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/store"
)

const concurrencyLimit = 16

var tracer = otel.Tracer("core")

type Exchange struct {
fetcher *BlockFetcher
store *store.Store
Expand Down Expand Up @@ -70,16 +76,12 @@ func (ce *Exchange) GetRangeByHeight(
from *header.ExtendedHeader,
to uint64,
) ([]*header.ExtendedHeader, error) {
start := time.Now()

amount := to - (from.Height() + 1)
headers, err := ce.getRangeByHeight(ctx, from.Height()+1, amount)
if err != nil {
return nil, err
}

ce.metrics.requestDurationPerHeader(ctx, time.Since(start), amount)

for _, h := range headers {
err := libhead.Verify[*header.ExtendedHeader](from, h)
if err != nil {
Expand Down Expand Up @@ -165,26 +167,41 @@ func (ce *Exchange) Head(
}

func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height int64) (*header.ExtendedHeader, error) {
var err error

ctx, span := tracer.Start(ctx, "exchange: getExtendedHeaderByHeight")
defer func() {
utils.SetStatusAndEnd(span, err)
}()
span.SetAttributes(attribute.Int64("height", height))

b, err := ce.fetcher.GetSignedBlock(ctx, height)
if err != nil {
return nil, fmt.Errorf("fetching signed block at height %d from core: %w", height, err)
}
span.AddEvent("fetched signed block from core")
log.Debugw("fetched signed block from core", "height", b.Header.Height)

eds, err := da.ConstructEDS(b.Data.Txs.ToSliceOfBytes(), b.Header.Version.App, -1)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
}

// create extended header
eh, err := ce.construct(b.Header, b.Commit, b.ValidatorSet, eds)
if err != nil {
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}
span.AddEvent("exchange: constructed extended header",
trace.WithAttributes(attribute.Int("square_size", eh.DAH.SquareSize())),
)

err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.archival)
if err != nil {
return nil, err
}
span.AddEvent("exchange: stored square")

ce.metrics.observeBlockProcessed(ctx, eh.DAH.SquareSize())
return eh, nil
}
24 changes: 12 additions & 12 deletions core/exchange_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@ package core

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/celestiaorg/celestia-node/libs/utils"
)

type exchangeMetrics struct {
getByHeightDuration metric.Float64Histogram
totalBlocksProcessed metric.Int64Counter
}

func newExchangeMetrics() (*exchangeMetrics, error) {
m := new(exchangeMetrics)

var err error
m.getByHeightDuration, err = meter.Float64Histogram(
"core_ex_get_by_height_request_time",
metric.WithDescription("core exchange client getByHeight request time in seconds (per single height)"),
m.totalBlocksProcessed, err = meter.Int64Counter(
"core_ex_total_blocks_processed",
metric.WithDescription("total number of blocks processed by the exchange"),
)
if err != nil {
return nil, err
Expand All @@ -34,16 +34,16 @@ func (m *exchangeMetrics) observe(ctx context.Context, observeFn func(ctx contex
}

ctx = utils.ResetContextOnError(ctx)

observeFn(ctx)
}

func (m *exchangeMetrics) requestDurationPerHeader(ctx context.Context, duration time.Duration, amount uint64) {
func (m *exchangeMetrics) observeBlockProcessed(ctx context.Context, edsSize int) {
m.observe(ctx, func(ctx context.Context) {
if amount == 0 {
return
}
durationPerHeader := duration.Seconds() / float64(amount)
m.getByHeightDuration.Record(ctx, durationPerHeader)
m.totalBlocksProcessed.Add(ctx, 1, metric.WithAttributes(edsSizeAttribute(edsSize)))
})
}

// edsSizeAttribute creates an attribute for the EDS square size
func edsSizeAttribute(size int) attribute.KeyValue {
return attribute.Int("eds_size", size)
}
15 changes: 12 additions & 3 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cometbft/cometbft/types"
"github.com/gogo/protobuf/proto"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"

libhead "github.com/celestiaorg/go-header"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock
if err != nil {
return nil, err
}
block, err := receiveBlockByHeight(stream)
block, err := f.receiveBlockByHeight(ctx, stream)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -97,7 +98,7 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*Signe
if err != nil {
return nil, err
}
return receiveBlockByHeight(stream)
return f.receiveBlockByHeight(ctx, stream)
}

// Commit queries Core for a `Commit` from the block at
Expand Down Expand Up @@ -220,17 +221,21 @@ func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) {
return resp.SyncInfo.CatchingUp, nil
}

func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) (
func (f *BlockFetcher) receiveBlockByHeight(ctx context.Context, streamer coregrpc.BlockAPI_BlockByHeightClient) (
*SignedBlock,
error,
) {
span := trace.SpanFromContext(ctx)

parts := make([]*tmproto.Part, 0)

// receive the first part to get the block meta, commit, and validator set
firstPart, err := streamer.Recv()
if err != nil {
return nil, err
}
span.AddEvent("fetcher: received first block part")

commit, err := types.CommitFromProto(firstPart.Commit)
if err != nil {
return nil, err
Expand All @@ -248,13 +253,17 @@ func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) (
if err != nil {
return nil, err
}
span.AddEvent("fetcher: received block part")

parts = append(parts, resp.BlockPart)
isLast = resp.IsLast
}

block, err := partsToBlock(parts)
if err != nil {
return nil, err
}

return &SignedBlock{
Header: &block.Header,
Commit: commit,
Expand Down
18 changes: 13 additions & 5 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@ import (

"github.com/cometbft/cometbft/types"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/celestia-app/v6/pkg/da"
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
"github.com/celestiaorg/celestia-node/store"
)

var tracer = otel.Tracer("core/listener")

// Listener is responsible for listening to Core for
// new block events and converting new Core blocks into
// the main data structure used in the Celestia DA network:
Expand Down Expand Up @@ -168,8 +167,12 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSigned
}

func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataSignedBlock) error {
ctx, span := tracer.Start(ctx, "handle-new-signed-block")
defer span.End()
var err error

ctx, span := tracer.Start(ctx, "listener: handleNewSignedBlock")
defer func() {
utils.SetStatusAndEnd(span, err)
}()
span.SetAttributes(
attribute.Int64("height", b.Header.Height),
)
Expand All @@ -184,16 +187,21 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
if err != nil {
panic(fmt.Errorf("making extended header: %w", err))
}
span.AddEvent("listener: constructed extended header",
trace.WithAttributes(attribute.Int("square_size", eh.DAH.SquareSize())),
)

err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow, cl.archival)
if err != nil {
return fmt.Errorf("storing EDS: %w", err)
}
span.AddEvent("listener: stored square")

syncing, err := cl.fetcher.IsSyncing(ctx)
if err != nil {
return fmt.Errorf("getting sync state: %w", err)
}
span.AddEvent("listener: fetched sync state")

// notify network of new EDS hash only if core is already synced
if !syncing {
Expand Down
Loading