Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aggregator/pkg/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type CommitVerificationAggregatedStore interface {
QueryAggregatedReports(ctx context.Context, start int64, committeeID string, token *string) (*model.PaginatedAggregatedReports, error)
// GetCCVData retrieves the aggregated CCV data for a specific message ID.
GetCCVData(ctx context.Context, messageID model.MessageID, committeeID string) (*model.CommitAggregatedReport, error)
// GetBatchCCVData retrieves the aggregated CCV data for multiple message IDs efficiently.
// Returns a map of messageID hex string to CommitAggregatedReport. Missing message IDs are not included in the map.
GetBatchCCVData(ctx context.Context, messageIDs []model.MessageID, committeeID string) (map[string]*model.CommitAggregatedReport, error)
}

// ChainStatus represents chain status data with finalized block height and disabled flag.
Expand Down
18 changes: 11 additions & 7 deletions aggregator/pkg/handlers/batch_write_commit_ccv_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"sync"

"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcstatus "google.golang.org/grpc/status" //nolint:gci

"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/scope"
"github.com/smartcontractkit/chainlink-common/pkg/logger"

pb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go/v1"
pb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go/v1" //nolint:gci
)

// WriteCommitCCVNodeDataHandler handles requests to write commit verification records.
Expand All @@ -26,7 +27,7 @@ func (h *BatchWriteCommitCCVNodeDataHandler) logger(ctx context.Context) logger.
func (h *BatchWriteCommitCCVNodeDataHandler) Handle(ctx context.Context, req *pb.BatchWriteCommitCCVNodeDataRequest) (*pb.BatchWriteCommitCCVNodeDataResponse, error) {
requests := req.GetRequests()
responses := make([]*pb.WriteCommitCCVNodeDataResponse, len(requests))
errors := make([]error, len(requests))
errors := make([]*status.Status, len(requests))

wg := sync.WaitGroup{}

Expand All @@ -36,20 +37,23 @@ func (h *BatchWriteCommitCCVNodeDataHandler) Handle(ctx context.Context, req *pb
defer wg.Done()
resp, err := h.handler.Handle(ctx, r)
if err != nil {
statusErr, ok := status.FromError(err)
statusErr, ok := grpcstatus.FromError(err)
if !ok {
h.logger(ctx).Errorf("unexpected error type: %v", err)
errors[i] = status.Error(codes.Unknown, "unexpected error")
errors[i] = grpcstatus.New(codes.Unknown, "unexpected error").Proto()
} else {
errors[i] = statusErr.Proto()
}
errors[i] = statusErr.Err()
} else {
responses[i] = resp
}
responses[i] = resp
}(i, r)
}

wg.Wait()
return &pb.BatchWriteCommitCCVNodeDataResponse{
Responses: responses,
Errors: errors,
}, nil
}

Expand Down
116 changes: 116 additions & 0 deletions aggregator/pkg/handlers/get_batch_ccv_data_for_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package handlers

import (
"context"

"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status" //nolint:gci

"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common"
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model"
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/scope"
"github.com/smartcontractkit/chainlink-common/pkg/logger"

ethcommon "github.com/ethereum/go-ethereum/common" //nolint:goimports
pb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go/v1" //nolint:gci
)

// GetBatchCCVDataForMessageHandler handles batch requests to retrieve commit verification data for multiple message IDs.
type GetBatchCCVDataForMessageHandler struct {
storage common.CommitVerificationAggregatedStore
committee map[string]*model.Committee
l logger.SugaredLogger
maxMessageIDsPerBatch int
}

func (h *GetBatchCCVDataForMessageHandler) logger(ctx context.Context) logger.SugaredLogger {
return scope.AugmentLogger(ctx, h.l)
}

