From 5a2dac3a1a156ddb0491b35ec389e4de76551749 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Wed, 11 Mar 2026 16:27:00 +0200 Subject: [PATCH] Prune requests committed by the BFT synchronizer Signed-off-by: Yoav Tock --- node/consensus/consensus.go | 25 +++++++++++++++++++++++++ node/consensus/consensus_builder.go | 10 +++++----- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/node/consensus/consensus.go b/node/consensus/consensus.go index f9042cdb..55c110af 100644 --- a/node/consensus/consensus.go +++ b/node/consensus/consensus.go @@ -925,3 +925,28 @@ func (c *Consensus) verifyAndClassifyRequest(request *protos.Request) (*protos.R return configRequest, nil } + +// PruneRequestsFromMemPool removes from the mempool the requests included in the given block. +// It is called by the BFT synchronizer. +func (c *Consensus) PruneRequestsFromMemPool(consenterBlock *common.Block) { + if consenterBlock.GetHeader().GetNumber() == 0 { + return // genesis block doesn't include any request + } + + // TODO it make sense to have ConsenterBlockToDecision return the error + decision := ConsenterBlockToDecision(consenterBlock) + if decision == nil { + c.Logger.Panicf("Failed parsing block we pulled with BFT Synchronizer") + } + + // Every request, including config requests, is included in the proposal's payload as a batch of requests, + // so we can just deserialize the payload to get all the included requests and remove them from the mempool. + var batch arma_types.BatchedRequests + if err := batch.Deserialize(decision.Proposal.Payload); err != nil { + c.Logger.Panicf("Failed deserializing proposal payload: %v", err) + } + + for _, req := range batch { + c.BFT.Pool.RemoveRequest(c.RequestID(req)) + } +} diff --git a/node/consensus/consensus_builder.go b/node/consensus/consensus_builder.go index 9a9ed1a2..481a8ef5 100644 --- a/node/consensus/consensus_builder.go +++ b/node/consensus/consensus_builder.go @@ -126,11 +126,11 @@ func CreateConsensus(conf *node_config.ConsenterNodeConfig, net NetStopper, last ClientPrivateKey: conf.TLSPrivateKeyFile, ReplicationPolicy: "", }, - c, // implements synchronizer.BFTConfigGetter, - ConsenterBlockToDecision, // func(block *cb.Block) *types.Decision // TODO look at the assembler - nil, // pruneCommittedRequests func(block *cb.Block), - nil, // updateRuntimeConfig func(block *cb.Block) types.Reconfig, - nil, // support ConsenterSupport, + c, // implements synchronizer.BFTConfigGetter, + ConsenterBlockToDecision, // func(block *cb.Block) *types.Decision // TODO look at the assembler + c.PruneRequestsFromMemPool, // pruneCommittedRequests func(block *cb.Block), + nil, // updateRuntimeConfig func(block *cb.Block) types.Reconfig, + nil, // support ConsenterSupport, factory.GetDefault(), nil, // c.ClusterService.Dialer) )