Skip to content

Commit faefe8b

Browse files
authored
feat: add idempotency key support (#244)
* feat: add idempotency key support to prevent duplicate CCV verification processing Updates aggregator database schema, API validation, and storage layer to use idempotency_key for deduplication instead of verification_timestamp. Modifies verifier pipeline to generate and propagate idempotency keys through the CCVDataWithIdempotencyKey paired struct from source readers to storage operations. * Fix merge issues * Address PR comments
1 parent 353ce9b commit faefe8b

22 files changed

+206
-154
lines changed

aggregator/migrations/postgres/00001_create_all_tables.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ CREATE TABLE IF NOT EXISTS commit_verification_records (
1616
signature_s BYTEA NOT NULL DEFAULT '',
1717
ccv_node_data BYTEA NOT NULL,
1818
verification_timestamp BIGINT NOT NULL,
19+
idempotency_key UUID NOT NULL,
1920
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
20-
CONSTRAINT unique_verification UNIQUE (message_id, committee_id, signer_address, verification_timestamp)
21+
CONSTRAINT unique_verification UNIQUE (message_id, committee_id, signer_address, idempotency_key)
2122
);
2223

2324
CREATE TABLE IF NOT EXISTS commit_aggregated_reports (

aggregator/pkg/handlers/validations.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,22 @@ package handlers
22

33
import (
44
"bytes"
5+
"regexp"
56

67
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model"
78

89
validation "github.com/go-ozzo/ozzo-validation/v4"
910
pb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go/v1"
1011
)
1112

13+
// uuidRegex matches standard UUID format (with or without hyphens).
14+
var uuidRegex = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`)
15+
1216
func validateWriteRequest(req *pb.WriteCommitCCVNodeDataRequest) error {
1317
err := validation.ValidateStruct(
1418
req,
15-
validation.Field(&req.CcvNodeData, validation.Required))
19+
validation.Field(&req.CcvNodeData, validation.Required),
20+
validation.Field(&req.IdempotencyKey, validation.Required, validation.Match(uuidRegex).Error("must be a valid UUID")))
1621
if err != nil {
1722
return err
1823
}

aggregator/pkg/handlers/write_commit_ccv_data.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package handlers
33
import (
44
"context"
55

6+
"github.com/google/uuid"
67
"google.golang.org/grpc/codes"
78
"google.golang.org/grpc/status"
89

@@ -67,12 +68,22 @@ func (h *WriteCommitCCVNodeDataHandler) Handle(ctx context.Context, req *pb.Writ
6768
signerCtx := scope.WithAddress(ctx, signer.Address)
6869
signerCtx = scope.WithParticipantID(signerCtx, signer.ParticipantID)
6970
signerCtx = scope.WithCommitteeID(signerCtx, signer.CommitteeID)
71+
72+
// Parse the idempotency key as UUID
73+
idempotencyUUID, err := uuid.Parse(req.GetIdempotencyKey())
74+
if err != nil {
75+
return &pb.WriteCommitCCVNodeDataResponse{
76+
Status: pb.WriteStatus_FAILED,
77+
}, status.Errorf(codes.InvalidArgument, "invalid idempotency key format: %v", err)
78+
}
79+
7080
record := model.CommitVerificationRecord{
7181
MessageWithCCVNodeData: *req.GetCcvNodeData(),
7282
IdentifierSigner: signer,
7383
CommitteeID: signer.CommitteeID,
84+
IdempotencyKey: idempotencyUUID,
7485
}
75-
err := h.storage.SaveCommitVerification(signerCtx, &record)
86+
err = h.storage.SaveCommitVerification(signerCtx, &record)
7687
if err != nil {
7788
return &pb.WriteCommitCCVNodeDataResponse{
7889
Status: pb.WriteStatus_FAILED,

aggregator/pkg/model/commit_verification_record.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"encoding/hex"
55
"fmt"
66

7+
"github.com/google/uuid"
8+
79
pb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/go/v1"
810
)
911

@@ -26,7 +28,8 @@ func (c CommitVerificationRecordIdentifier) ToIdentifier() string {
2628
type CommitVerificationRecord struct {
2729
IdentifierSigner *IdentifierSigner
2830
pb.MessageWithCCVNodeData
29-
CommitteeID CommitteeID
31+
CommitteeID CommitteeID
32+
IdempotencyKey uuid.UUID
3033
}
3134

3235
// GetID retrieves the unique identifier for the commit verification record.

aggregator/pkg/storage/postgres/database_storage.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ func (d *DatabaseStorage) SaveCommitVerification(ctx context.Context, record *mo
142142

143143
stmt := `INSERT INTO commit_verification_records
144144
(message_id, committee_id, participant_id, signer_address, source_chain_selector, dest_chain_selector,
145-
onramp_address, offramp_address, signature_r, signature_s, ccv_node_data, verification_timestamp)
146-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
147-
ON CONFLICT (message_id, committee_id, signer_address, verification_timestamp)
145+
onramp_address, offramp_address, signature_r, signature_s, ccv_node_data, verification_timestamp, idempotency_key)
146+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
147+
ON CONFLICT (message_id, committee_id, signer_address, idempotency_key)
148148
DO NOTHING`
149149

150150
ccvNodeData, err := proto.Marshal(&record.MessageWithCCVNodeData)
@@ -174,6 +174,7 @@ func (d *DatabaseStorage) SaveCommitVerification(ctx context.Context, record *mo
174174
record.IdentifierSigner.SignatureS[:],
175175
ccvNodeData,
176176
record.Timestamp,
177+
record.IdempotencyKey,
177178
)
178179
if err != nil {
179180
return fmt.Errorf("failed to save commit verification record: %w", err)

aggregator/pkg/storage/postgres/database_storage_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/ethereum/go-ethereum/common"
1212
"github.com/ethereum/go-ethereum/crypto"
13+
"github.com/google/uuid"
1314
"github.com/jmoiron/sqlx"
1415
"github.com/stretchr/testify/require"
1516
"github.com/testcontainers/testcontainers-go"
@@ -181,7 +182,8 @@ func createTestCommitVerificationRecord(msgWithCCV *pb.MessageWithCCVNodeData, s
181182
SignatureS: s32,
182183
CommitteeID: committeeID,
183184
},
184-
CommitteeID: committeeID,
185+
CommitteeID: committeeID,
186+
IdempotencyKey: uuid.New(), // Generate unique idempotency key for each record
185187
}
186188
}
187189

0 commit comments

Comments
 (0)