Skip to content
Open
27 changes: 19 additions & 8 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ func NewExchange(
opt(&p)
}

var (
metrics *exchangeMetrics
err error
)
var metrics *exchangeMetrics
if p.metrics {
// set metrics for fetcher
fetcherMetrics, err := newFetcherMetrics()
if err != nil {
return nil, err
}
fetcher.metrics = fetcherMetrics

metrics, err = newExchangeMetrics()
if err != nil {
return nil, err
Expand All @@ -70,16 +74,12 @@ func (ce *Exchange) GetRangeByHeight(
from *header.ExtendedHeader,
to uint64,
) ([]*header.ExtendedHeader, error) {
start := time.Now()

amount := to - (from.Height() + 1)
headers, err := ce.getRangeByHeight(ctx, from.Height()+1, amount)
if err != nil {
return nil, err
}

ce.metrics.requestDurationPerHeader(ctx, time.Since(start), amount)

for _, h := range headers {
err := libhead.Verify[*header.ExtendedHeader](from, h)
if err != nil {
Expand Down Expand Up @@ -165,26 +165,37 @@ func (ce *Exchange) Head(
}

// getExtendedHeaderByHeight fetches the signed block at the given height from
// core, builds its EDS and extended header, and hands both to storeEDS.
//
// Each of the three phases (download, construction, storage) is timed
// separately. The timings are reported to ce.metrics, tagged with the header's
// square size, only once every phase has succeeded. Fetch, extension and
// storage failures are returned as errors, whereas a failure to construct the
// extended header panics.
func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height int64) (*header.ExtendedHeader, error) {
	downloadStart := time.Now()
	signed, err := ce.fetcher.GetSignedBlock(ctx, height)
	if err != nil {
		return nil, fmt.Errorf("fetching signed block at height %d from core: %w", height, err)
	}
	downloadTime := time.Since(downloadStart)
	log.Debugw("fetched signed block from core", "height", signed.Header.Height)

	// Construction time covers both extending the block data and assembling
	// the extended header.
	constructStart := time.Now()
	// NOTE(review): the meaning of the trailing -1 is defined by da.ConstructEDS — confirm.
	eds, err := da.ConstructEDS(signed.Data.Txs.ToSliceOfBytes(), signed.Header.Version.App, -1)
	if err != nil {
		return nil, fmt.Errorf("extending block data for height %d: %w", signed.Header.Height, err)
	}
	eh, err := ce.construct(signed.Header, signed.Commit, signed.ValidatorSet, eds)
	if err != nil {
		panic(fmt.Errorf("constructing extended header for height %d: %w", signed.Header.Height, err))
	}
	constructTime := time.Since(constructStart)

	storeStart := time.Now()
	if err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.archival); err != nil {
		return nil, err
	}
	storeTime := time.Since(storeStart)

	// All three observations share the same square-size attribute.
	squareSize := eh.DAH.SquareSize()
	ce.metrics.observeBlockDownload(ctx, downloadTime, squareSize)
	ce.metrics.observeEDSConstruction(ctx, constructTime, squareSize)
	ce.metrics.observeEDSStorage(ctx, storeTime, squareSize)

	return eh, nil
}
59 changes: 48 additions & 11 deletions core/exchange_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,44 @@ import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/celestiaorg/celestia-node/libs/utils"
)

// exchangeMetrics groups the duration histograms recorded by the core exchange.
type exchangeMetrics struct {
// getByHeightDuration holds the getByHeight request time in seconds, averaged
// per requested height.
getByHeightDuration metric.Float64Histogram
// downloadDuration holds the time to download a block from core, in
// milliseconds.
downloadDuration metric.Float64Histogram
// edsConstructionDuration holds the time to construct an EDS from block data,
// in milliseconds.
edsConstructionDuration metric.Float64Histogram
// edsStorageDuration holds the time to store an EDS, in milliseconds.
edsStorageDuration metric.Float64Histogram
}

func newExchangeMetrics() (*exchangeMetrics, error) {
m := new(exchangeMetrics)

var err error
m.getByHeightDuration, err = meter.Float64Histogram(
"core_ex_get_by_height_request_time",
metric.WithDescription("core exchange client getByHeight request time in seconds (per single height)"),
m.downloadDuration, err = meter.Float64Histogram(
"core_ex_block_download_time",
metric.WithDescription("time to download block from core in milliseconds"),
metric.WithUnit("ms"),
)
if err != nil {
return nil, err
}

m.edsConstructionDuration, err = meter.Float64Histogram(
"core_ex_eds_construction_time",
metric.WithDescription("time to construct EDS from block data in milliseconds"),
metric.WithUnit("ms"),
)
if err != nil {
return nil, err
}

m.edsStorageDuration, err = meter.Float64Histogram(
"core_ex_eds_storage_time",
metric.WithDescription("time to store EDS in milliseconds"),
metric.WithUnit("ms"),
)
if err != nil {
return nil, err
Expand All @@ -34,16 +56,31 @@ func (m *exchangeMetrics) observe(ctx context.Context, observeFn func(ctx contex
}

ctx = utils.ResetContextOnError(ctx)

observeFn(ctx)
}

func (m *exchangeMetrics) requestDurationPerHeader(ctx context.Context, duration time.Duration, amount uint64) {
func (m *exchangeMetrics) observeBlockDownload(ctx context.Context, duration time.Duration, edsSize int) {
m.observe(ctx, func(ctx context.Context) {
if amount == 0 {
return
}
durationPerHeader := duration.Seconds() / float64(amount)
m.getByHeightDuration.Record(ctx, durationPerHeader)
m.downloadDuration.Record(ctx, float64(duration.Milliseconds()),
metric.WithAttributes(edsSizeAttribute(edsSize)))
})
}

// observeEDSConstruction records how long an EDS construction took, tagged
// with the EDS square size via edsSizeAttribute.
//
// The value is recorded in milliseconds and keeps its fractional part.
// Recording is delegated to m.observe, which runs the callback with the
// context returned by utils.ResetContextOnError.
func (m *exchangeMetrics) observeEDSConstruction(ctx context.Context, duration time.Duration, edsSize int) {
	m.observe(ctx, func(ctx context.Context) {
		// Divide as floats: duration.Milliseconds() truncates to a whole
		// number, so any sub-millisecond measurement would be recorded as 0.
		elapsedMs := float64(duration) / float64(time.Millisecond)
		m.edsConstructionDuration.Record(ctx, elapsedMs,
			metric.WithAttributes(edsSizeAttribute(edsSize)))
	})
}

// observeEDSStorage records how long storing an EDS took, tagged with the EDS
// square size via edsSizeAttribute.
//
// The value is recorded in milliseconds and keeps its fractional part.
// Recording is delegated to m.observe, which runs the callback with the
// context returned by utils.ResetContextOnError.
func (m *exchangeMetrics) observeEDSStorage(ctx context.Context, duration time.Duration, edsSize int) {
	m.observe(ctx, func(ctx context.Context) {
		// Divide as floats: duration.Milliseconds() truncates to a whole
		// number, so any sub-millisecond measurement would be recorded as 0.
		elapsedMs := float64(duration) / float64(time.Millisecond)
		m.edsStorageDuration.Record(ctx, elapsedMs,
			metric.WithAttributes(edsSizeAttribute(edsSize)))
	})
}

// edsSizeAttribute returns the "eds_size" integer attribute used to tag
// exchange metrics with the EDS square size.
func edsSizeAttribute(size int) attribute.KeyValue {
	const key = "eds_size"

	sizeAttr := attribute.Int(key, size)
	return sizeAttr
}
14 changes: 11 additions & 3 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var (

// BlockFetcher retrieves block data from Core through a gRPC block API client.
type BlockFetcher struct {
// client is the gRPC client used to query Core.
client coregrpc.BlockAPIClient

// metrics records fetcher timings when set. It may be left nil:
// fetcherMetrics observations are no-ops on a nil receiver.
metrics *fetcherMetrics
}

// NewBlockFetcher returns a new `BlockFetcher`.
Expand Down Expand Up @@ -67,7 +69,7 @@ func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock
if err != nil {
return nil, err
}
block, err := receiveBlockByHeight(stream)
block, err := f.receiveBlockByHeight(ctx, stream)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -97,7 +99,7 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*Signe
if err != nil {
return nil, err
}
return receiveBlockByHeight(stream)
return f.receiveBlockByHeight(ctx, stream)
}

// Commit queries Core for a `Commit` from the block at
Expand Down Expand Up @@ -220,10 +222,12 @@ func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) {
return resp.SyncInfo.CatchingUp, nil
}

func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) (
func (f *BlockFetcher) receiveBlockByHeight(ctx context.Context, streamer coregrpc.BlockAPI_BlockByHeightClient) (
*SignedBlock,
error,
) {
start := time.Now()

parts := make([]*tmproto.Part, 0)

// receive the first part to get the block meta, commit, and validator set
Expand Down Expand Up @@ -251,10 +255,14 @@ func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) (
parts = append(parts, resp.BlockPart)
isLast = resp.IsLast
}

f.metrics.observeReceiveBlock(ctx, time.Since(start), len(parts))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW - the majority of download time (exchange metric) is actually spent here so I'm not 100% sure we need both exchange block download time and fetcher receive block time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would actually be good to keep both, so that we can observe any divergence between the two if it happens.


block, err := partsToBlock(parts)
if err != nil {
return nil, err
}

return &SignedBlock{
Header: &block.Header,
Commit: commit,
Expand Down
49 changes: 49 additions & 0 deletions core/fetcher_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package core

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/celestiaorg/celestia-node/libs/utils"
)

// fetcherMetrics holds the instruments recorded by the block fetcher.
type fetcherMetrics struct {
// receiveBlockDuration holds the time to receive a block from core, in
// milliseconds.
receiveBlockDuration metric.Float64Histogram
}

// newFetcherMetrics registers the fetcher's histogram on the package meter and
// returns the populated fetcherMetrics. It returns a nil result together with
// the error if the histogram cannot be created.
func newFetcherMetrics() (*fetcherMetrics, error) {
	receiveBlockDuration, err := meter.Float64Histogram(
		"core_fetcher_receive_block_time",
		metric.WithDescription("time to receive block from core in milliseconds"),
		metric.WithUnit("ms"),
	)
	if err != nil {
		return nil, err
	}

	return &fetcherMetrics{receiveBlockDuration: receiveBlockDuration}, nil
}

// observe runs observeFn unless the receiver is nil, which lets callers record
// metrics unconditionally even when metrics were never set up.
//
// The callback receives the context returned by utils.ResetContextOnError
// rather than the caller's context.
// NOTE(review): presumably this keeps observations from being dropped when ctx
// is already cancelled — confirm against utils.ResetContextOnError.
func (m *fetcherMetrics) observe(ctx context.Context, observeFn func(ctx context.Context)) {
	if m != nil {
		observeFn(utils.ResetContextOnError(ctx))
	}
}

// observeReceiveBlock records how long receiving a block took, tagged with the
// number of block parts received.
//
// The value is recorded in milliseconds and keeps its fractional part. The call
// is a no-op when m is nil, because m.observe returns early in that case.
func (m *fetcherMetrics) observeReceiveBlock(ctx context.Context, duration time.Duration, numParts int) {
	m.observe(ctx, func(ctx context.Context) {
		// Divide as floats: duration.Milliseconds() truncates to a whole
		// number, so any sub-millisecond measurement would be recorded as 0.
		elapsedMs := float64(duration) / float64(time.Millisecond)
		m.receiveBlockDuration.Record(ctx, elapsedMs,
			// can be used to approximate the block size (with each part being roughly
			// 64KB)
			metric.WithAttributes(attribute.Int("num_parts", numParts)))
	})
}
Loading