Skip to content

Commit

Permalink
fix(issue2151): avoid using stale broker IPs for AutoBalancer consumer (#2152)
Browse files Browse the repository at this point in the history

close #2151

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Nov 13, 2024
1 parent 351e1e0 commit ab249e0
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 22 deletions.
57 changes: 37 additions & 20 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class LoadRetriever extends AbstractResumableService implements BrokerSta
private final Condition cond;
private final Controller controller;
private final ScheduledExecutorService mainExecutorService;
private final Set<Integer> brokerIdsInUse;
private final Map<Integer, BrokerEndpoints> bootstrapServerMapInUse;
private final Set<TopicPartition> currentAssignment = new HashSet<>();
private final StaticAutoBalancerConfig staticConfig;
private final String listenerName;
Expand All @@ -101,7 +101,7 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller,
this.controller = controller;
this.clusterModel = clusterModel;
this.bootstrapServerMap = new HashMap<>();
this.brokerIdsInUse = new HashSet<>();
this.bootstrapServerMapInUse = new HashMap<>();
this.lock = new ReentrantLock();
this.cond = lock.newCondition();
this.mainExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-main"));
Expand Down Expand Up @@ -167,8 +167,8 @@ protected Properties buildConsumerProps(String bootstrapServer) {
public static class BrokerEndpoints {
private final int brokerId;
private Set<String> endpoints = new HashSet<>();

private boolean isFenced;
private boolean isOutdated = false;

public BrokerEndpoints(int brokerId) {
this.brokerId = brokerId;
Expand Down Expand Up @@ -199,6 +199,13 @@ public boolean isValid() {
return !this.isFenced && !this.endpoints.isEmpty();
}

/**
 * Reports whether this endpoint entry has been flagged as outdated.
 *
 * @return {@code true} if the outdated flag is set, {@code false} otherwise
 */
public boolean isOutdated() {
    return this.isOutdated;
}

/**
 * Sets or clears the outdated flag on this endpoint entry.
 *
 * @param outdated the new value of the outdated flag
 */
public void setOutdated(boolean outdated) {
    this.isOutdated = outdated;
}
}

@Override
Expand All @@ -221,7 +228,13 @@ public void onBrokerRegister(RegisterBrokerRecord record) {
BrokerEndpoints brokerEndpoints = new BrokerEndpoints(record.brokerId());
brokerEndpoints.setFenced(Utils.isBrokerFenced(record));
brokerEndpoints.setEndpoints(endpoints);
brokerEndpoints.setOutdated(false);
this.bootstrapServerMap.put(record.brokerId(), brokerEndpoints);
this.bootstrapServerMapInUse.computeIfPresent(record.brokerId(), (k, v) -> {
v.setOutdated(!v.getEndpoints().equals(endpoints));
v.setFenced(Utils.isBrokerFenced(record));
return v;
});
cond.signal();
} finally {
lock.unlock();
Expand All @@ -236,7 +249,6 @@ public void onBrokerUnregister(UnregisterBrokerRecord record) {
} finally {
lock.unlock();
}

}

@Override
Expand All @@ -245,31 +257,36 @@ public void onBrokerRegistrationChanged(BrokerRegistrationChangeRecord record) {
isBrokerFenced.ifPresent(isFenced -> {
lock.lock();
try {
BrokerEndpoints brokerEndpoints = this.bootstrapServerMap.get(record.brokerId());
if (brokerEndpoints != null) {
brokerEndpoints.setFenced(isFenced);
}
this.bootstrapServerMap.computeIfPresent(record.brokerId(), (k, v) -> {
v.setFenced(isFenced);
return v;
});
this.bootstrapServerMapInUse.computeIfPresent(record.brokerId(), (k, v) -> {
v.setFenced(isFenced);
return v;
});
cond.signal();
} finally {
lock.unlock();
}
});
}

private boolean hasAvailableBrokerInUse() {
if (brokerIdsInUse.isEmpty()) {
boolean hasAvailableBrokerInUse() {
if (bootstrapServerMapInUse.isEmpty()) {
return false;
}
for (int brokerId : brokerIdsInUse) {
BrokerEndpoints brokerEndpoints = this.bootstrapServerMap.get(brokerId);
if (brokerEndpoints != null && brokerEndpoints.isValid()) {
for (Map.Entry<Integer, BrokerEndpoints> entry : bootstrapServerMapInUse.entrySet()) {
int brokerId = entry.getKey();
BrokerEndpoints endpoints = entry.getValue();
if (bootstrapServerMap.containsKey(brokerId) && endpoints != null && endpoints.isValid() && !endpoints.isOutdated()) {
return true;
}
}
return false;
}

private boolean hasAvailableBroker() {
boolean hasAvailableBroker() {
if (this.bootstrapServerMap.isEmpty()) {
return false;
}
Expand All @@ -283,18 +300,17 @@ private boolean hasAvailableBroker() {

public String buildBootstrapServer() {
Set<String> endpoints = new HashSet<>();
this.brokerIdsInUse.clear();
this.bootstrapServerMapInUse.clear();
for (BrokerEndpoints brokerEndpoints : this.bootstrapServerMap.values()) {
if (brokerEndpoints.isValid() && !brokerEndpoints.getEndpoints().isEmpty()) {
endpoints.add(brokerEndpoints.getEndpoints().iterator().next());
this.brokerIdsInUse.add(brokerEndpoints.brokerId());
this.bootstrapServerMapInUse.put(brokerEndpoints.brokerId(), brokerEndpoints);
}
}
return String.join(",", endpoints);
}

private void checkAndCreateConsumer(int epoch) {
String bootstrapServer;
void checkAndCreateConsumer(int epoch) {
this.lock.lock();
try {
if (!isRunnable(epoch)) {
Expand All @@ -314,9 +330,10 @@ private void checkAndCreateConsumer(int epoch) {
return;
}
}
bootstrapServer = buildBootstrapServer();
if (this.consumer == null && !bootstrapServer.isEmpty()) {

if (this.consumer == null) {
//TODO: fetch metadata from controller
String bootstrapServer = buildBootstrapServer();
this.consumer = createConsumer(bootstrapServer);
logger.info("Created consumer on {}", bootstrapServer);
}
Expand Down
73 changes: 73 additions & 0 deletions core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer;

import kafka.autobalancer.config.AutoBalancerControllerConfig;
import kafka.autobalancer.model.ClusterModel;

import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.List;
import java.util.Set;

/**
 * Verifies how {@link LoadRetriever} reports broker availability and builds its
 * bootstrap server string as broker register, fence/unfence and unregister
 * records are applied.
 */
public class LoadRetrieverTest {

    /**
     * Walks a two-broker cluster through registration, fencing, unfencing,
     * re-registration with a new endpoint, and unregistration, checking after
     * each step both the brokers currently in use and the brokers available.
     */
    @Test
    public void testBrokerChanged() {
        LoadRetriever loadRetriever = Mockito.spy(new LoadRetriever(Mockito.mock(AutoBalancerControllerConfig.class), Mockito.mock(Controller.class), Mockito.mock(ClusterModel.class)));

        // Register two unfenced brokers, each with a single endpoint.
        loadRetriever.onBrokerRegister(new RegisterBrokerRecord().setBrokerId(0).setFenced(false).setEndPoints(
            new RegisterBrokerRecord.BrokerEndpointCollection(List.of(
                new RegisterBrokerRecord.BrokerEndpoint().setHost("192.168.0.0").setPort(9092)).iterator())));
        loadRetriever.onBrokerRegister(new RegisterBrokerRecord().setBrokerId(1).setFenced(false).setEndPoints(
            new RegisterBrokerRecord.BrokerEndpointCollection(List.of(
                new RegisterBrokerRecord.BrokerEndpoint().setHost("192.168.0.1").setPort(9093)).iterator())));
        loadRetriever.checkAndCreateConsumer(0);

        // Compare as a set: the order of the joined endpoints is not part of the
        // contract being tested, only which endpoints are present.
        Assertions.assertEquals(Set.of("192.168.0.0:9092", "192.168.0.1:9093"),
            Set.of(loadRetriever.buildBootstrapServer().split(",")));
        Assertions.assertTrue(loadRetriever.hasAvailableBrokerInUse());
        Assertions.assertTrue(loadRetriever.hasAvailableBroker());

        // Fencing broker 0 still leaves broker 1 usable.
        loadRetriever.onBrokerRegistrationChanged(new BrokerRegistrationChangeRecord().setBrokerId(0).setFenced(BrokerRegistrationFencingChange.FENCE.value()));
        Assertions.assertTrue(loadRetriever.hasAvailableBrokerInUse());
        Assertions.assertTrue(loadRetriever.hasAvailableBroker());

        // With both brokers fenced nothing is usable.
        loadRetriever.onBrokerRegistrationChanged(new BrokerRegistrationChangeRecord().setBrokerId(1).setFenced(BrokerRegistrationFencingChange.FENCE.value()));
        Assertions.assertFalse(loadRetriever.hasAvailableBrokerInUse());
        Assertions.assertFalse(loadRetriever.hasAvailableBroker());

        // Unfencing broker 1 makes it the only bootstrap endpoint.
        loadRetriever.onBrokerRegistrationChanged(new BrokerRegistrationChangeRecord().setBrokerId(1).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()));
        Assertions.assertEquals("192.168.0.1:9093", loadRetriever.buildBootstrapServer());
        Assertions.assertTrue(loadRetriever.hasAvailableBrokerInUse());
        Assertions.assertTrue(loadRetriever.hasAvailableBroker());

        // Re-registering broker 1 under a new address invalidates the endpoint in use,
        // while the broker itself remains available.
        loadRetriever.onBrokerRegister(new RegisterBrokerRecord().setBrokerId(1).setFenced(false).setEndPoints(
            new RegisterBrokerRecord.BrokerEndpointCollection(List.of(
                new RegisterBrokerRecord.BrokerEndpoint().setHost("192.168.0.2").setPort(9094)).iterator())));
        Assertions.assertFalse(loadRetriever.hasAvailableBrokerInUse());
        Assertions.assertTrue(loadRetriever.hasAvailableBroker());

        // Rebuilding the bootstrap string picks up the new address and refreshes the in-use set.
        Assertions.assertEquals("192.168.0.2:9094", loadRetriever.buildBootstrapServer());
        Assertions.assertTrue(loadRetriever.hasAvailableBrokerInUse());
        Assertions.assertTrue(loadRetriever.hasAvailableBroker());

        // Unregistering the last usable broker leaves nothing available.
        loadRetriever.onBrokerUnregister(new UnregisterBrokerRecord().setBrokerId(1));
        Assertions.assertFalse(loadRetriever.hasAvailableBrokerInUse());
        Assertions.assertFalse(loadRetriever.hasAvailableBroker());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.stats.AsyncLRUCacheStats;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An asynchronous LRU cache that supports asynchronous value computation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.automq.stream.s3.cache.ObjectReaderLRUCache;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.operator.ObjectStorage;

import java.util.concurrent.atomic.AtomicReference;

public class DefaultObjectReaderFactory implements ObjectReaderFactory {
Expand Down

0 comments on commit ab249e0

Please sign in to comment.