diff --git a/aggregator/pkg/common/storage.go b/aggregator/pkg/common/storage.go index daa13242..f6daf987 100644 --- a/aggregator/pkg/common/storage.go +++ b/aggregator/pkg/common/storage.go @@ -25,6 +25,9 @@ type CommitVerificationAggregatedStore interface { QueryAggregatedReports(ctx context.Context, start int64, committeeID string, token *string) (*model.PaginatedAggregatedReports, error) // GetCCVData retrieves the aggregated CCV data for a specific message ID. GetCCVData(ctx context.Context, messageID model.MessageID, committeeID string) (*model.CommitAggregatedReport, error) + // GetBatchCCVData retrieves the aggregated CCV data for multiple message IDs efficiently. + // Returns a map of messageID hex string to CommitAggregatedReport. Missing message IDs are not included in the map. + GetBatchCCVData(ctx context.Context, messageIDs []model.MessageID, committeeID string) (map[string]*model.CommitAggregatedReport, error) } // ChainStatus represents chain status data with finalized block height and disabled flag. diff --git a/aggregator/pkg/handlers/batch_write_commit_ccv_data.go b/aggregator/pkg/handlers/batch_write_commit_ccv_data.go index 2910fed1..f4516513 100644 --- a/aggregator/pkg/handlers/batch_write_commit_ccv_data.go +++ b/aggregator/pkg/handlers/batch_write_commit_ccv_data.go @@ -4,13 +4,14 @@ import ( "context" "sync" + "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + grpcstatus "google.golang.org/grpc/status" //nolint:gci "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/scope" "github.com/smartcontractkit/chainlink-common/pkg/logger" - pb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go/v1" + pb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go/v1" //nolint:gci ) // WriteCommitCCVNodeDataHandler handles requests to write commit verification records. @@ -26,7 +27,7 @@ func (h *BatchWriteCommitCCVNodeDataHandler) logger(ctx context.Context) logger. 
func (h *BatchWriteCommitCCVNodeDataHandler) Handle(ctx context.Context, req *pb.BatchWriteCommitCCVNodeDataRequest) (*pb.BatchWriteCommitCCVNodeDataResponse, error) { requests := req.GetRequests() responses := make([]*pb.WriteCommitCCVNodeDataResponse, len(requests)) - errors := make([]error, len(requests)) + errors := make([]*status.Status, len(requests)) wg := sync.WaitGroup{} @@ -36,20 +37,23 @@ func (h *BatchWriteCommitCCVNodeDataHandler) Handle(ctx context.Context, req *pb defer wg.Done() resp, err := h.handler.Handle(ctx, r) if err != nil { - statusErr, ok := status.FromError(err) + statusErr, ok := grpcstatus.FromError(err) if !ok { h.logger(ctx).Errorf("unexpected error type: %v", err) - errors[i] = status.Error(codes.Unknown, "unexpected error") + errors[i] = grpcstatus.New(codes.Unknown, "unexpected error").Proto() + } else { + errors[i] = statusErr.Proto() } - errors[i] = statusErr.Err() + } else { + responses[i] = resp } - responses[i] = resp }(i, r) } wg.Wait() return &pb.BatchWriteCommitCCVNodeDataResponse{ Responses: responses, + Errors: errors, }, nil } diff --git a/aggregator/pkg/handlers/get_batch_ccv_data_for_message.go b/aggregator/pkg/handlers/get_batch_ccv_data_for_message.go new file mode 100644 index 00000000..8818f797 --- /dev/null +++ b/aggregator/pkg/handlers/get_batch_ccv_data_for_message.go @@ -0,0 +1,116 @@ +package handlers + +import ( + "context" + + "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" //nolint:gci + + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/scope" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + + ethcommon "github.com/ethereum/go-ethereum/common" //nolint:goimports + pb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go/v1" //nolint:gci +) + +// GetBatchCCVDataForMessageHandler handles batch requests to retrieve commit verification data for multiple message IDs. +type GetBatchCCVDataForMessageHandler struct { + storage common.CommitVerificationAggregatedStore + committee map[string]*model.Committee + l logger.SugaredLogger + maxMessageIDsPerBatch int +} + +func (h *GetBatchCCVDataForMessageHandler) logger(ctx context.Context) logger.SugaredLogger { + return scope.AugmentLogger(ctx, h.l) +} + +// Handle processes the batch get request and retrieves commit verification data for multiple message IDs. 
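+// Found messages are returned in Results; message IDs with no aggregated report do not fail the
+// RPC and are instead surfaced as NotFound entries in Errors. As a rough sketch (variable names
+// are illustrative; request/response types come from the generated VerifierResultAPI client used
+// in the tests below), a caller might do:
+//
+//	resp, err := client.BatchGetVerifierResultForMessage(ctx, &pb.BatchGetVerifierResultForMessageRequest{
+//		Requests: []*pb.GetVerifierResultForMessageRequest{
+//			{MessageId: messageID1},
+//			{MessageId: messageID2},
+//		},
+//	})
+//	// resp.Results holds the found pb.VerifierResult entries; resp.Errors holds per-message status values.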
+func (h *GetBatchCCVDataForMessageHandler) Handle(ctx context.Context, req *pb.BatchGetVerifierResultForMessageRequest) (*pb.BatchGetVerifierResultForMessageResponse, error) { + committeeID := LoadCommitteeIDFromContext(ctx) + ctx = scope.WithCommitteeID(ctx, committeeID) + + reqLogger := h.logger(ctx) + reqLogger.Infof("Received batch verifier result request for %d requests", len(req.GetRequests())) + + // Validate batch size limits + if len(req.GetRequests()) == 0 { + return nil, grpcstatus.Errorf(codes.InvalidArgument, "requests cannot be empty") + } + if len(req.GetRequests()) > h.maxMessageIDsPerBatch { + return nil, grpcstatus.Errorf(codes.InvalidArgument, "too many requests: %d, maximum allowed: %d", len(req.GetRequests()), h.maxMessageIDsPerBatch) + } + + // Convert proto message IDs to model.MessageID and track original requests + messageIDs := make([]model.MessageID, len(req.GetRequests())) + originalRequests := make(map[string]*pb.GetVerifierResultForMessageRequest) + for i, request := range req.GetRequests() { + messageIDs[i] = request.GetMessageId() + messageIDHex := ethcommon.Bytes2Hex(request.GetMessageId()) + originalRequests[messageIDHex] = request + } + + // Call storage for efficient batch retrieval + results, err := h.storage.GetBatchCCVData(ctx, messageIDs, committeeID) + if err != nil { + reqLogger.Errorf("Failed to retrieve batch CCV data: %v", err) + return nil, grpcstatus.Errorf(codes.Internal, "failed to retrieve batch data: %v", err) + } + // Prepare response + response := &pb.BatchGetVerifierResultForMessageResponse{ + Results: make([]*pb.VerifierResult, 0), + Errors: make([]*status.Status, 0), + } + + // Process results and track which message IDs were found + foundMessageIDs := make(map[string]bool) + for messageIDHex, report := range results { + // Map aggregated report to proto + ccvData, err := model.MapAggregatedReportToCCVDataProto(report, h.committee) + if err != nil { + reqLogger.Errorf("Failed to map aggregated report to proto for message ID %s: %v", messageIDHex, err) + // Add error for this specific message + errorStatus := grpcstatus.New(codes.Internal, "failed to map aggregated report").Proto() + response.Errors = append(response.Errors, errorStatus) + continue + } + + // Create VerifierResult + verifierResult := &pb.VerifierResult{ + Message: ccvData.Message, + SourceVerifierAddress: ccvData.SourceVerifierAddress, + DestVerifierAddress: ccvData.DestVerifierAddress, + CcvData: ccvData.CcvData, + Timestamp: ccvData.Timestamp, + Sequence: ccvData.Sequence, + } + + response.Results = append(response.Results, verifierResult) + foundMessageIDs[messageIDHex] = true + } + + // Add errors for message IDs that were not found + for _, request := range req.GetRequests() { + messageIDHex := ethcommon.Bytes2Hex(request.GetMessageId()) + if !foundMessageIDs[messageIDHex] { + errorStatus := grpcstatus.New(codes.NotFound, "message ID not found").Proto() + response.Errors = append(response.Errors, errorStatus) + } + } + + reqLogger.Infof("Batch request completed: %d found, %d errors", len(response.Results), len(response.Errors)) + return response, nil +} + +// NewGetBatchCCVDataForMessageHandler creates a new instance of GetBatchCCVDataForMessageHandler. 
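+// The maxMessageIDsPerBatch argument is expected to come from AggregatorConfig.MaxMessageIDsPerBatch;
+// the NewServer change in this diff wires it roughly as:
+//
+//	handlers.NewGetBatchCCVDataForMessageHandler(store, config.Committees, config.MaxMessageIDsPerBatch, l)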
+func NewGetBatchCCVDataForMessageHandler(storage common.CommitVerificationAggregatedStore, committee map[string]*model.Committee, maxMessageIDsPerBatch int, l logger.SugaredLogger) *GetBatchCCVDataForMessageHandler { + return &GetBatchCCVDataForMessageHandler{ + storage: storage, + committee: committee, + l: l, + maxMessageIDsPerBatch: maxMessageIDsPerBatch, + } +} diff --git a/aggregator/pkg/handlers/get_messages_since.go b/aggregator/pkg/handlers/get_messages_since.go index 140655ca..92d009ef 100644 --- a/aggregator/pkg/handlers/get_messages_since.go +++ b/aggregator/pkg/handlers/get_messages_since.go @@ -2,7 +2,6 @@ package handlers import ( "context" - "time" "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common" "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model" @@ -13,11 +12,10 @@ import ( ) type GetMessagesSinceHandler struct { - storage common.CommitVerificationAggregatedStore - committee map[string]*model.Committee - maxAnonymousGetMessageSinceRange time.Duration - l logger.SugaredLogger - m common.AggregatorMonitoring + storage common.CommitVerificationAggregatedStore + committee map[string]*model.Committee + l logger.SugaredLogger + m common.AggregatorMonitoring } func (h *GetMessagesSinceHandler) logger(ctx context.Context) logger.SugaredLogger { @@ -72,12 +70,11 @@ func (h *GetMessagesSinceHandler) Handle(ctx context.Context, req *pb.GetMessage } // NewGetMessagesSinceHandler creates a new instance of GetMessagesSinceHandler. -func NewGetMessagesSinceHandler(storage common.CommitVerificationAggregatedStore, committee map[string]*model.Committee, l logger.SugaredLogger, m common.AggregatorMonitoring, maxAnonymousGetMessageSinceRange time.Duration) *GetMessagesSinceHandler { +func NewGetMessagesSinceHandler(storage common.CommitVerificationAggregatedStore, committee map[string]*model.Committee, l logger.SugaredLogger, m common.AggregatorMonitoring) *GetMessagesSinceHandler { return &GetMessagesSinceHandler{ - storage: storage, - committee: committee, - l: l, - m: m, - maxAnonymousGetMessageSinceRange: maxAnonymousGetMessageSinceRange, + storage: storage, + committee: committee, + l: l, + m: m, } } diff --git a/aggregator/pkg/model/configuration.go b/aggregator/pkg/model/configuration.go index 056a92fe..8184682e 100644 --- a/aggregator/pkg/model/configuration.go +++ b/aggregator/pkg/model/configuration.go @@ -344,20 +344,21 @@ func (c *APIKeyConfig) ValidateAPIKey(apiKey string) error { // AggregatorConfig is the root configuration for the pb. 
type AggregatorConfig struct { // CommitteeID are just arbitrary names for different committees this is a concept internal to the aggregator - Committees map[CommitteeID]*Committee `toml:"committees"` - Server ServerConfig `toml:"server"` - Storage *StorageConfig `toml:"storage"` - APIKeys APIKeyConfig `toml:"-"` - ChainStatuses ChainStatusConfig `toml:"chainStatuses"` - Aggregation AggregationConfig `toml:"aggregation"` - OrphanRecovery OrphanRecoveryConfig `toml:"orphanRecovery"` - RateLimiting RateLimitingConfig `toml:"rateLimiting"` - HealthCheck HealthCheckConfig `toml:"healthCheck"` - DisableValidation bool `toml:"disableValidation"` - StubMode bool `toml:"stubQuorumValidation"` - Monitoring MonitoringConfig `toml:"monitoring"` - PyroscopeURL string `toml:"pyroscope_url"` - MaxAnonymousGetMessageSinceRange int64 `toml:"maxAnonymousGetMessageSinceRange"` + Committees map[CommitteeID]*Committee `toml:"committees"` + Server ServerConfig `toml:"server"` + Storage *StorageConfig `toml:"storage"` + APIKeys APIKeyConfig `toml:"-"` + ChainStatuses ChainStatusConfig `toml:"chainStatuses"` + Aggregation AggregationConfig `toml:"aggregation"` + OrphanRecovery OrphanRecoveryConfig `toml:"orphanRecovery"` + RateLimiting RateLimitingConfig `toml:"rateLimiting"` + HealthCheck HealthCheckConfig `toml:"healthCheck"` + DisableValidation bool `toml:"disableValidation"` + StubMode bool `toml:"stubQuorumValidation"` + Monitoring MonitoringConfig `toml:"monitoring"` + PyroscopeURL string `toml:"pyroscope_url"` + // MaxMessageIDsPerBatch limits the number of message IDs per batch verifier result request + MaxMessageIDsPerBatch int `toml:"maxMessageIDsPerBatch"` } // SetDefaults sets default values for the configuration. @@ -365,6 +366,10 @@ func (c *AggregatorConfig) SetDefaults() { if c.ChainStatuses.MaxChainStatusesPerRequest == 0 { c.ChainStatuses.MaxChainStatusesPerRequest = 1000 } + // Batch verifier result defaults + if c.MaxMessageIDsPerBatch == 0 { + c.MaxMessageIDsPerBatch = 100 + } // Aggregation defaults if c.Aggregation.ChannelBufferSize == 0 { // Set to 10 by default matching the number of background workers @@ -442,6 +447,18 @@ func (c *AggregatorConfig) ValidateChainStatusConfig() error { return nil } +// ValidateBatchConfig validates the batch verifier result configuration. +func (c *AggregatorConfig) ValidateBatchConfig() error { + if c.MaxMessageIDsPerBatch <= 0 { + return errors.New("maxMessageIDsPerBatch must be greater than 0") + } + if c.MaxMessageIDsPerBatch > 1000 { + return errors.New("maxMessageIDsPerBatch cannot exceed 1000") + } + + return nil +} + // ValidateAggregationConfig validates the aggregation configuration. 
func (c *AggregatorConfig) ValidateAggregationConfig() error { if c.Aggregation.ChannelBufferSize <= 0 { @@ -502,6 +519,11 @@ func (c *AggregatorConfig) Validate() error { return fmt.Errorf("chain status configuration error: %w", err) } + // Validate batch configuration + if err := c.ValidateBatchConfig(); err != nil { + return fmt.Errorf("batch configuration error: %w", err) + } + // Validate aggregation configuration if err := c.ValidateAggregationConfig(); err != nil { return fmt.Errorf("aggregation configuration error: %w", err) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 241d6bd8..95a081e1 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -52,6 +52,7 @@ type Server struct { writeCommitCCVNodeDataHandler *handlers.WriteCommitCCVNodeDataHandler getMessagesSinceHandler *handlers.GetMessagesSinceHandler getCCVDataForMessageHandler *handlers.GetCCVDataForMessageHandler + getBatchCCVDataForMessageHandler *handlers.GetBatchCCVDataForMessageHandler writeChainStatusHandler *handlers.WriteChainStatusHandler readChainStatusHandler *handlers.ReadChainStatusHandler chainStatusStorage common.ChainStatusStorageInterface @@ -82,6 +83,10 @@ func (s *Server) GetVerifierResultForMessage(ctx context.Context, req *pb.GetVer return s.getCCVDataForMessageHandler.Handle(ctx, req) } +func (s *Server) BatchGetVerifierResultForMessage(ctx context.Context, req *pb.BatchGetVerifierResultForMessageRequest) (*pb.BatchGetVerifierResultForMessageResponse, error) { + return s.getBatchCCVDataForMessageHandler.Handle(ctx, req) +} + func (s *Server) GetMessagesSince(ctx context.Context, req *pb.GetMessagesSinceRequest) (*pb.GetMessagesSinceResponse, error) { return s.getMessagesSinceHandler.Handle(ctx, req) } @@ -255,8 +260,9 @@ func NewServer(l logger.SugaredLogger, config *model.AggregatorConfig) *Server { writeHandler := handlers.NewWriteCommitCCVNodeDataHandler(store, agg, l, config.DisableValidation, validator) readCommitCCVNodeDataHandler := handlers.NewReadCommitCCVNodeDataHandler(store, config.DisableValidation, l) - getMessagesSinceHandler := handlers.NewGetMessagesSinceHandler(store, config.Committees, l, aggMonitoring, time.Duration(config.MaxAnonymousGetMessageSinceRange)*time.Second) + getMessagesSinceHandler := handlers.NewGetMessagesSinceHandler(store, config.Committees, l, aggMonitoring) getCCVDataForMessageHandler := handlers.NewGetCCVDataForMessageHandler(store, config.Committees, l) + getBatchCCVDataForMessageHandler := handlers.NewGetBatchCCVDataForMessageHandler(store, config.Committees, config.MaxMessageIDsPerBatch, l) batchWriteCommitCCVNodeDataHandler := handlers.NewBatchWriteCommitCCVNodeDataHandler(writeHandler) // Initialize chain status storage @@ -343,6 +349,7 @@ func NewServer(l logger.SugaredLogger, config *model.AggregatorConfig) *Server { writeCommitCCVNodeDataHandler: writeHandler, getMessagesSinceHandler: getMessagesSinceHandler, getCCVDataForMessageHandler: getCCVDataForMessageHandler, + getBatchCCVDataForMessageHandler: getBatchCCVDataForMessageHandler, writeChainStatusHandler: writeChainStatusHandler, readChainStatusHandler: readChainStatusHandler, chainStatusStorage: chainStatusStorage, diff --git a/aggregator/pkg/storage/ddb/dynamodb_storage.go b/aggregator/pkg/storage/ddb/dynamodb_storage.go index f6862906..a0c043de 100644 --- a/aggregator/pkg/storage/ddb/dynamodb_storage.go +++ b/aggregator/pkg/storage/ddb/dynamodb_storage.go @@ -455,6 +455,10 @@ func (d *DynamoDBStorage) GetCCVData(ctx context.Context, messageID model.Messag 
return report, nil } +func (d *DynamoDBStorage) GetBatchCCVData(ctx context.Context, messageIDs []model.MessageID, committeeID string) (map[string]*model.CommitAggregatedReport, error) { + return nil, errors.New("batch CCV data retrieval is not yet implemented for DynamoDB storage") +} + func (d *DynamoDBStorage) ListOrphanedMessageIDs(ctx context.Context, committeeID model.CommitteeID) (<-chan model.MessageID, <-chan error) { resultChan := make(chan model.MessageID, 100) errorChan := make(chan error, 1) diff --git a/aggregator/pkg/storage/memory/in_memory.go b/aggregator/pkg/storage/memory/in_memory.go index 95b51631..1d241c09 100644 --- a/aggregator/pkg/storage/memory/in_memory.go +++ b/aggregator/pkg/storage/memory/in_memory.go @@ -4,6 +4,7 @@ package memory import ( "bytes" "context" + "encoding/hex" "errors" "sync" @@ -95,6 +96,24 @@ func (s *InMemoryStorage) GetCCVData(_ context.Context, messageID model.MessageI return nil, nil } +// GetBatchCCVData retrieves commit verification data for multiple message IDs. +func (s *InMemoryStorage) GetBatchCCVData(_ context.Context, messageIDs []model.MessageID, committeeID string) (map[string]*model.CommitAggregatedReport, error) { + results := make(map[string]*model.CommitAggregatedReport) + + for _, messageID := range messageIDs { + id := model.GetAggregatedReportID(messageID, committeeID) + if value, ok := s.aggregatedReports.Load(id); ok { + if report, ok := value.(*model.CommitAggregatedReport); ok && report.CommitteeID == committeeID { + // Use hex encoding to match PostgreSQL implementation + messageIDHex := hex.EncodeToString(messageID) + results[messageIDHex] = report + } + } + } + + return results, nil +} + // ListOrphanedMessageIDs streams unique (messageID, committeeID) combinations that have verification records but no aggregated reports. // Returns a channel for pairs and a channel for errors. Both channels will be closed when iteration is complete. 
func (s *InMemoryStorage) ListOrphanedMessageIDs(ctx context.Context, committeeID model.CommitteeID) (<-chan model.MessageID, <-chan error) { diff --git a/aggregator/pkg/storage/memory/in_memory_test.go b/aggregator/pkg/storage/memory/in_memory_test.go new file mode 100644 index 00000000..1c5a3f15 --- /dev/null +++ b/aggregator/pkg/storage/memory/in_memory_test.go @@ -0,0 +1,109 @@ +package memory + +import ( + "context" + "encoding/hex" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model" +) + +const testCommitteeID = "test-committee" + +func TestInMemoryStorage_GetBatchCCVData(t *testing.T) { + storage := NewInMemoryStorageWithTimeProvider(common.NewRealTimeProvider()) + ctx := context.Background() + committeeID := testCommitteeID + + // Create test data + messageID1 := []byte("message1") + messageID2 := []byte("message2") + messageID3 := []byte("message3") // This one won't have data + + report1 := &model.CommitAggregatedReport{ + MessageID: messageID1, + CommitteeID: committeeID, + Sequence: 1, + } + report2 := &model.CommitAggregatedReport{ + MessageID: messageID2, + CommitteeID: committeeID, + Sequence: 2, + } + + // Store test data + err := storage.SubmitReport(ctx, report1) + require.NoError(t, err) + err = storage.SubmitReport(ctx, report2) + require.NoError(t, err) + + // Test batch retrieval + messageIDs := []model.MessageID{messageID1, messageID2, messageID3} + results, err := storage.GetBatchCCVData(ctx, messageIDs, committeeID) + require.NoError(t, err) + + // Verify results + assert.Len(t, results, 2, "Should return 2 results (excluding messageID3)") + + messageID1Hex := hex.EncodeToString(messageID1) + messageID2Hex := hex.EncodeToString(messageID2) + messageID3Hex := hex.EncodeToString(messageID3) + + assert.Contains(t, results, messageID1Hex) + assert.Contains(t, results, messageID2Hex) + assert.NotContains(t, results, messageID3Hex) + + assert.Equal(t, report1, results[messageID1Hex]) + assert.Equal(t, report2, results[messageID2Hex]) +} + +func TestInMemoryStorage_GetBatchCCVData_EmptyMessageIDs(t *testing.T) { + storage := NewInMemoryStorageWithTimeProvider(common.NewRealTimeProvider()) + ctx := context.Background() + committeeID := testCommitteeID + + // Test with empty message IDs + results, err := storage.GetBatchCCVData(ctx, []model.MessageID{}, committeeID) + require.NoError(t, err) + assert.Empty(t, results) +} + +func TestInMemoryStorage_GetBatchCCVData_NoMatchingData(t *testing.T) { + storage := NewInMemoryStorageWithTimeProvider(common.NewRealTimeProvider()) + ctx := context.Background() + committeeID := testCommitteeID + + // Test with message IDs that don't exist + messageIDs := []model.MessageID{[]byte("nonexistent1"), []byte("nonexistent2")} + results, err := storage.GetBatchCCVData(ctx, messageIDs, committeeID) + require.NoError(t, err) + assert.Empty(t, results) +} + +func TestInMemoryStorage_GetBatchCCVData_WrongCommittee(t *testing.T) { + storage := NewInMemoryStorageWithTimeProvider(common.NewRealTimeProvider()) + ctx := context.Background() + committeeID := testCommitteeID + wrongCommitteeID := "wrong-committee" + + // Create and store test data + messageID := []byte("message1") + report := &model.CommitAggregatedReport{ + MessageID: messageID, + CommitteeID: committeeID, + Sequence: 1, + } + + err := storage.SubmitReport(ctx, report) + require.NoError(t, err) + + // Try to 
retrieve with wrong committee ID + messageIDs := []model.MessageID{messageID} + results, err := storage.GetBatchCCVData(ctx, messageIDs, wrongCommitteeID) + require.NoError(t, err) + assert.Empty(t, results, "Should return empty results for wrong committee ID") +} diff --git a/aggregator/pkg/storage/metrics_aware_storage.go b/aggregator/pkg/storage/metrics_aware_storage.go index 7ecccaa4..085b955d 100644 --- a/aggregator/pkg/storage/metrics_aware_storage.go +++ b/aggregator/pkg/storage/metrics_aware_storage.go @@ -17,6 +17,7 @@ const ( queryAggregatedReportsOp = "QueryAggregatedReports" getCCVDataOp = "GetCCVData" + getBatchCCVDataOp = "GetBatchCCVData" submitReportOp = "SubmitReport" ListOrphanedMessageIDsOp = "ListOrphanedMessageIDs" ) @@ -72,6 +73,12 @@ func (s *MetricsAwareStorage) GetCCVData(ctx context.Context, messageID model.Me }) } +func (s *MetricsAwareStorage) GetBatchCCVData(ctx context.Context, messageIDs []model.MessageID, committeeID string) (map[string]*model.CommitAggregatedReport, error) { + return captureMetrics(ctx, s.metrics(ctx, getBatchCCVDataOp), func() (map[string]*model.CommitAggregatedReport, error) { + return s.inner.GetBatchCCVData(ctx, messageIDs, committeeID) + }) +} + func (s *MetricsAwareStorage) SubmitReport(ctx context.Context, report *model.CommitAggregatedReport) error { return captureMetricsNoReturn(ctx, s.metrics(ctx, submitReportOp), func() error { return s.inner.SubmitReport(ctx, report) diff --git a/aggregator/pkg/storage/postgres/database_storage.go b/aggregator/pkg/storage/postgres/database_storage.go index 616ea20f..4aa2f6ba 100644 --- a/aggregator/pkg/storage/postgres/database_storage.go +++ b/aggregator/pkg/storage/postgres/database_storage.go @@ -8,6 +8,7 @@ import ( "fmt" "sort" "strconv" + "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -575,6 +576,122 @@ func (d *DatabaseStorage) GetCCVData(ctx context.Context, messageID model.Messag return report, nil } +func (d *DatabaseStorage) GetBatchCCVData(ctx context.Context, messageIDs []model.MessageID, committeeID string) (map[string]*model.CommitAggregatedReport, error) { + if len(messageIDs) == 0 { + return make(map[string]*model.CommitAggregatedReport), nil + } + + // Convert message IDs to hex strings for the query + messageIDHexValues := make([]string, len(messageIDs)) + for i, messageID := range messageIDs { + messageIDHexValues[i] = common.Bytes2Hex(messageID) + } + + // Build parameterized query with placeholders for IN clause + placeholders := make([]string, len(messageIDHexValues)) + args := make([]any, len(messageIDHexValues)+1) + for i, messageIDHex := range messageIDHexValues { + placeholders[i] = fmt.Sprintf("$%d", i+1) + args[i] = messageIDHex + } + args[len(messageIDHexValues)] = committeeID + + stmt := fmt.Sprintf(` + SELECT + car.message_id, + car.committee_id, + car.created_at, + car.seq_num, + cvr.participant_id, + cvr.signer_address, + cvr.signature_r, + cvr.signature_s, + cvr.ccv_node_data + FROM commit_aggregated_reports car + LEFT JOIN LATERAL UNNEST(car.verification_record_ids) WITH ORDINALITY AS vid(id, ord) ON true + LEFT JOIN commit_verification_records cvr ON cvr.id = vid.id + WHERE car.message_id IN (%s) AND car.committee_id = $%d + ORDER BY car.message_id, car.seq_num DESC, vid.ord + `, strings.Join(placeholders, ","), len(messageIDHexValues)+1) + + type joinedRecord struct { + MessageID string `db:"message_id"` + CommitteeID string `db:"committee_id"` + CreatedAt time.Time `db:"created_at"` + SeqNum int64 `db:"seq_num"` + ParticipantID sql.NullString 
`db:"participant_id"` + SignerAddress sql.NullString `db:"signer_address"` + SignatureR []byte `db:"signature_r"` + SignatureS []byte `db:"signature_s"` + CCVNodeData []byte `db:"ccv_node_data"` + } + + rows, err := d.ds.QueryContext(ctx, stmt, args...) + if err != nil { + return nil, fmt.Errorf("failed to query batch aggregated reports: %w", err) + } + defer func() { + _ = rows.Close() + }() + + reports := make(map[string]*model.CommitAggregatedReport) + + for rows.Next() { + var record joinedRecord + err := rows.Scan( + &record.MessageID, + &record.CommitteeID, + &record.CreatedAt, + &record.SeqNum, + &record.ParticipantID, + &record.SignerAddress, + &record.SignatureR, + &record.SignatureS, + &record.CCVNodeData, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + // Get or create report for this message ID + report, exists := reports[record.MessageID] + if !exists { + messageIDBytes := common.Hex2Bytes(record.MessageID) + report = &model.CommitAggregatedReport{ + MessageID: messageIDBytes, + CommitteeID: record.CommitteeID, + Verifications: []*model.CommitVerificationRecord{}, + Sequence: record.SeqNum, + WrittenAt: record.CreatedAt.Unix(), + } + reports[record.MessageID] = report + } + + // Add verification record if it exists + if record.ParticipantID.Valid && record.SignerAddress.Valid && len(record.CCVNodeData) > 0 { + var msgWithCCV pb.MessageWithCCVNodeData + err = proto.Unmarshal(record.CCVNodeData, &msgWithCCV) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal ccv node data: %w", err) + } + + verification := &model.CommitVerificationRecord{ + MessageWithCCVNodeData: copyMessageWithCCVNodeData(&msgWithCCV), + IdentifierSigner: reconstructIdentifierSigner(record.ParticipantID.String, record.SignerAddress.String, record.CommitteeID, record.SignatureR, record.SignatureS), + CommitteeID: record.CommitteeID, + } + + report.Verifications = append(report.Verifications, verification) + } + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating rows: %w", err) + } + + return reports, nil +} + func (d *DatabaseStorage) SubmitReport(ctx context.Context, report *model.CommitAggregatedReport) error { if report == nil { return fmt.Errorf("aggregated report cannot be nil") diff --git a/aggregator/tests/commit_verification_api_test.go b/aggregator/tests/commit_verification_api_test.go index 6407bac9..ea5408e1 100644 --- a/aggregator/tests/commit_verification_api_test.go +++ b/aggregator/tests/commit_verification_api_test.go @@ -1423,3 +1423,430 @@ func TestPostQuorumAggregationWhenAggregationAfterQuorumEnabled(t *testing.T) { }) } } + +// TestBatchGetVerifierResult_HappyPath tests basic batch API functionality with multiple messages. 
+func TestBatchGetVerifierResult_HappyPath(t *testing.T) { + t.Parallel() + storageTypes := []string{"postgres"} // DynamoDB not implemented for batch operations + + testFunc := func(t *testing.T, storageType string) { + sourceVerifierAddress, destVerifierAddress := GenerateVerifierAddresses(t) + signer1 := NewSignerFixture(t, "node1") + signer2 := NewSignerFixture(t, "node2") + signer3 := NewSignerFixture(t, "node3") + config := map[string]*model.Committee{ + "default": { + SourceVerifierAddresses: map[string]string{ + "1": common.Bytes2Hex(sourceVerifierAddress), + }, + QuorumConfigs: map[string]*model.QuorumConfig{ + "2": { + Threshold: 2, + Signers: []model.Signer{ + signer1.Signer, + signer2.Signer, + signer3.Signer, + }, + CommitteeVerifierAddress: common.BytesToAddress(destVerifierAddress).Hex(), + }, + }, + }, + } + aggregatorClient, ccvDataClient, cleanup, err := CreateServerAndClient(t, WithCommitteeConfig(config), WithStorageType(storageType)) + t.Cleanup(cleanup) + require.NoError(t, err, "failed to create server and client") + + // Create two different messages + message1 := NewProtocolMessage(t) + message1.Nonce = protocol.Nonce(1001) + messageId1, err := message1.MessageID() + require.NoError(t, err, "failed to compute message ID 1") + + message2 := NewProtocolMessage(t) + message2.Nonce = protocol.Nonce(2002) + messageId2, err := message2.MessageID() + require.NoError(t, err, "failed to compute message ID 2") + + // Ensure messages have different IDs + require.NotEqual(t, messageId1, messageId2, "message IDs should be different") + + // Create first aggregated report (message1 with signer1 and signer2) + ccvNodeData1_1 := NewMessageWithCCVNodeData(t, message1, sourceVerifierAddress, WithSignatureFrom(t, signer1)) + resp1_1, err := aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData1_1)) + require.NoError(t, err, "WriteCommitCCVNodeData failed for message1/signer1") + require.Equal(t, pb.WriteStatus_SUCCESS, resp1_1.Status, "expected WriteStatus_SUCCESS") + + ccvNodeData1_2 := NewMessageWithCCVNodeData(t, message1, sourceVerifierAddress, WithSignatureFrom(t, signer2)) + resp1_2, err := aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData1_2)) + require.NoError(t, err, "WriteCommitCCVNodeData failed for message1/signer2") + require.Equal(t, pb.WriteStatus_SUCCESS, resp1_2.Status, "expected WriteStatus_SUCCESS") + + // Create second aggregated report (message2 with signer2 and signer3) + ccvNodeData2_2 := NewMessageWithCCVNodeData(t, message2, sourceVerifierAddress, WithSignatureFrom(t, signer2)) + resp2_2, err := aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData2_2)) + require.NoError(t, err, "WriteCommitCCVNodeData failed for message2/signer2") + require.Equal(t, pb.WriteStatus_SUCCESS, resp2_2.Status, "expected WriteStatus_SUCCESS") + + ccvNodeData2_3 := NewMessageWithCCVNodeData(t, message2, sourceVerifierAddress, WithSignatureFrom(t, signer3)) + resp2_3, err := aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData2_3)) + require.NoError(t, err, "WriteCommitCCVNodeData failed for message2/signer3") + require.Equal(t, pb.WriteStatus_SUCCESS, resp2_3.Status, "expected WriteStatus_SUCCESS") + + // Wait for aggregation to complete + time.Sleep(100 * time.Millisecond) + + // Test batch retrieval with both message IDs + batchReq := &pb.BatchGetVerifierResultForMessageRequest{ + Requests: 
[]*pb.GetVerifierResultForMessageRequest{ + {MessageId: messageId1[:]}, + {MessageId: messageId2[:]}, + }, + } + + batchResp, err := ccvDataClient.BatchGetVerifierResultForMessage(t.Context(), batchReq) + require.NoError(t, err, "BatchGetVerifierResultForMessage failed") + require.NotNil(t, batchResp, "batch response should not be nil") + + // Verify we got results for both messages + require.Len(t, batchResp.Results, 2, "should have 2 results") + require.Len(t, batchResp.Errors, 0, "should have no errors") + + // Verify both messages are present + resultsByNonce := make(map[uint64]*pb.VerifierResult) + for _, result := range batchResp.Results { + resultsByNonce[result.GetMessage().GetNonce()] = result + } + + result1, found := resultsByNonce[1001] + require.True(t, found, "message1 should be found in batch results") + require.Equal(t, sourceVerifierAddress, result1.SourceVerifierAddress, "source verifier address should match") + require.Equal(t, destVerifierAddress, result1.DestVerifierAddress, "dest verifier address should match") + require.NotNil(t, result1.CcvData, "CCV data should not be nil") + + result2, found := resultsByNonce[2002] + require.True(t, found, "message2 should be found in batch results") + require.Equal(t, sourceVerifierAddress, result2.SourceVerifierAddress, "source verifier address should match") + require.Equal(t, destVerifierAddress, result2.DestVerifierAddress, "dest verifier address should match") + require.NotNil(t, result2.CcvData, "CCV data should not be nil") + } + + for _, storageType := range storageTypes { + t.Run(storageType, func(t *testing.T) { + t.Parallel() + testFunc(t, storageType) + }) + } +} + +// TestBatchGetVerifierResult_ReAggregation tests that batch API returns updated results after re-aggregation. +func TestBatchGetVerifierResult_ReAggregation(t *testing.T) { + t.Parallel() + storageTypes := []string{"postgres"} + + testFunc := func(t *testing.T, storageType string) { + sourceVerifierAddress, destVerifierAddress := GenerateVerifierAddresses(t) + signer1 := NewSignerFixture(t, "node1") + signer2 := NewSignerFixture(t, "node2") + config := map[string]*model.Committee{ + "default": { + SourceVerifierAddresses: map[string]string{ + "1": common.Bytes2Hex(sourceVerifierAddress), + }, + QuorumConfigs: map[string]*model.QuorumConfig{ + "2": { + Threshold: 2, + Signers: []model.Signer{ + signer1.Signer, + signer2.Signer, + }, + CommitteeVerifierAddress: common.BytesToAddress(destVerifierAddress).Hex(), + }, + }, + }, + } + + // Create server with enabled EnableAggregationAfterQuorum feature for re-aggregation + configOption := func(c *model.AggregatorConfig, clientConfig *ClientConfig) (*model.AggregatorConfig, *ClientConfig) { + c.Aggregation.EnableAggregationAfterQuorum = true // Enable re-aggregation after quorum + return c, clientConfig + } + + aggregatorClient, ccvDataClient, cleanup, err := CreateServerAndClient(t, WithCommitteeConfig(config), WithStorageType(storageType), configOption) + t.Cleanup(cleanup) + require.NoError(t, err, "failed to create server and client") + + // Create message and aggregate it + message := NewProtocolMessage(t) + message.Nonce = protocol.Nonce(1001) + messageId, err := message.MessageID() + require.NoError(t, err, "failed to compute message ID") + + // Initial aggregation + ccvNodeData1 := NewMessageWithCCVNodeData(t, message, sourceVerifierAddress, WithSignatureFrom(t, signer1)) + _, err = aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData1)) + require.NoError(t, 
err, "WriteCommitCCVNodeData failed for signer1") + + ccvNodeData2 := NewMessageWithCCVNodeData(t, message, sourceVerifierAddress, WithSignatureFrom(t, signer2)) + _, err = aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData2)) + require.NoError(t, err, "WriteCommitCCVNodeData failed for signer2") + + time.Sleep(100 * time.Millisecond) + + // Get initial batch result + batchReq := &pb.BatchGetVerifierResultForMessageRequest{ + Requests: []*pb.GetVerifierResultForMessageRequest{ + {MessageId: messageId[:]}, + }, + } + + batchResp, err := ccvDataClient.BatchGetVerifierResultForMessage(t.Context(), batchReq) + require.NoError(t, err, "BatchGetVerifierResultForMessage failed") + require.Len(t, batchResp.Results, 1, "should have 1 result") + + originalTimestamp := batchResp.Results[0].Timestamp + t.Logf("Original timestamp: %d", originalTimestamp) + + // Wait and submit with newer timestamp to trigger re-aggregation + time.Sleep(1 * time.Second) + newerTimestamp := time.Now().UnixMicro() + ccvNodeData1Newer := NewMessageWithCCVNodeData(t, message, sourceVerifierAddress, + WithSignatureFrom(t, signer1), + WithCustomTimestamp(newerTimestamp)) + + _, err = aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData1Newer)) + require.NoError(t, err, "WriteCommitCCVNodeData failed for newer timestamp") + + time.Sleep(200 * time.Millisecond) + + // Get batch result after re-aggregation + batchRespReagg, err := ccvDataClient.BatchGetVerifierResultForMessage(t.Context(), batchReq) + require.NoError(t, err, "BatchGetVerifierResultForMessage after re-aggregation failed") + require.Len(t, batchRespReagg.Results, 1, "should have 1 result after re-aggregation") + + newTimestamp := batchRespReagg.Results[0].Timestamp + t.Logf("New timestamp: %d", newTimestamp) + + // Verify the timestamp is newer, indicating re-aggregation occurred + require.Greater(t, newTimestamp, originalTimestamp, + "re-aggregated result should have newer timestamp than original") + } + + for _, storageType := range storageTypes { + t.Run(storageType, func(t *testing.T) { + t.Parallel() + testFunc(t, storageType) + }) + } +} + +// TestBatchGetVerifierResult_DuplicateMessageIDs tests batch API with duplicate message IDs in request. 
+func TestBatchGetVerifierResult_DuplicateMessageIDs(t *testing.T) { + t.Parallel() + storageTypes := []string{"postgres"} + + testFunc := func(t *testing.T, storageType string) { + sourceVerifierAddress, destVerifierAddress := GenerateVerifierAddresses(t) + signer1 := NewSignerFixture(t, "node1") + signer2 := NewSignerFixture(t, "node2") + config := map[string]*model.Committee{ + "default": { + SourceVerifierAddresses: map[string]string{ + "1": common.Bytes2Hex(sourceVerifierAddress), + }, + QuorumConfigs: map[string]*model.QuorumConfig{ + "2": { + Threshold: 2, + Signers: []model.Signer{ + signer1.Signer, + signer2.Signer, + }, + CommitteeVerifierAddress: common.BytesToAddress(destVerifierAddress).Hex(), + }, + }, + }, + } + aggregatorClient, ccvDataClient, cleanup, err := CreateServerAndClient(t, WithCommitteeConfig(config), WithStorageType(storageType)) + t.Cleanup(cleanup) + require.NoError(t, err, "failed to create server and client") + + // Create and aggregate a message + message := NewProtocolMessage(t) + message.Nonce = protocol.Nonce(1001) + messageId, err := message.MessageID() + require.NoError(t, err, "failed to compute message ID") + + ccvNodeData1 := NewMessageWithCCVNodeData(t, message, sourceVerifierAddress, WithSignatureFrom(t, signer1)) + _, err = aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData1)) + require.NoError(t, err, "WriteCommitCCVNodeData failed for signer1") + + ccvNodeData2 := NewMessageWithCCVNodeData(t, message, sourceVerifierAddress, WithSignatureFrom(t, signer2)) + _, err = aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData2)) + require.NoError(t, err, "WriteCommitCCVNodeData failed for signer2") + + time.Sleep(100 * time.Millisecond) + + // Test batch request with duplicate message IDs + batchReqWithDuplicates := &pb.BatchGetVerifierResultForMessageRequest{ + Requests: []*pb.GetVerifierResultForMessageRequest{ + {MessageId: messageId[:]}, + {MessageId: messageId[:]}, // duplicate + {MessageId: messageId[:]}, // another duplicate + }, + } + + batchResp, err := ccvDataClient.BatchGetVerifierResultForMessage(t.Context(), batchReqWithDuplicates) + require.NoError(t, err, "BatchGetVerifierResultForMessage with duplicates should not error") + require.NotNil(t, batchResp, "batch response with duplicates should not be nil") + + // Should have 1 result (deduplicated by the backend) and no errors + require.Len(t, batchResp.Results, 1, "should have 1 result (deduplicated)") + require.Len(t, batchResp.Errors, 0, "should have no errors") + + // Verify the result is correct + result := batchResp.Results[0] + require.Equal(t, uint64(1001), result.GetMessage().GetNonce(), "nonce should match") + require.Equal(t, sourceVerifierAddress, result.SourceVerifierAddress, "source verifier address should match") + require.Equal(t, destVerifierAddress, result.DestVerifierAddress, "dest verifier address should match") + } + + for _, storageType := range storageTypes { + t.Run(storageType, func(t *testing.T) { + t.Parallel() + testFunc(t, storageType) + }) + } +} + +// TestBatchGetVerifierResult_MissingMessages tests batch API with mix of existing and non-existing messages. 
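+// The missing message ID should not fail the RPC: it is reported as a codes.NotFound entry in
+// Errors, while the existing message is still returned in Results.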
+func TestBatchGetVerifierResult_MissingMessages(t *testing.T) { + t.Parallel() + storageTypes := []string{"postgres"} + + testFunc := func(t *testing.T, storageType string) { + sourceVerifierAddress, destVerifierAddress := GenerateVerifierAddresses(t) + signer1 := NewSignerFixture(t, "node1") + signer2 := NewSignerFixture(t, "node2") + config := map[string]*model.Committee{ + "default": { + SourceVerifierAddresses: map[string]string{ + "1": common.Bytes2Hex(sourceVerifierAddress), + }, + QuorumConfigs: map[string]*model.QuorumConfig{ + "2": { + Threshold: 2, + Signers: []model.Signer{ + signer1.Signer, + signer2.Signer, + }, + CommitteeVerifierAddress: common.BytesToAddress(destVerifierAddress).Hex(), + }, + }, + }, + } + aggregatorClient, ccvDataClient, cleanup, err := CreateServerAndClient(t, WithCommitteeConfig(config), WithStorageType(storageType)) + t.Cleanup(cleanup) + require.NoError(t, err, "failed to create server and client") + + // Create and aggregate one message + existingMessage := NewProtocolMessage(t) + existingMessage.Nonce = protocol.Nonce(1001) + existingMessageId, err := existingMessage.MessageID() + require.NoError(t, err, "failed to compute existing message ID") + + ccvNodeData1 := NewMessageWithCCVNodeData(t, existingMessage, sourceVerifierAddress, WithSignatureFrom(t, signer1)) + _, err = aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData1)) + require.NoError(t, err, "WriteCommitCCVNodeData failed for signer1") + + ccvNodeData2 := NewMessageWithCCVNodeData(t, existingMessage, sourceVerifierAddress, WithSignatureFrom(t, signer2)) + _, err = aggregatorClient.WriteCommitCCVNodeData(t.Context(), NewWriteCommitCCVNodeDataRequest(ccvNodeData2)) + require.NoError(t, err, "WriteCommitCCVNodeData failed for signer2") + + time.Sleep(100 * time.Millisecond) + + // Create a non-existent message ID + nonExistentMessage := NewProtocolMessage(t) + nonExistentMessage.Nonce = protocol.Nonce(9999) + nonExistentMsgId, err := nonExistentMessage.MessageID() + require.NoError(t, err, "failed to compute non-existent message ID") + + // Test batch request with mix of existing and non-existing messages + batchReqWithMissing := &pb.BatchGetVerifierResultForMessageRequest{ + Requests: []*pb.GetVerifierResultForMessageRequest{ + {MessageId: existingMessageId[:]}, // exists + {MessageId: nonExistentMsgId[:]}, // doesn't exist + }, + } + + batchResp, err := ccvDataClient.BatchGetVerifierResultForMessage(t.Context(), batchReqWithMissing) + require.NoError(t, err, "BatchGetVerifierResultForMessage with missing should not error") + require.NotNil(t, batchResp, "batch response with missing should not be nil") + + // Should have 1 result and 1 error + require.Len(t, batchResp.Results, 1, "should have 1 result (existing message)") + require.Len(t, batchResp.Errors, 1, "should have 1 error (missing message)") + + // Verify the error is NotFound + errorStatus := batchResp.Errors[0] + require.Equal(t, int32(codes.NotFound), errorStatus.Code, "error should be NotFound") + + // Verify the result is correct + result := batchResp.Results[0] + require.Equal(t, uint64(1001), result.GetMessage().GetNonce(), "nonce should match") + require.Equal(t, sourceVerifierAddress, result.SourceVerifierAddress, "source verifier address should match") + } + + for _, storageType := range storageTypes { + t.Run(storageType, func(t *testing.T) { + t.Parallel() + testFunc(t, storageType) + }) + } +} + +// TestBatchGetVerifierResult_EmptyRequest tests batch API with empty request. 
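+// An empty Requests slice is rejected up front with codes.InvalidArgument before any storage call.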
+func TestBatchGetVerifierResult_EmptyRequest(t *testing.T) { + t.Parallel() + storageTypes := []string{"postgres"} + + testFunc := func(t *testing.T, storageType string) { + sourceVerifierAddress, destVerifierAddress := GenerateVerifierAddresses(t) + signer1 := NewSignerFixture(t, "node1") + signer2 := NewSignerFixture(t, "node2") + config := map[string]*model.Committee{ + "default": { + SourceVerifierAddresses: map[string]string{ + "1": common.Bytes2Hex(sourceVerifierAddress), + }, + QuorumConfigs: map[string]*model.QuorumConfig{ + "2": { + Threshold: 2, + Signers: []model.Signer{ + signer1.Signer, + signer2.Signer, + }, + CommitteeVerifierAddress: common.BytesToAddress(destVerifierAddress).Hex(), + }, + }, + }, + } + _, ccvDataClient, cleanup, err := CreateServerAndClient(t, WithCommitteeConfig(config), WithStorageType(storageType)) + t.Cleanup(cleanup) + require.NoError(t, err, "failed to create server and client") + + // Test empty batch request (should fail) + emptyBatchReq := &pb.BatchGetVerifierResultForMessageRequest{ + Requests: []*pb.GetVerifierResultForMessageRequest{}, + } + + _, err = ccvDataClient.BatchGetVerifierResultForMessage(t.Context(), emptyBatchReq) + require.Error(t, err, "empty batch request should fail") + require.Equal(t, codes.InvalidArgument, status.Code(err), "error should be InvalidArgument") + } + + for _, storageType := range storageTypes { + t.Run(storageType, func(t *testing.T) { + t.Parallel() + testFunc(t, storageType) + }) + } +} diff --git a/aggregator/tests/utils.go b/aggregator/tests/utils.go index 9ecc94e2..49c3f2fb 100644 --- a/aggregator/tests/utils.go +++ b/aggregator/tests/utils.go @@ -186,13 +186,14 @@ func CreateServerOnly(t *testing.T, options ...ConfigOption) (*bufconn.Listener, }, DefaultLimits: map[string]model.RateLimitConfig{ // Generous defaults for tests - 10000 requests per minute - pb.VerifierResultAPI_GetMessagesSince_FullMethodName: {LimitPerMinute: 10000}, - pb.VerifierResultAPI_GetVerifierResultForMessage_FullMethodName: {LimitPerMinute: 10000}, - pb.Aggregator_WriteCommitCCVNodeData_FullMethodName: {LimitPerMinute: 10000}, - pb.Aggregator_BatchWriteCommitCCVNodeData_FullMethodName: {LimitPerMinute: 10000}, - pb.Aggregator_ReadCommitCCVNodeData_FullMethodName: {LimitPerMinute: 10000}, - pb.Aggregator_WriteChainStatus_FullMethodName: {LimitPerMinute: 10000}, - pb.Aggregator_ReadChainStatus_FullMethodName: {LimitPerMinute: 10000}, + pb.VerifierResultAPI_GetMessagesSince_FullMethodName: {LimitPerMinute: 10000}, + pb.VerifierResultAPI_GetVerifierResultForMessage_FullMethodName: {LimitPerMinute: 10000}, + pb.VerifierResultAPI_BatchGetVerifierResultForMessage_FullMethodName: {LimitPerMinute: 10000}, + pb.Aggregator_WriteCommitCCVNodeData_FullMethodName: {LimitPerMinute: 10000}, + pb.Aggregator_BatchWriteCommitCCVNodeData_FullMethodName: {LimitPerMinute: 10000}, + pb.Aggregator_ReadCommitCCVNodeData_FullMethodName: {LimitPerMinute: 10000}, + pb.Aggregator_WriteChainStatus_FullMethodName: {LimitPerMinute: 10000}, + pb.Aggregator_ReadChainStatus_FullMethodName: {LimitPerMinute: 10000}, }, }, } diff --git a/build/devenv/go.mod b/build/devenv/go.mod index 8844c349..66142033 100644 --- a/build/devenv/go.mod +++ b/build/devenv/go.mod @@ -32,7 +32,7 @@ require ( require ( github.com/gorilla/websocket v1.5.3 github.com/smartcontractkit/chainlink-ccv v0.0.0-00010101000000-000000000000 - github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251030133409-21ff07fff5e9 + github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go 
v0.0.0-20251031131659-8d662c845c1a github.com/smartcontractkit/chainlink-testing-framework/wasp v1.51.1 google.golang.org/grpc v1.76.0 ) diff --git a/build/devenv/go.sum b/build/devenv/go.sum index e1f76839..1602a65b 100644 --- a/build/devenv/go.sum +++ b/build/devenv/go.sum @@ -1024,8 +1024,8 @@ github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 h1:VkzslEC/a github.com/smartcontractkit/chainlink-deployments-framework v0.56.0/go.mod h1:ObH5HJ4yXzTmQLc6Af+ufrTVcQ+ocasHJ0YBZjw5ZCM= github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250826201006-c81344a26fc3 h1:mJP6yJq2woOZchX0KvhLiKxDPaS0Vy4vTDFH4nnFkXs= github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250826201006-c81344a26fc3/go.mod h1:3Lsp38qxen9PABVF+O5eocveQev+hyo9HLAgRodBD4Q= -github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251030133409-21ff07fff5e9 h1:vyj6oNj3cifQzg7jI/UCC59h8q3yPUzxnq3/lwrBMtc= -github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251030133409-21ff07fff5e9/go.mod h1:KJkb85Mfxr/2vjPvAWWpq0/QJMAP1Bts1wMWWhRn4/E= +github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251031131659-8d662c845c1a h1:koqgTBQEbmrAhzSaMdFEAWIPPwzHXdSnEKcxqQZ7c0M= +github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251031131659-8d662c845c1a/go.mod h1:KJkb85Mfxr/2vjPvAWWpq0/QJMAP1Bts1wMWWhRn4/E= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q= github.com/smartcontractkit/chainlink-protos/job-distributor v0.13.1 h1:PWwLGimBt37eDzpbfZ9V/ZkW4oCjcwKjKiAwKlSfPc0= diff --git a/go.mod b/go.mod index 1da7a5ab..359020f5 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 github.com/smartcontractkit/chainlink-evm v0.3.3 github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250826201006-c81344a26fc3 - github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251030133409-21ff07fff5e9 + github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251031131659-8d662c845c1a github.com/smartcontractkit/chainlink-testing-framework/framework v0.10.33 github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.39.0 @@ -65,6 +65,7 @@ require ( github.com/spf13/cobra v1.10.1 github.com/testcontainers/testcontainers-go/modules/postgres v0.39.0 golang.org/x/crypto v0.41.0 + google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b ) require ( @@ -320,7 +321,6 @@ require ( golang.org/x/time v0.12.0 // indirect golang.org/x/tools v0.36.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect gopkg.in/guregu/null.v4 v4.0.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 2ac667dd..5e5bd8d2 100644 --- a/go.sum +++ b/go.sum @@ -752,8 +752,8 @@ github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-23 github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250729142306-508e798f6a5d 
h1:pTYIcsWHTMG5fAcbRUA8Qk5yscXKdSpopQ0DUEOjPik= github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250729142306-508e798f6a5d/go.mod h1:2JTBNp3FlRdO/nHc4dsc9bfxxMClMO1Qt8sLJgtreBY= -github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251030133409-21ff07fff5e9 h1:vyj6oNj3cifQzg7jI/UCC59h8q3yPUzxnq3/lwrBMtc= -github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251030133409-21ff07fff5e9/go.mod h1:KJkb85Mfxr/2vjPvAWWpq0/QJMAP1Bts1wMWWhRn4/E= +github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251031131659-8d662c845c1a h1:koqgTBQEbmrAhzSaMdFEAWIPPwzHXdSnEKcxqQZ7c0M= +github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go v0.0.0-20251031131659-8d662c845c1a/go.mod h1:KJkb85Mfxr/2vjPvAWWpq0/QJMAP1Bts1wMWWhRn4/E= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q= github.com/smartcontractkit/chainlink-protos/job-distributor v0.13.1 h1:PWwLGimBt37eDzpbfZ9V/ZkW4oCjcwKjKiAwKlSfPc0=