Commit 79310d0

refactor(sequencers): persist prepended batch (#2907)
## Overview

ref: #2906
1 parent 33bb441 commit 79310d0

2 files changed: +86 −18 lines changed

sequencers/single/queue.go

Lines changed: 37 additions & 18 deletions
@@ -57,23 +57,7 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
 		return ErrQueueFull
 	}
 
-	hash, err := batch.Hash()
-	if err != nil {
-		return err
-	}
-	key := hex.EncodeToString(hash)
-
-	pbBatch := &pb.Batch{
-		Txs: batch.Transactions,
-	}
-
-	encodedBatch, err := proto.Marshal(pbBatch)
-	if err != nil {
-		return err
-	}
-
-	// First write to DB for durability
-	if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
+	if err := bq.persistBatch(ctx, batch); err != nil {
 		return err
 	}
 
@@ -85,11 +69,21 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
 
 // Prepend adds a batch to the front of the queue (before head position).
 // This is used to return transactions that couldn't fit in the current batch.
-// TODO(@julienrbrt): The batch is currently NOT persisted to the DB since these are transactions that were already in the queue or were just processed. -- FI txs are lost, this should be tackled.
+// The batch is persisted to the DB to ensure durability in case of crashes.
+//
+// NOTE: Prepend intentionally bypasses the maxQueueSize limit to ensure high-priority
+// transactions can always be re-queued. This means the effective queue size may temporarily
+// exceed the configured maximum when Prepend is used. This is by design to prevent loss
+// of transactions that have already been accepted but couldn't fit in the current batch.
 func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error {
 	bq.mu.Lock()
 	defer bq.mu.Unlock()
 
+	if err := bq.persistBatch(ctx, batch); err != nil {
+		return err
+	}
+
+	// Then add to in-memory queue
 	// If we have room before head, use it
 	if bq.head > 0 {
 		bq.head--
@@ -187,3 +181,28 @@ func (bq *BatchQueue) Size() int {
 	defer bq.mu.Unlock()
 	return len(bq.queue) - bq.head
 }
+
+// persistBatch persists a batch to the datastore
+func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch) error {
+	hash, err := batch.Hash()
+	if err != nil {
+		return err
+	}
+	key := hex.EncodeToString(hash)
+
+	pbBatch := &pb.Batch{
+		Txs: batch.Transactions,
+	}
+
+	encodedBatch, err := proto.Marshal(pbBatch)
+	if err != nil {
+		return err
+	}
+
+	// First write to DB for durability
+	if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
+		return err
+	}
+
+	return nil
+}
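For context, the NOTE added above describes the path where a caller pulls a batch, can only fit part of it into the current block, and hands the remainder back. A minimal sketch of such a caller follows, assuming the package's existing imports; `consumeNext` and the `fits` predicate are illustrative names, not part of this PR, while `Next`, `Prepend`, `BatchQueue`, and `coresequencer.Batch` are taken from the diff:

```go
// consumeNext is a hypothetical caller of the queue (not from this PR).
// It pulls the next batch, keeps the transactions that fit in the current
// block, and returns the remainder to the front of the queue via Prepend.
// With this change the remainder is also persisted, so a crash at this
// point no longer loses those transactions (the concern the removed TODO
// called out).
func consumeNext(ctx context.Context, bq *BatchQueue, fits func(tx []byte) bool) error {
	batch, err := bq.Next(ctx)
	if err != nil {
		return err
	}

	var included, leftover [][]byte
	for _, tx := range batch.Transactions {
		if fits(tx) {
			included = append(included, tx)
		} else {
			leftover = append(leftover, tx)
		}
	}

	// ... hand `included` to block production (omitted in this sketch) ...
	_ = included

	if len(leftover) == 0 {
		return nil
	}

	// Prepend persists the leftover batch and re-queues it before the head;
	// it intentionally bypasses maxQueueSize (see the NOTE above).
	return bq.Prepend(ctx, coresequencer.Batch{Transactions: leftover})
}
```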

sequencers/single/queue_test.go

Lines changed: 49 additions & 0 deletions
@@ -720,4 +720,53 @@ func TestBatchQueue_Prepend(t *testing.T) {
 		require.NoError(t, err)
 		assert.Equal(t, []byte("tx1"), nextBatch.Transactions[0])
 	})
+
+	t.Run("prepend persistence across restarts", func(t *testing.T) {
+		prefix := "test-prepend-persistence"
+		queue := NewBatchQueue(db, prefix, 0)
+		err := queue.Load(ctx)
+		require.NoError(t, err)
+
+		// Add some batches
+		batch1 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}}
+		batch2 := coresequencer.Batch{Transactions: [][]byte{[]byte("tx2")}}
+		err = queue.AddBatch(ctx, batch1)
+		require.NoError(t, err)
+		err = queue.AddBatch(ctx, batch2)
+		require.NoError(t, err)
+
+		// Consume first batch
+		_, err = queue.Next(ctx)
+		require.NoError(t, err)
+
+		// Prepend a batch (simulating transactions that couldn't fit)
+		prependedBatch := coresequencer.Batch{Transactions: [][]byte{[]byte("prepended")}}
+		err = queue.Prepend(ctx, prependedBatch)
+		require.NoError(t, err)
+
+		assert.Equal(t, 2, queue.Size())
+
+		// Simulate restart by creating a new queue with same prefix
+		queue2 := NewBatchQueue(db, prefix, 0)
+		err = queue2.Load(ctx)
+		require.NoError(t, err)
+
+		// Should have both the prepended batch and tx2
+		assert.Equal(t, 2, queue2.Size())
+
+		// First should be prepended batch
+		nextBatch, err := queue2.Next(ctx)
+		require.NoError(t, err)
+		assert.Equal(t, 1, len(nextBatch.Transactions))
+		assert.Contains(t, nextBatch.Transactions, []byte("prepended"))
+
+		// Then tx2
+		nextBatch, err = queue2.Next(ctx)
+		require.NoError(t, err)
+		assert.Equal(t, 1, len(nextBatch.Transactions))
+		assert.Contains(t, nextBatch.Transactions, []byte("tx2"))
+
+		// Queue should be empty now
+		assert.Equal(t, 0, queue2.Size())
+	})
 }
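The new sub-test above is the user-visible guarantee: batches handed back via Prepend now survive a restart. Roughly the same recovery sequence would run when the sequencer starts up. A small sketch, assuming the datastore handle is the go-datastore `Batching` interface the package already uses and that a max size of 0 behaves as in the tests; `resumeQueue` is illustrative, not part of this PR:

```go
// resumeQueue rebuilds the in-memory queue from the datastore, so batches
// persisted by AddBatch or Prepend before a crash are served again in order.
// Illustrative helper; the real wiring lives in the single sequencer.
func resumeQueue(ctx context.Context, db ds.Batching, prefix string) (*BatchQueue, error) {
	bq := NewBatchQueue(db, prefix, 0) // 0 mirrors the tests (assumed: no size cap)
	if err := bq.Load(ctx); err != nil {
		return nil, err
	}
	return bq, nil
}
```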
