Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions test/send_config_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,177 @@ func uniqueFileName(path string) string {
}
}

// TestUpdateTimeoutParameters verifies that updating a party's timeout parameters via a config update succeeds,
// and that the party can continue processing transactions after the config update with the new timeout parameters.
func TestUpdateTimeoutParameters(t *testing.T) {
// Prepare Arma config and crypto and get the genesis block
dir, err := os.MkdirTemp("", t.Name())
require.NoError(t, err)
defer os.RemoveAll(dir)

configPath := filepath.Join(dir, "config.yaml")
numOfParties := 4
submittingParty := types.PartyID(1)

netInfo := testutil.CreateNetwork(t, configPath, numOfParties, 2, "none", "none")
require.NotNil(t, netInfo)
require.NoError(t, err)

armageddon.NewCLI().Run([]string{"generate", "--config", configPath, "--output", dir})

armaBinaryPath, err := gexec.BuildWithEnvironment("github.com/hyperledger/fabric-x-orderer/cmd/arma", []string{"GOPRIVATE=" + os.Getenv("GOPRIVATE")})
defer gexec.CleanupBuildArtifacts()
require.NoError(t, err)
require.NotNil(t, armaBinaryPath)

// Start Arma nodes
numOfArmaNodes := len(netInfo)
readyChan := make(chan string, numOfArmaNodes)
armaNetwork := testutil.RunArmaNodes(t, dir, armaBinaryPath, readyChan, netInfo)
defer armaNetwork.Stop()

testutil.WaitReady(t, readyChan, numOfArmaNodes, 10)

userConfig, err := testutil.GetUserConfig(dir, submittingParty)
require.NoError(t, err)
require.NotNil(t, userConfig)

totalTxNumber := 100
// rate limiter parameters
fillInterval := 10 * time.Millisecond
fillFrequency := 1000 / int(fillInterval.Milliseconds())
rate := 500

capacity := rate / fillFrequency
rl, err := armageddon.NewRateLimiter(rate, fillInterval, capacity)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to start a rate limiter")
os.Exit(3)
}

broadcastClient := client.NewBroadcastTxClient(userConfig, 10*time.Second)
signer, certBytes, err := testutil.LoadCryptoMaterialsFromDir(t, userConfig.MSPDir)
require.NoError(t, err)

org := fmt.Sprintf("org%d", submittingParty)

for i := range totalTxNumber {
status := rl.GetToken()
if !status {
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
os.Exit(3)
}
txContent := tx.PrepareTxWithTimestamp(i, 64, []byte("sessionNumber"))
env := tx.CreateSignedStructuredEnvelope(txContent, signer, certBytes, org)
err = broadcastClient.SendTx(env)
require.NoError(t, err)
}

var parties []types.PartyID
for i := 1; i <= numOfParties; i++ {
parties = append(parties, types.PartyID(i))
}

pullRequestSigner := signutil.CreateTestSigner(t, "org1", dir)

statusUknown := common.Status_UNKNOWN
PullFromAssemblers(t, &BlockPullerOptions{
UserConfig: userConfig,
Parties: parties,
Transactions: totalTxNumber,
Timeout: 60,
ErrString: "cancelled pull from assembler: %d; pull ended: failed to receive a deliver response: rpc error: code = Canceled desc = grpc: the client connection is closing",
Status: &statusUknown,
Signer: pullRequestSigner,
})

// Create config update
configUpdateBuilder, _ := configutil.NewConfigUpdateBuilder(t, dir, filepath.Join(dir, "bootstrap", "bootstrap.block"))

autoRemoveTimeout := "100ms"
configUpdatePbData := configUpdateBuilder.UpdateBatchTimeouts(t, configutil.NewBatchTimeoutsConfig(configutil.BatchTimeoutsConfigName.AutoRemoveTimeout, autoRemoveTimeout))

// Submit config update
env := configutil.CreateConfigTX(t, dir, parties, int(submittingParty), configUpdatePbData)
require.NotNil(t, env)

// Send the config tx
err = broadcastClient.SendTxTo(env, submittingParty)
require.NoError(t, err)

broadcastClient.Stop()

// Wait for Arma nodes to stop
testutil.WaitSoftStopped(t, netInfo)

// Restart Arma nodes
armaNetwork.Stop()

armaNetwork.Restart(t, readyChan)
defer armaNetwork.Stop()

