This repository was archived by the owner on Jan 24, 2024. It is now read-only.

[cleanup] Create only one instance of KafkaTopicLookupService #1721

Open · wants to merge 1 commit into base: master
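Summary of the change, for readers skimming the diff: KafkaProtocolHandler now builds a single KafkaTopicLookupService when it creates the channel initializers and threads that instance through KafkaChannelInitializer and KafkaRequestHandler into each per-connection KafkaTopicManager, which previously constructed its own KafkaTopicLookupService in its constructor. The sketch below illustrates the sharing pattern only; TopicLookupService, TopicManager, ProtocolHandler and SharedLookupServiceSketch are hypothetical stand-ins, not the real KoP classes or their APIs.

// Illustrative sketch of the wiring after this PR, using simplified stand-in classes.
final class TopicLookupService {
}

final class TopicManager {
    private final TopicLookupService lookupService;

    // After the change: the shared lookup service is injected by the caller.
    // Before the change: each manager effectively did `new TopicLookupService(...)` here.
    TopicManager(TopicLookupService lookupService) {
        this.lookupService = lookupService;
    }

    TopicLookupService lookupService() {
        return lookupService;
    }
}

final class ProtocolHandler {
    // Created once per protocol handler, mirroring the new field in KafkaProtocolHandler.
    private final TopicLookupService lookupService = new TopicLookupService();

    // Every new connection gets its own TopicManager, but all of them share one lookup service.
    TopicManager newConnectionTopicManager() {
        return new TopicManager(lookupService);
    }
}

public class SharedLookupServiceSketch {
    public static void main(String[] args) {
        ProtocolHandler handler = new ProtocolHandler();
        TopicManager first = handler.newConnectionTopicManager();
        TopicManager second = handler.newConnectionTopicManager();
        // Both per-connection managers see the exact same service instance.
        System.out.println(first.lookupService() == second.lookupService()); // true
    }
}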
File: KafkaChannelInitializer.java
@@ -50,6 +50,7 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
     private final KopBrokerLookupManager kopBrokerLookupManager;
     @Getter
     private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
+    private final KafkaTopicLookupService kafkaTopicLookupService;
     private final LookupClient lookupClient;

     private final AdminManager adminManager;
@@ -82,6 +83,7 @@ public KafkaChannelInitializer(PulsarService pulsarService,
             RequestStats requestStats,
             OrderedScheduler sendResponseScheduler,
             KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
+            KafkaTopicLookupService kafkaTopicLookupService,
             LookupClient lookupClient) {
         super();
         this.pulsarService = pulsarService;
@@ -104,6 +106,7 @@ public KafkaChannelInitializer(PulsarService pulsarService,
         }
         this.sendResponseScheduler = sendResponseScheduler;
         this.kafkaTopicManagerSharedState = kafkaTopicManagerSharedState;
+        this.kafkaTopicLookupService = kafkaTopicLookupService;
         this.lengthFieldPrepender = new LengthFieldPrepender(4);
     }

@@ -130,7 +133,7 @@ public KafkaRequestHandler newCnx() throws Exception {
                 tenantContextManager, replicaManager, kopBrokerLookupManager, adminManager,
                 producePurgatory, fetchPurgatory,
                 enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats, sendResponseScheduler,
-                kafkaTopicManagerSharedState, lookupClient);
+                kafkaTopicManagerSharedState, kafkaTopicLookupService, lookupClient);
     }

     @VisibleForTesting
@@ -141,6 +144,6 @@ public KafkaRequestHandler newCnx(final TenantContextManager tenantContextManage
                 enableTls, advertisedEndPoint, skipMessagesWithoutIndex,
                 requestStats,
                 sendResponseScheduler,
-                kafkaTopicManagerSharedState, lookupClient);
+                kafkaTopicManagerSharedState, kafkaTopicLookupService, lookupClient);
     }
 }
File: KafkaProtocolHandler.java
@@ -80,6 +80,7 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
     private SystemTopicClient txnTopicClient;
     private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
    private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
+    private KafkaTopicLookupService kafkaTopicLookupService;
     private LookupClient lookupClient;
     @VisibleForTesting
     @Getter
@@ -400,6 +401,7 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
                 requestStats,
                 sendResponseScheduler,
                 kafkaTopicManagerSharedState,
+                kafkaTopicLookupService,
                 lookupClient);
     }

@@ -418,6 +420,8 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
                 .timeoutTimer(SystemTimer.builder().executorName("fetch").build())
                 .build();

+        kafkaTopicLookupService = new KafkaTopicLookupService(brokerService);
+
         replicaManager = new ReplicaManager(
                 kafkaConfig,
                 requestStats,
File: KafkaRequestHandler.java
@@ -317,6 +317,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
             RequestStats requestStats,
             OrderedScheduler sendResponseScheduler,
             KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
+            KafkaTopicLookupService kafkaTopicLookupService,
             LookupClient lookupClient) throws Exception {
         super(requestStats, kafkaConfig, sendResponseScheduler);
         this.pulsarService = pulsarService;
@@ -343,7 +344,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
         this.tlsEnabled = tlsEnabled;
         this.advertisedEndPoint = advertisedEndPoint;
         this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
-        this.topicManager = new KafkaTopicManager(this);
+        this.topicManager = new KafkaTopicManager(this, kafkaTopicLookupService);
         this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
         this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
         this.currentConnectedGroup = new ConcurrentHashMap<>();
File: KafkaTopicManager.java
@@ -16,6 +16,7 @@
 import java.net.SocketAddress;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
@@ -41,14 +42,14 @@ public class KafkaTopicManager {

     private final AtomicBoolean closed = new AtomicBoolean(false);

-    KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) {
+    KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler, KafkaTopicLookupService kafkaTopicLookupService) {
         this.requestHandler = kafkaRequestHandler;
         PulsarService pulsarService = kafkaRequestHandler.getPulsarService();
         this.brokerService = pulsarService.getBrokerService();
         this.internalServerCnx = new InternalServerCnx(requestHandler);
         this.lookupClient = kafkaRequestHandler.getLookupClient();
-        this.kafkaTopicLookupService = new KafkaTopicLookupService(pulsarService.getBrokerService());
-    }
+        this.kafkaTopicLookupService = kafkaTopicLookupService;
+    }

     // update Ctx information, since at internalServerCnx create time there is no ctx passed into kafkaRequestHandler.
     public void setRemoteAddress(SocketAddress remoteAddress) {
@@ -101,12 +102,12 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri

     private Producer registerInPersistentTopic(PersistentTopic persistentTopic) {
         Producer producer = new InternalProducer(persistentTopic, internalServerCnx,
-            lookupClient.getPulsarClient().newRequestId(),
-            brokerService.generateUniqueProducerName());
+                lookupClient.getPulsarClient().newRequestId(),
+                brokerService.generateUniqueProducerName());

         if (log.isDebugEnabled()) {
             log.debug("[{}] Register Mock Producer {} into PersistentTopic {}",
-                requestHandler.ctx.channel(), producer, persistentTopic.getName());
+                    requestHandler.ctx.channel(), producer, persistentTopic.getName());
         }

         // this will register and add USAGE_COUNT_UPDATER.
@@ -122,8 +123,9 @@ public Optional<Producer> registerProducerInPersistentTopic(String topicName, Pe
             }
             return Optional.empty();
         }
-        return Optional.of(requestHandler.getKafkaTopicManagerSharedState()
-                .getReferences().computeIfAbsent(topicName, (__) -> registerInPersistentTopic(persistentTopic)));
+        ConcurrentHashMap<String, Producer> references = requestHandler
+                .getKafkaTopicManagerSharedState().getReferences();
+        return Optional.of(references.computeIfAbsent(topicName, (__) -> registerInPersistentTopic(persistentTopic)));
     }

     // when channel close, release all the topics reference in persistentTopic
@@ -141,18 +143,23 @@ public void close() {
     }

     public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName) {
-        if (closed.get()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Return null for getTopic({}) since channel is closing",
-                        requestHandler.ctx.channel(), topicName);
-            }
-            return CompletableFuture.completedFuture(Optional.empty());
-        }
-        CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture =
-                kafkaTopicLookupService.getTopic(topicName, requestHandler.ctx.channel());
-        // cache for removing producer
-        requestHandler.getKafkaTopicManagerSharedState().getTopics().put(topicName, topicCompletableFuture);
-        return topicCompletableFuture;
+        try {
+            if (closed.get()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Return null for getTopic({}) since channel is closing",
+                            requestHandler.ctx.channel(), topicName);
+                }
+                return CompletableFuture.completedFuture(Optional.empty());
+            }
+            CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture =
+                    kafkaTopicLookupService.getTopic(topicName, requestHandler.ctx.channel());
+            // cache for removing producer
+            requestHandler.getKafkaTopicManagerSharedState().getTopics().put(topicName, topicCompletableFuture);
+            return topicCompletableFuture;
+        } catch (Throwable error) {
+            log.error("Unhandled error for {}", topicName, error);
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
     }

     public void invalidateCacheForFencedManagerLedgerOnTopic(String fullTopicName) {
File: KafkaTopicManager test class (file name not shown in this view)
@@ -87,7 +87,8 @@ protected void setup() throws Exception {
         doReturn(mockChannel).when(mockCtx).channel();
         kafkaRequestHandler.ctx = mockCtx;

-        kafkaTopicManager = new KafkaTopicManager(kafkaRequestHandler);
+        kafkaTopicManager = new KafkaTopicManager(kafkaRequestHandler,
+                new KafkaTopicLookupService(pulsar.getBrokerService()));
         kafkaTopicManager.setRemoteAddress(InternalServerCnx.MOCKED_REMOTE_ADDRESS);
     }
