diff --git a/node/consensus/consensus.go b/node/consensus/consensus.go index a14f20b6..f9042cdb 100644 --- a/node/consensus/consensus.go +++ b/node/consensus/consensus.go @@ -34,6 +34,7 @@ import ( "github.com/hyperledger/fabric-x-orderer/node/consensus/badb" "github.com/hyperledger/fabric-x-orderer/node/consensus/configrequest" "github.com/hyperledger/fabric-x-orderer/node/consensus/state" + bft_synch "github.com/hyperledger/fabric-x-orderer/node/consensus/synchronizer" "github.com/hyperledger/fabric-x-orderer/node/delivery" "github.com/hyperledger/fabric-x-orderer/node/ledger" protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm" @@ -97,16 +98,21 @@ type Consensus struct { lastConfigBlockNum uint64 decisionNumOfLastConfigBlock arma_types.DecisionNum Logger *flogging.FabricLogger - Synchronizer SynchronizerStopper - Metrics *ConsensusMetrics - RequestVerifier *requestfilter.RulesVerifier - ConfigUpdateProposer policy.ConfigUpdateProposer - ConfigApplier ConfigApplier - ConfigRequestValidator configrequest.ConfigRequestValidator - ConfigRulesVerifier verify.OrdererRules - softStopCh chan struct{} - softStopOnce sync.Once - txCount uint64 + + synchronizerFactory bft_synch.SynchronizerFactory // Builds a BFT synchronizer + bftSynchronizer bft_synch.SynchronizerWithStop // The BFT synchronizer built by the factory + + Synchronizer SynchronizerStopper // TODO remove after we change to the BFT synchronizer, and use bftSynchronizer instead + + Metrics *ConsensusMetrics + RequestVerifier *requestfilter.RulesVerifier + ConfigUpdateProposer policy.ConfigUpdateProposer + ConfigApplier ConfigApplier + ConfigRequestValidator configrequest.ConfigRequestValidator + ConfigRulesVerifier verify.OrdererRules + softStopCh chan struct{} + softStopOnce sync.Once + txCount uint64 } func (c *Consensus) Start() error { diff --git a/node/consensus/consensus_builder.go b/node/consensus/consensus_builder.go index 085028f0..9a9ed1a2 100644 --- a/node/consensus/consensus_builder.go +++ b/node/consensus/consensus_builder.go @@ -20,6 +20,7 @@ import ( smartbft_types "github.com/hyperledger-labs/SmartBFT/pkg/types" "github.com/hyperledger-labs/SmartBFT/pkg/wal" "github.com/hyperledger-labs/SmartBFT/smartbftprotos" + "github.com/hyperledger/fabric-lib-go/bccsp/factory" "github.com/hyperledger/fabric-lib-go/common/flogging" "github.com/hyperledger/fabric-protos-go-apiv2/common" "github.com/hyperledger/fabric-x-common/common/policies" @@ -28,12 +29,14 @@ import ( "github.com/hyperledger/fabric-x-orderer/common/requestfilter" arma_types "github.com/hyperledger/fabric-x-orderer/common/types" "github.com/hyperledger/fabric-x-orderer/common/utils" + ord_config "github.com/hyperledger/fabric-x-orderer/config" "github.com/hyperledger/fabric-x-orderer/config/verify" "github.com/hyperledger/fabric-x-orderer/node/comm" - "github.com/hyperledger/fabric-x-orderer/node/config" + node_config "github.com/hyperledger/fabric-x-orderer/node/config" "github.com/hyperledger/fabric-x-orderer/node/consensus/badb" "github.com/hyperledger/fabric-x-orderer/node/consensus/configrequest" "github.com/hyperledger/fabric-x-orderer/node/consensus/state" + bft_synch "github.com/hyperledger/fabric-x-orderer/node/consensus/synchronizer" "github.com/hyperledger/fabric-x-orderer/node/crypto" "github.com/hyperledger/fabric-x-orderer/node/delivery" "github.com/hyperledger/fabric-x-orderer/node/ledger" @@ -41,7 +44,7 @@ import ( "google.golang.org/protobuf/proto" ) -func CreateConsensus(conf *config.ConsenterNodeConfig, net NetStopper, lastConfigBlock *common.Block, logger *flogging.FabricLogger, signer Signer, configUpdateProposer policy.ConfigUpdateProposer) *Consensus { +func CreateConsensus(conf *node_config.ConsenterNodeConfig, net NetStopper, lastConfigBlock *common.Block, logger *flogging.FabricLogger, signer Signer, configUpdateProposer policy.ConfigUpdateProposer) *Consensus { if lastConfigBlock == nil { logger.Panicf("Error creating Consensus%d, last config block is nil", conf.PartyId) return nil @@ -97,6 +100,7 @@ func CreateConsensus(conf *config.ConsenterNodeConfig, net NetStopper, lastConfi Storage: consLedger, SigVerifier: buildVerifier(conf.Consenters, conf.Shards, logger), Signer: signer, + synchronizerFactory: &bft_synch.SynchronizerCreator{}, Metrics: NewConsensusMetrics(conf, consLedger.Height(), txCount, logger), RequestVerifier: CreateConsensusRulesVerifier(conf), ConfigUpdateProposer: configUpdateProposer, @@ -111,6 +115,31 @@ func CreateConsensus(conf *config.ConsenterNodeConfig, net NetStopper, lastConfi c.BFT = createBFT(c, metadata, lastProposal, lastSigs, conf.WALDir) setupComm(c) + + // TODO just creation in the meantime + bftSynch := c.synchronizerFactory.CreateSynchronizer( + logger, + uint64(conf.PartyId), + ord_config.Cluster{ + SendBufferSize: 100, // TODO get this from local config + ClientCertificate: conf.TLSCertificateFile, + ClientPrivateKey: conf.TLSPrivateKeyFile, + ReplicationPolicy: "", + }, + c, // implements synchronizer.BFTConfigGetter, + ConsenterBlockToDecision, // func(block *cb.Block) *types.Decision // TODO look at the assembler + nil, // pruneCommittedRequests func(block *cb.Block), + nil, // updateRuntimeConfig func(block *cb.Block) types.Reconfig, + nil, // support ConsenterSupport, + factory.GetDefault(), + nil, // c.ClusterService.Dialer) + ) + if bftSynch != nil { + logger.Info("Created a BFT Synchronizer") + c.bftSynchronizer = bftSynch + } + + // TODO BFTSynch remove the simple synchronizer and replace with the BFT synchronizer sync := createSynchronizer(consLedger, c) c.BFT.Synchronizer = sync c.Synchronizer = sync @@ -150,6 +179,7 @@ func createBFT(c *Consensus, m *smartbftprotos.ViewMetadata, lastProposal *smart return bft } +// TODO BFTSynch remove the simple synchronizer and replace with the BFT synchronizer func createSynchronizer(ledger *ledger.ConsensusLedger, c *Consensus) *synchronizer { latestCommittedBlock := uint64(0) if ledger.Height() > 0 { @@ -184,6 +214,7 @@ func createSynchronizer(ledger *ledger.ConsensusLedger, c *Consensus) *synchroni CurrentNodes: c.CurrentNodes, } + // TODO remove once we change to the BFT synchronizer ledger.RegisterAppendListener(synchronizer) if len(c.CurrentNodes) > 1 { // don't run the synchronizer when there is only one node @@ -195,7 +226,7 @@ func createSynchronizer(ledger *ledger.ConsensusLedger, c *Consensus) *synchroni return synchronizer } -func buildVerifier(consenterInfos []config.ConsenterInfo, shardInfo []config.ShardInfo, logger *flogging.FabricLogger) crypto.ECDSAVerifier { +func buildVerifier(consenterInfos []node_config.ConsenterInfo, shardInfo []node_config.ShardInfo, logger *flogging.FabricLogger) crypto.ECDSAVerifier { verifier := make(crypto.ECDSAVerifier) for _, ci := range consenterInfos { pk, _ := pem.Decode(ci.PublicKey) @@ -232,7 +263,7 @@ func buildVerifier(consenterInfos []config.ConsenterInfo, shardInfo []config.Sha return verifier } -func getInitialStateAndMetadata(logger *flogging.FabricLogger, config *config.ConsenterNodeConfig, lastConfigBlock *common.Block, ledger *ledger.ConsensusLedger) (*state.State, *smartbftprotos.ViewMetadata, *smartbft_types.Proposal, []smartbft_types.Signature, arma_types.DecisionNum) { +func getInitialStateAndMetadata(logger *flogging.FabricLogger, config *node_config.ConsenterNodeConfig, lastConfigBlock *common.Block, ledger *ledger.ConsensusLedger) (*state.State, *smartbftprotos.ViewMetadata, *smartbft_types.Proposal, []smartbft_types.Signature, arma_types.DecisionNum) { height := ledger.Height() logger.Infof("Initial consenter ledger height is: %d", height) if height == 0 { @@ -266,7 +297,7 @@ func getInitialStateAndMetadata(logger *flogging.FabricLogger, config *config.Co return header.State, md, &proposal, sigs, header.DecisionNumOfLastConfigBlock } -func initialStateFromConfig(config *config.ConsenterNodeConfig) *state.State { +func initialStateFromConfig(config *node_config.ConsenterNodeConfig) *state.State { var initState state.State initState.ShardCount = uint16(len(config.Shards)) initState.N = uint16(len(config.Consenters)) @@ -452,7 +483,7 @@ func setupComm(c *Consensus) { } } -func getSelfID(consenterInfos []config.ConsenterInfo, partyID arma_types.PartyID) []byte { +func getSelfID(consenterInfos []node_config.ConsenterInfo, partyID arma_types.PartyID) []byte { var myIdentity []byte for _, ci := range consenterInfos { pk := ci.PublicKey @@ -465,7 +496,7 @@ func getSelfID(consenterInfos []config.ConsenterInfo, partyID arma_types.PartyID return myIdentity } -func CreateConsensusRulesVerifier(config *config.ConsenterNodeConfig) *requestfilter.RulesVerifier { +func CreateConsensusRulesVerifier(config *node_config.ConsenterNodeConfig) *requestfilter.RulesVerifier { rv := requestfilter.NewRulesVerifier(nil) rv.AddRule(requestfilter.PayloadNotEmptyRule{}) rv.AddRule(requestfilter.NewMaxSizeFilter(config)) diff --git a/node/ledger/consensus_ledger.go b/node/ledger/consensus_ledger.go index f48c9ae0..8954e13d 100644 --- a/node/ledger/consensus_ledger.go +++ b/node/ledger/consensus_ledger.go @@ -70,6 +70,7 @@ func NewConsensusLedger(ledgerDir string) (*ConsensusLedger, error) { return consensusLedger, nil } +// TODO BFTSynch remove will not be used once we change to the BFT synchronizer func (l *ConsensusLedger) RegisterAppendListener(listener AppendListener) { l.appendListener = listener }