diff --git a/test/send_config_update_test.go b/test/send_config_update_test.go index 4b4b11a8..f7f651b7 100644 --- a/test/send_config_update_test.go +++ b/test/send_config_update_test.go @@ -1573,6 +1573,177 @@ func uniqueFileName(path string) string { } } +// TestUpdateTimeoutParameters verifies that updating a party's timeout parameters via a config update succeeds, +// and that the party can continue processing transactions after the config update with the new timeout parameters. +func TestUpdateTimeoutParameters(t *testing.T) { + // Prepare Arma config and crypto and get the genesis block + dir, err := os.MkdirTemp("", t.Name()) + require.NoError(t, err) + defer os.RemoveAll(dir) + + configPath := filepath.Join(dir, "config.yaml") + numOfParties := 4 + submittingParty := types.PartyID(1) + + netInfo := testutil.CreateNetwork(t, configPath, numOfParties, 2, "none", "none") + require.NotNil(t, netInfo) + require.NoError(t, err) + + armageddon.NewCLI().Run([]string{"generate", "--config", configPath, "--output", dir}) + + armaBinaryPath, err := gexec.BuildWithEnvironment("github.com/hyperledger/fabric-x-orderer/cmd/arma", []string{"GOPRIVATE=" + os.Getenv("GOPRIVATE")}) + defer gexec.CleanupBuildArtifacts() + require.NoError(t, err) + require.NotNil(t, armaBinaryPath) + + // Start Arma nodes + numOfArmaNodes := len(netInfo) + readyChan := make(chan string, numOfArmaNodes) + armaNetwork := testutil.RunArmaNodes(t, dir, armaBinaryPath, readyChan, netInfo) + defer armaNetwork.Stop() + + testutil.WaitReady(t, readyChan, numOfArmaNodes, 10) + + userConfig, err := testutil.GetUserConfig(dir, submittingParty) + require.NoError(t, err) + require.NotNil(t, userConfig) + + totalTxNumber := 100 + // rate limiter parameters + fillInterval := 10 * time.Millisecond + fillFrequency := 1000 / int(fillInterval.Milliseconds()) + rate := 500 + + capacity := rate / fillFrequency + rl, err := armageddon.NewRateLimiter(rate, fillInterval, capacity) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to start a rate limiter") + os.Exit(3) + } + + broadcastClient := client.NewBroadcastTxClient(userConfig, 10*time.Second) + signer, certBytes, err := testutil.LoadCryptoMaterialsFromDir(t, userConfig.MSPDir) + require.NoError(t, err) + + org := fmt.Sprintf("org%d", submittingParty) + + for i := range totalTxNumber { + status := rl.GetToken() + if !status { + fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1) + os.Exit(3) + } + txContent := tx.PrepareTxWithTimestamp(i, 64, []byte("sessionNumber")) + env := tx.CreateSignedStructuredEnvelope(txContent, signer, certBytes, org) + err = broadcastClient.SendTx(env) + require.NoError(t, err) + } + + var parties []types.PartyID + for i := 1; i <= numOfParties; i++ { + parties = append(parties, types.PartyID(i)) + } + + pullRequestSigner := signutil.CreateTestSigner(t, "org1", dir) + + statusUknown := common.Status_UNKNOWN + PullFromAssemblers(t, &BlockPullerOptions{ + UserConfig: userConfig, + Parties: parties, + Transactions: totalTxNumber, + Timeout: 60, + ErrString: "cancelled pull from assembler: %d; pull ended: failed to receive a deliver response: rpc error: code = Canceled desc = grpc: the client connection is closing", + Status: &statusUknown, + Signer: pullRequestSigner, + }) + + // Create config update + configUpdateBuilder, _ := configutil.NewConfigUpdateBuilder(t, dir, filepath.Join(dir, "bootstrap", "bootstrap.block")) + + autoRemoveTimeout := "100ms" + configUpdatePbData := configUpdateBuilder.UpdateBatchTimeouts(t, configutil.NewBatchTimeoutsConfig(configutil.BatchTimeoutsConfigName.AutoRemoveTimeout, autoRemoveTimeout)) + + // Submit config update + env := configutil.CreateConfigTX(t, dir, parties, int(submittingParty), configUpdatePbData) + require.NotNil(t, env) + + // Send the config tx + err = broadcastClient.SendTxTo(env, submittingParty) + require.NoError(t, err) + + broadcastClient.Stop() + + // Wait for Arma nodes to stop + testutil.WaitSoftStopped(t, netInfo) + + // Restart Arma nodes + armaNetwork.Stop() + + armaNetwork.Restart(t, readyChan) + defer armaNetwork.Stop() + + testutil.WaitReady(t, readyChan, numOfArmaNodes, 10) + + // Send transactions again and verify they are processed + broadcastClient = client.NewBroadcastTxClient(userConfig, 10*time.Second) + totalTxNumber += 1000 + + for i := range totalTxNumber { + status := rl.GetToken() + if !status { + fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1) + os.Exit(3) + } + txContent := tx.PrepareTxWithTimestamp(i+totalTxNumber, 64, []byte("sessionNumber")) + env := tx.CreateSignedStructuredEnvelope(txContent, signer, certBytes, org) + err = broadcastClient.SendTx(env) + require.NoError(t, err) + } + + broadcastClient.Stop() + + PullFromAssemblers(t, &BlockPullerOptions{ + UserConfig: userConfig, + Parties: parties, + Transactions: totalTxNumber + 1, // including config update tx + BlockHandler: &verifyTimeoutParam{ + AutoRemoveTimeout: autoRemoveTimeout, + }, + Timeout: 60, + ErrString: "cancelled pull from assembler: %d; pull ended: failed to receive a deliver response: rpc error: code = Canceled desc = grpc: the client connection is closing", + Status: &statusUknown, + Signer: pullRequestSigner, + }) +} + +type verifyTimeoutParam struct { + AutoRemoveTimeout string +} + +func (vt *verifyTimeoutParam) HandleBlock(t *testing.T, block *common.Block) error { + isGenesisBlock := block.Header.Number == 0 || block.Header.GetDataHash() == nil + if isGenesisBlock { + return nil + } + + if protoutil.IsConfigBlock(block) { + envelope, err := configutil.ReadConfigEnvelopeFromConfigBlock(block) + if err != nil || envelope == nil { + return fmt.Errorf("failed to read config envelope from config block: %w", err) + } + + sharedConfig := configutil.GetSharedConfig(t, envelope) + require.NotNil(t, sharedConfig) + + if vt.AutoRemoveTimeout != sharedConfig.BatchingConfig.BatchTimeouts.AutoRemoveTimeout { + return fmt.Errorf("AutoRemoveTimeout in the config block does not match the expected value. Expected: %s, Got: %s", vt.AutoRemoveTimeout, sharedConfig.BatchingConfig.BatchTimeouts.AutoRemoveTimeout) + } + return nil + } + + return nil +} + // prepareAddPartyConfigUpdate prepares a config update to add a new party to the network and returns the new party ID, // the updated network info with the new party's nodes, and the config update protobuf data. func prepareAddPartyConfigUpdate(t *testing.T, dir string, configUpdateBuilder *configutil.ConfigUpdateBuilder) (types.PartyID, map[testutil.NodeName]*testutil.ArmaNodeInfo) { diff --git a/testutil/configutil/config_update_utils.go b/testutil/configutil/config_update_utils.go index d9c5676f..9741bc96 100644 --- a/testutil/configutil/config_update_utils.go +++ b/testutil/configutil/config_update_utils.go @@ -948,6 +948,20 @@ func ReadConfigEnvelopeFromConfigBlock(configBlock *common.Block) (*common.Confi } func GetPartyConfig(t *testing.T, configEnvelope *common.ConfigEnvelope, partyID types.PartyID) *protos.PartyConfig { + sharedConfig := GetSharedConfig(t, configEnvelope) + partiesConfig := sharedConfig.GetPartiesConfig() + require.NotNil(t, partiesConfig) + + for _, partyConfig := range partiesConfig { + if partyConfig.PartyID == uint32(partyID) { + return partyConfig + } + } + + return nil +} + +func GetSharedConfig(t *testing.T, configEnvelope *common.ConfigEnvelope) *protos.SharedConfig { require.NotNil(t, configEnvelope) require.NotNil(t, configEnvelope.Config.GetChannelGroup().Groups["Orderer"].Values["ConsensusType"].GetValue()) @@ -960,16 +974,7 @@ func GetPartyConfig(t *testing.T, configEnvelope *common.ConfigEnvelope, partyID err = proto.Unmarshal(consensusType.GetMetadata(), &sharedConfig) require.NoError(t, err) - partiesConfig := sharedConfig.GetPartiesConfig() - require.NotNil(t, partiesConfig) - - for _, partyConfig := range partiesConfig { - if partyConfig.PartyID == uint32(partyID) { - return partyConfig - } - } - - return nil + return &sharedConfig } func getNestedJSONValue(t *testing.T, data map[string]any, path ...string) any {