@@ -23,7 +23,7 @@ import kafka.log.remote.RemoteLogManager
2323import kafka .log .{LogManager , UnifiedLog }
2424import kafka .server .HostedPartition .Online
2525import kafka .server .QuotaFactory .QuotaManagers
26- import kafka .server .ReplicaManager .{AtMinIsrPartitionCountMetricName , FailedIsrUpdatesPerSecMetricName , IsrExpandsPerSecMetricName , IsrShrinksPerSecMetricName , LeaderCountMetricName , OfflineReplicaCountMetricName , PartitionCountMetricName , PartitionsWithLateTransactionsCountMetricName , ProducerIdCountMetricName , ReassigningPartitionsMetricName , StopReplicaPartitionState , UnderMinIsrPartitionCountMetricName , UnderReplicatedPartitionsMetricName , createLogReadResult , isListOffsetsTimestampUnsupported }
26+ import kafka .server .ReplicaManager .{AtMinIsrPartitionCountMetricName , FailedIsrUpdatesPerSecMetricName , IsrExpandsPerSecMetricName , IsrShrinksPerSecMetricName , LeaderCountMetricName , OfflineReplicaCountMetricName , PartitionCountMetricName , PartitionsWithLateTransactionsCountMetricName , ProducerIdCountMetricName , ReassigningPartitionsMetricName , UnderMinIsrPartitionCountMetricName , UnderReplicatedPartitionsMetricName , createLogReadResult , isListOffsetsTimestampUnsupported }
2727import kafka .server .share .DelayedShareFetch
2828import kafka .utils ._
2929import org .apache .kafka .common .errors ._
@@ -48,7 +48,6 @@ import org.apache.kafka.common.requests._
4848import org .apache .kafka .common .utils .{Exit , Time }
4949import org .apache .kafka .common .{IsolationLevel , Node , TopicIdPartition , TopicPartition , Uuid }
5050import org .apache .kafka .image .{LocalReplicaChanges , MetadataImage , TopicsDelta }
51- import org .apache .kafka .metadata .LeaderAndIsr
5251import org .apache .kafka .metadata .LeaderConstants .NO_LEADER
5352import org .apache .kafka .server .{ActionQueue , DelayedActionQueue , common }
5453import org .apache .kafka .server .common .{DirectoryEventHandler , RequestLocal , StopPartition , TopicOptionalIdPartition }
@@ -258,8 +257,6 @@ object ReplicaManager {
258257 timestamp < 0 &&
259258 (! timestampMinSupportedVersion.contains(timestamp) || version < timestampMinSupportedVersion(timestamp))
260259 }
261-
262- case class StopReplicaPartitionState (leaderEpoch : Int , deletePartition : Boolean )
263260}
264261
265262class ReplicaManager (val config : KafkaConfig ,
@@ -481,93 +478,6 @@ class ReplicaManager(val config: KafkaConfig,
481478 }
482479 }
483480
484- def stopReplicas (correlationId : Int ,
485- controllerId : Int ,
486- controllerEpoch : Int ,
487- partitionStates : Map [TopicPartition , StopReplicaPartitionState ]
488- ): (mutable.Map [TopicPartition , Errors ], Errors ) = {
489- replicaStateChangeLock synchronized {
490- stateChangeLogger.info(s " Handling StopReplica request correlationId $correlationId from controller " +
491- s " $controllerId for ${partitionStates.size} partitions " )
492- if (stateChangeLogger.isTraceEnabled)
493- partitionStates.foreachEntry { (topicPartition, partitionState) =>
494- stateChangeLogger.trace(s " Received StopReplica request $partitionState " +
495- s " correlation id $correlationId from controller $controllerId " +
496- s " epoch $controllerEpoch for partition $topicPartition" )
497- }
498-
499- val responseMap = new collection.mutable.HashMap [TopicPartition , Errors ]
500- if (controllerEpoch < this .controllerEpoch) {
501- stateChangeLogger.warn(s " Ignoring StopReplica request from " +
502- s " controller $controllerId with correlation id $correlationId " +
503- s " since its controller epoch $controllerEpoch is old. " +
504- s " Latest known controller epoch is ${this .controllerEpoch}" )
505- (responseMap, Errors .STALE_CONTROLLER_EPOCH )
506- } else {
507- this .controllerEpoch = controllerEpoch
508-
509- val stoppedPartitions = mutable.Buffer .empty[StopPartition ]
510- partitionStates.foreachEntry { (topicPartition, partitionState) =>
511- val deletePartition = partitionState.deletePartition
512-
513- getPartition(topicPartition) match {
514- case HostedPartition .Offline (_) =>
515- stateChangeLogger.warn(s " Ignoring StopReplica request (delete= $deletePartition) from " +
516- s " controller $controllerId with correlation id $correlationId " +
517- s " epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
518- " partition is in an offline log directory" )
519- responseMap.put(topicPartition, Errors .KAFKA_STORAGE_ERROR )
520-
521- case HostedPartition .Online (partition) =>
522- val currentLeaderEpoch = partition.getLeaderEpoch
523- val requestLeaderEpoch = partitionState.leaderEpoch
524- // When a topic is deleted, the leader epoch is not incremented. To circumvent this,
525- // a sentinel value (EpochDuringDelete) overwriting any previous epoch is used.
526- // When an older version of the StopReplica request which does not contain the leader
527- // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
528- if (requestLeaderEpoch == LeaderAndIsr .EPOCH_DURING_DELETE ||
529- requestLeaderEpoch == LeaderAndIsr .NO_EPOCH ||
530- requestLeaderEpoch >= currentLeaderEpoch) {
531- stoppedPartitions += new StopPartition (topicPartition, deletePartition,
532- deletePartition && partition.isLeader && requestLeaderEpoch == LeaderAndIsr .EPOCH_DURING_DELETE , false )
533- // Assume that everything will go right. It is overwritten in case of an error.
534- responseMap.put(topicPartition, Errors .NONE )
535- } else {
536- stateChangeLogger.warn(s " Ignoring StopReplica request (delete= $deletePartition) from " +
537- s " controller $controllerId with correlation id $correlationId " +
538- s " epoch $controllerEpoch for partition $topicPartition since its associated " +
539- s " leader epoch $requestLeaderEpoch is smaller than the current " +
540- s " leader epoch $currentLeaderEpoch" )
541- responseMap.put(topicPartition, Errors .FENCED_LEADER_EPOCH )
542- }
543-
544- case HostedPartition .None =>
545- // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
546- // This could happen when topic is being deleted while broker is down and recovers.
547- stoppedPartitions += new StopPartition (topicPartition, deletePartition, false , false )
548- responseMap.put(topicPartition, Errors .NONE )
549- }
550- }
551-
552- stopPartitions(stoppedPartitions.toSet).foreach { case (topicPartition, e) =>
553- if (e.isInstanceOf [KafkaStorageException ]) {
554- stateChangeLogger.error(s " Ignoring StopReplica request (delete=true) from " +
555- s " controller $controllerId with correlation id $correlationId " +
556- s " epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
557- " partition is in an offline log directory" )
558- } else {
559- stateChangeLogger.error(s " Ignoring StopReplica request (delete=true) from " +
560- s " controller $controllerId with correlation id $correlationId " +
561- s " epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
562- s " ${e.getClass.getName} exception: ${e.getMessage}" )
563- }
564- responseMap.put(topicPartition, Errors .forException(e))
565- }
566- (responseMap, Errors .NONE )
567- }
568- }
569- }
570-
571481 /**
572482 * Stop the given partitions.
573483 *
0 commit comments