// Handle processes the batch get request and retrieves commit verification data for multiple message IDs.
func (h *GetBatchCCVDataForMessageHandler) Handle(ctx context.Context, req *pb.BatchGetVerifierResultForMessageRequest) (*pb.BatchGetVerifierResultForMessageResponse, error) {
committeeID := LoadCommitteeIDFromContext(ctx)
ctx = scope.WithCommitteeID(ctx, committeeID)

reqLogger := h.logger(ctx)
reqLogger.Infof("Received batch verifier result request for %d requests", len(req.GetRequests()))

// Validate batch size limits
if len(req.GetRequests()) == 0 {
return nil, grpcstatus.Errorf(codes.InvalidArgument, "requests cannot be empty")
}
if len(req.GetRequests()) > h.maxMessageIDsPerBatch {
return nil, grpcstatus.Errorf(codes.InvalidArgument, "too many requests: %d, maximum allowed: %d", len(req.GetRequests()), h.maxMessageIDsPerBatch)
}

// Convert proto message IDs to model.MessageID and track original requests
messageIDs := make([]model.MessageID, len(req.GetRequests()))
originalRequests := make(map[string]*pb.GetVerifierResultForMessageRequest)
for i, request := range req.GetRequests() {
messageIDs[i] = request.GetMessageId()
messageIDHex := ethcommon.Bytes2Hex(request.GetMessageId())
originalRequests[messageIDHex] = request
}

// Call storage for efficient batch retrieval
results, err := h.storage.GetBatchCCVData(ctx, messageIDs, committeeID)
if err != nil {
reqLogger.Errorf("Failed to retrieve batch CCV data: %v", err)
return nil, grpcstatus.Errorf(codes.Internal, "failed to retrieve batch data: %v", err)
}
// Prepare response
response := &pb.BatchGetVerifierResultForMessageResponse{
Results: make([]*pb.VerifierResult, 0),
Errors: make([]*status.Status, 0),
}

// Process results and track which message IDs were found
foundMessageIDs := make(map[string]bool)
for messageIDHex, report := range results {
// Map aggregated report to proto
ccvData, err := model.MapAggregatedReportToCCVDataProto(report, h.committee)
if err != nil {
reqLogger.Errorf("Failed to map aggregated report to proto for message ID %s: %v", messageIDHex, err)
// Add error for this specific message
errorStatus := grpcstatus.New(codes.Internal, "failed to map aggregated report").Proto()
response.Errors = append(response.Errors, errorStatus)
continue
}

// Create VerifierResult
verifierResult := &pb.VerifierResult{
Message: ccvData.Message,
SourceVerifierAddress: ccvData.SourceVerifierAddress,
DestVerifierAddress: ccvData.DestVerifierAddress,
CcvData: ccvData.CcvData,
Timestamp: ccvData.Timestamp,
Sequence: ccvData.Sequence,
}

response.Results = append(response.Results, verifierResult)
foundMessageIDs[messageIDHex] = true
}

// Add errors for message IDs that were not found
for _, request := range req.GetRequests() {
messageIDHex := ethcommon.Bytes2Hex(request.GetMessageId())
if !foundMessageIDs[messageIDHex] {
errorStatus := grpcstatus.New(codes.NotFound, "message ID not found").Proto()
response.Errors = append(response.Errors, errorStatus)
}
}

reqLogger.Infof("Batch request completed: %d found, %d errors", len(response.Results), len(response.Errors))
return response, nil
}

// NewGetBatchCCVDataForMessageHandler creates a new instance of GetBatchCCVDataForMessageHandler.
func NewGetBatchCCVDataForMessageHandler(storage common.CommitVerificationAggregatedStore, committee map[string]*model.Committee, maxMessageIDsPerBatch int, l logger.SugaredLogger) *GetBatchCCVDataForMessageHandler {
return &GetBatchCCVDataForMessageHandler{
storage: storage,
committee: committee,
l: l,
maxMessageIDsPerBatch: maxMessageIDsPerBatch,
}
}
21 changes: 9 additions & 12 deletions aggregator/pkg/handlers/get_messages_since.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package handlers

