@@ -1262,7 +1262,18 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
12621262 }
12631263
12641264 private def alterConfigs (servers : Seq [KafkaBroker ], adminClient : Admin , props : Properties ,
1265- perBrokerConfig : Boolean ): AlterConfigsResult = {
1265+ perBrokerConfig : Boolean ): Unit = {
1266+ alterConfigsAsync(servers, adminClient, props, perBrokerConfig).all.get()
1267+ // Skip config-provider placeholders (e.g. "${file:...}"): the broker stores the resolved value,
1268+ // so waiting on the literal placeholder would never match. Callers using placeholders must wait
1269+ // on the resolved value themselves.
1270+ props.asScala.foreach { case (k, v) =>
1271+ if (! v.contains(" ${" )) waitForConfig(k, v)
1272+ }
1273+ }
1274+
1275+ private def alterConfigsAsync (servers : Seq [KafkaBroker ], adminClient : Admin , props : Properties ,
1276+ perBrokerConfig : Boolean ): AlterConfigsResult = {
12661277 val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp (new ConfigEntry (k, v), OpType .SET ) }.toList.asJava
12671278 val configs = if (perBrokerConfig) {
12681279 val alterConfigs = new java.util.HashMap [ConfigResource , java.util.Collection [AlterConfigOp ]]()
@@ -1277,7 +1288,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
12771288 }
12781289
12791290 private def reconfigureServers (newProps : Properties , perBrokerConfig : Boolean , aPropToVerify : (String , String ), expectFailure : Boolean = false ): Unit = {
1280- val alterResult = alterConfigs (servers, adminClients.head, newProps, perBrokerConfig)
1291+ val alterResult = alterConfigsAsync (servers, adminClients.head, newProps, perBrokerConfig)
12811292 if (expectFailure) {
12821293 val oldProps = servers.head.config.values.asScala.filter { case (k, _) => newProps.containsKey(k) }
12831294 val brokerResources = if (perBrokerConfig)
0 commit comments