testutil.WaitReady(t, readyChan, numOfArmaNodes, 10)

// Send transactions again and verify they are processed
broadcastClient = client.NewBroadcastTxClient(userConfig, 10*time.Second)
totalTxNumber += 1000

for i := range totalTxNumber {
status := rl.GetToken()
if !status {
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
os.Exit(3)
}
txContent := tx.PrepareTxWithTimestamp(i+totalTxNumber, 64, []byte("sessionNumber"))
env := tx.CreateSignedStructuredEnvelope(txContent, signer, certBytes, org)
err = broadcastClient.SendTx(env)
require.NoError(t, err)
}

broadcastClient.Stop()

PullFromAssemblers(t, &BlockPullerOptions{
UserConfig: userConfig,
Parties: parties,
Transactions: totalTxNumber + 1, // including config update tx
BlockHandler: &verifyTimeoutParam{
AutoRemoveTimeout: autoRemoveTimeout,
},
Timeout: 60,
ErrString: "cancelled pull from assembler: %d; pull ended: failed to receive a deliver response: rpc error: code = Canceled desc = grpc: the client connection is closing",
Status: &statusUknown,
Signer: pullRequestSigner,
})
}

type verifyTimeoutParam struct {
AutoRemoveTimeout string
}

func (vt *verifyTimeoutParam) HandleBlock(t *testing.T, block *common.Block) error {
isGenesisBlock := block.Header.Number == 0 || block.Header.GetDataHash() == nil
if isGenesisBlock {
return nil
}

if protoutil.IsConfigBlock(block) {
envelope, err := configutil.ReadConfigEnvelopeFromConfigBlock(block)
if err != nil || envelope == nil {
return fmt.Errorf("failed to read config envelope from config block: %w", err)
}

sharedConfig := configutil.GetSharedConfig(t, envelope)
require.NotNil(t, sharedConfig)

if vt.AutoRemoveTimeout != sharedConfig.BatchingConfig.BatchTimeouts.AutoRemoveTimeout {
return fmt.Errorf("AutoRemoveTimeout in the config block does not match the expected value. Expected: %s, Got: %s", vt.AutoRemoveTimeout, sharedConfig.BatchingConfig.BatchTimeouts.AutoRemoveTimeout)
}
return nil
}

return nil
}

// prepareAddPartyConfigUpdate prepares a config update to add a new party to the network and returns the new party ID,
// the updated network info with the new party's nodes, and the config update protobuf data.
func prepareAddPartyConfigUpdate(t *testing.T, dir string, configUpdateBuilder *configutil.ConfigUpdateBuilder) (types.PartyID, map[testutil.NodeName]*testutil.ArmaNodeInfo) {
Expand Down
25 changes: 15 additions & 10 deletions testutil/configutil/config_update_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,20 @@ func ReadConfigEnvelopeFromConfigBlock(configBlock *common.Block) (*common.Confi
}

func GetPartyConfig(t *testing.T, configEnvelope *common.ConfigEnvelope, partyID types.PartyID) *protos.PartyConfig {
sharedConfig := GetSharedConfig(t, configEnvelope)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can also move this change to a different PR
and then rebase them all on top of this one

partiesConfig := sharedConfig.GetPartiesConfig()
require.NotNil(t, partiesConfig)

for _, partyConfig := range partiesConfig {
if partyConfig.PartyID == uint32(partyID) {
return partyConfig
}
}

return nil
}

func GetSharedConfig(t *testing.T, configEnvelope *common.ConfigEnvelope) *protos.SharedConfig {
require.NotNil(t, configEnvelope)

require.NotNil(t, configEnvelope.Config.GetChannelGroup().Groups["Orderer"].Values["ConsensusType"].GetValue())
Expand All @@ -960,16 +974,7 @@ func GetPartyConfig(t *testing.T, configEnvelope *common.ConfigEnvelope, partyID
err = proto.Unmarshal(consensusType.GetMetadata(), &sharedConfig)
require.NoError(t, err)

partiesConfig := sharedConfig.GetPartiesConfig()
require.NotNil(t, partiesConfig)

for _, partyConfig := range partiesConfig {
if partyConfig.PartyID == uint32(partyID) {
return partyConfig
}
}

return nil
return &sharedConfig
}

func getNestedJSONValue(t *testing.T, data map[string]any, path ...string) any {
Expand Down
Loading