import (
"context"
"time"

"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common"
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model"
Expand All @@ -13,11 +12,10 @@ import (
)

type GetMessagesSinceHandler struct {
storage common.CommitVerificationAggregatedStore
committee map[string]*model.Committee
maxAnonymousGetMessageSinceRange time.Duration
l logger.SugaredLogger
m common.AggregatorMonitoring
storage common.CommitVerificationAggregatedStore
committee map[string]*model.Committee
l logger.SugaredLogger
m common.AggregatorMonitoring
}

func (h *GetMessagesSinceHandler) logger(ctx context.Context) logger.SugaredLogger {
Expand Down Expand Up @@ -72,12 +70,11 @@ func (h *GetMessagesSinceHandler) Handle(ctx context.Context, req *pb.GetMessage
}

// NewGetMessagesSinceHandler creates a new instance of GetMessagesSinceHandler.
func NewGetMessagesSinceHandler(storage common.CommitVerificationAggregatedStore, committee map[string]*model.Committee, l logger.SugaredLogger, m common.AggregatorMonitoring, maxAnonymousGetMessageSinceRange time.Duration) *GetMessagesSinceHandler {
func NewGetMessagesSinceHandler(storage common.CommitVerificationAggregatedStore, committee map[string]*model.Committee, l logger.SugaredLogger, m common.AggregatorMonitoring) *GetMessagesSinceHandler {
return &GetMessagesSinceHandler{
storage: storage,
committee: committee,
l: l,
m: m,
maxAnonymousGetMessageSinceRange: maxAnonymousGetMessageSinceRange,
storage: storage,
committee: committee,
l: l,
m: m,
}
}
50 changes: 36 additions & 14 deletions aggregator/pkg/model/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,27 +344,32 @@ func (c *APIKeyConfig) ValidateAPIKey(apiKey string) error {
// AggregatorConfig is the root configuration for the pb.
type AggregatorConfig struct {
// CommitteeID are just arbitrary names for different committees this is a concept internal to the aggregator
Committees map[CommitteeID]*Committee `toml:"committees"`
Server ServerConfig `toml:"server"`
Storage *StorageConfig `toml:"storage"`
APIKeys APIKeyConfig `toml:"-"`
ChainStatuses ChainStatusConfig `toml:"chainStatuses"`
Aggregation AggregationConfig `toml:"aggregation"`
OrphanRecovery OrphanRecoveryConfig `toml:"orphanRecovery"`
RateLimiting RateLimitingConfig `toml:"rateLimiting"`
HealthCheck HealthCheckConfig `toml:"healthCheck"`
DisableValidation bool `toml:"disableValidation"`
StubMode bool `toml:"stubQuorumValidation"`
Monitoring MonitoringConfig `toml:"monitoring"`
PyroscopeURL string `toml:"pyroscope_url"`
MaxAnonymousGetMessageSinceRange int64 `toml:"maxAnonymousGetMessageSinceRange"`
Committees map[CommitteeID]*Committee `toml:"committees"`
Server ServerConfig `toml:"server"`
Storage *StorageConfig `toml:"storage"`
APIKeys APIKeyConfig `toml:"-"`
ChainStatuses ChainStatusConfig `toml:"chainStatuses"`
Aggregation AggregationConfig `toml:"aggregation"`
OrphanRecovery OrphanRecoveryConfig `toml:"orphanRecovery"`
RateLimiting RateLimitingConfig `toml:"rateLimiting"`
HealthCheck HealthCheckConfig `toml:"healthCheck"`
DisableValidation bool `toml:"disableValidation"`
StubMode bool `toml:"stubQuorumValidation"`
Monitoring MonitoringConfig `toml:"monitoring"`
PyroscopeURL string `toml:"pyroscope_url"`
// MaxMessageIDsPerBatch limits the number of message IDs per batch verifier result request
MaxMessageIDsPerBatch int `toml:"maxMessageIDsPerBatch"`
}

// SetDefaults sets default values for the configuration.
func (c *AggregatorConfig) SetDefaults() {
if c.ChainStatuses.MaxChainStatusesPerRequest == 0 {
c.ChainStatuses.MaxChainStatusesPerRequest = 1000
}
// Batch verifier result defaults
if c.MaxMessageIDsPerBatch == 0 {
c.MaxMessageIDsPerBatch = 100
}
// Aggregation defaults
if c.Aggregation.ChannelBufferSize == 0 {
// Set to 10 by default matching the number of background workers
Expand Down Expand Up @@ -442,6 +447,18 @@ func (c *AggregatorConfig) ValidateChainStatusConfig() error {
return nil
}

// ValidateBatchConfig validates the batch verifier result configuration.
func (c *AggregatorConfig) ValidateBatchConfig() error {
if c.MaxMessageIDsPerBatch <= 0 {
return errors.New("maxMessageIDsPerBatch must be greater than 0")
}
if c.MaxMessageIDsPerBatch > 1000 {
return errors.New("maxMessageIDsPerBatch cannot exceed 1000")
}

return nil
}

// ValidateAggregationConfig validates the aggregation configuration.
func (c *AggregatorConfig) ValidateAggregationConfig() error {
if c.Aggregation.ChannelBufferSize <= 0 {
Expand Down Expand Up @@ -502,6 +519,11 @@ func (c *AggregatorConfig) Validate() error {
return fmt.Errorf("chain status configuration error: %w", err)
}

// Validate batch configuration
if err := c.ValidateBatchConfig(); err != nil {
return fmt.Errorf("batch configuration error: %w", err)
}

// Validate aggregation configuration
if err := c.ValidateAggregationConfig(); err != nil {
return fmt.Errorf("aggregation configuration error: %w", err)
Expand Down
9 changes: 8 additions & 1 deletion aggregator/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Server struct {
writeCommitCCVNodeDataHandler *handlers.WriteCommitCCVNodeDataHandler
getMessagesSinceHandler *handlers.GetMessagesSinceHandler
getCCVDataForMessageHandler *handlers.GetCCVDataForMessageHandler
getBatchCCVDataForMessageHandler *handlers.GetBatchCCVDataForMessageHandler
writeChainStatusHandler *handlers.WriteChainStatusHandler
readChainStatusHandler *handlers.ReadChainStatusHandler
chainStatusStorage common.ChainStatusStorageInterface
Expand Down Expand Up @@ -82,6 +83,10 @@ func (s *Server) GetVerifierResultForMessage(ctx context.Context, req *pb.GetVer
return s.getCCVDataForMessageHandler.Handle(ctx, req)
}

func (s *Server) BatchGetVerifierResultForMessage(ctx context.Context, req *pb.BatchGetVerifierResultForMessageRequest) (*pb.BatchGetVerifierResultForMessageResponse, error) {
return s.getBatchCCVDataForMessageHandler.Handle(ctx, req)
}

func (s *Server) GetMessagesSince(ctx context.Context, req *pb.GetMessagesSinceRequest) (*pb.GetMessagesSinceResponse, error) {
return s.getMessagesSinceHandler.Handle(ctx, req)
}
Expand Down Expand Up @@ -255,8 +260,9 @@ func NewServer(l logger.SugaredLogger, config *model.AggregatorConfig) *Server {

writeHandler := handlers.NewWriteCommitCCVNodeDataHandler(store, agg, l, config.DisableValidation, validator)
readCommitCCVNodeDataHandler := handlers.NewReadCommitCCVNodeDataHandler(store, config.DisableValidation, l)
getMessagesSinceHandler := handlers.NewGetMessagesSinceHandler(store, config.Committees, l, aggMonitoring, time.Duration(config.MaxAnonymousGetMessageSinceRange)*time.Second)
getMessagesSinceHandler := handlers.NewGetMessagesSinceHandler(store, config.Committees, l, aggMonitoring)
getCCVDataForMessageHandler := handlers.NewGetCCVDataForMessageHandler(store, config.Committees, l)
getBatchCCVDataForMessageHandler := handlers.NewGetBatchCCVDataForMessageHandler(store, config.Committees, config.MaxMessageIDsPerBatch, l)
batchWriteCommitCCVNodeDataHandler := handlers.NewBatchWriteCommitCCVNodeDataHandler(writeHandler)

// Initialize chain status storage
Expand Down Expand Up @@ -343,6 +349,7 @@ func NewServer(l logger.SugaredLogger, config *model.AggregatorConfig) *Server {
writeCommitCCVNodeDataHandler: writeHandler,
getMessagesSinceHandler: getMessagesSinceHandler,
getCCVDataForMessageHandler: getCCVDataForMessageHandler,
getBatchCCVDataForMessageHandler: getBatchCCVDataForMessageHandler,
writeChainStatusHandler: writeChainStatusHandler,
readChainStatusHandler: readChainStatusHandler,
chainStatusStorage: chainStatusStorage,
Expand Down
4 changes: 4 additions & 0 deletions aggregator/pkg/storage/ddb/dynamodb_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ func (d *DynamoDBStorage) GetCCVData(ctx context.Context, messageID model.Messag
return report, nil
}

func (d *DynamoDBStorage) GetBatchCCVData(ctx context.Context, messageIDs []model.MessageID, committeeID string) (map[string]*model.CommitAggregatedReport, error) {
return nil, errors.New("batch CCV data retrieval is not yet implemented for DynamoDB storage")
}

func (d *DynamoDBStorage) ListOrphanedMessageIDs(ctx context.Context, committeeID model.CommitteeID) (<-chan model.MessageID, <-chan error) {
resultChan := make(chan model.MessageID, 100)
errorChan := make(chan error, 1)
Expand Down
19 changes: 19 additions & 0 deletions aggregator/pkg/storage/memory/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package memory
import (
"bytes"
"context"
"encoding/hex"
"errors"
"sync"

Expand Down Expand Up @@ -95,6 +96,24 @@ func (s *InMemoryStorage) GetCCVData(_ context.Context, messageID model.MessageI
return nil, nil
}

// GetBatchCCVData retrieves commit verification data for multiple message IDs.
func (s *InMemoryStorage) GetBatchCCVData(_ context.Context, messageIDs []model.MessageID, committeeID string) (map[string]*model.CommitAggregatedReport, error) {
results := make(map[string]*model.CommitAggregatedReport)

for _, messageID := range messageIDs {
id := model.GetAggregatedReportID(messageID, committeeID)
if value, ok := s.aggregatedReports.Load(id); ok {
if report, ok := value.(*model.CommitAggregatedReport); ok && report.CommitteeID == committeeID {
// Use hex encoding to match PostgreSQL implementation
messageIDHex := hex.EncodeToString(messageID)
results[messageIDHex] = report
}
}
}

return results, nil
}

// ListOrphanedMessageIDs streams unique (messageID, committeeID) combinations that have verification records but no aggregated reports.
// Returns a channel for pairs and a channel for errors. Both channels will be closed when iteration is complete.
func (s *InMemoryStorage) ListOrphanedMessageIDs(ctx context.Context, committeeID model.CommitteeID) (<-chan model.MessageID, <-chan error) {
Expand Down
Loading
Loading