From 94ef69ee097999ecf79936d32cf9439dbeec1d2f Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 30 Oct 2025 00:48:26 -0700
Subject: [PATCH 1/4] Extract common methods to KafkaOffsetReaderBase

---
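Notes:

KafkaOffsetReaderAdmin and KafkaOffsetReaderConsumer carried byte-for-byte
copies of getSortedExecutorList and getOffsetRangesFromResolvedOffsets. This
patch moves both into a new abstract base class, KafkaOffsetReaderBase, and
declares the one dependency the shared code needs, rangeCalculator, as an
abstract member for each subclass to supply. getSortedExecutorList itself is
unchanged: it still orders executors by host and executor id so that the
preferred-location assignment produced by the range calculator is
deterministic. In miniature, the shape of the change is the sketch below
(names are illustrative, not the real Spark API; see PATCH 3/4 for why the
subclass definitions must be `override protected`):

  trait Reader {
    def fetchEarliestOffsets(partitions: Seq[Int]): Map[Int, Long]
  }

  abstract class ReaderBase extends Reader {
    // The only hook the shared logic needs; each subclass supplies its own.
    protected val rangeCalculator: (Int, Long, Long) => String

    // Previously duplicated verbatim in both subclasses, now defined once.
    def offsetRanges(from: Map[Int, Long], until: Map[Int, Long]): Seq[String] =
      until.keys.toSeq.sorted.map { p =>
        // fall back to the earliest offset for partitions added since `from`
        rangeCalculator(p, from.getOrElse(p, fetchEarliestOffsets(Seq(p))(p)), until(p))
      }
  }

  class AdminReader extends ReaderBase {
    override def fetchEarliestOffsets(partitions: Seq[Int]): Map[Int, Long] =
      partitions.map(_ -> 0L).toMap
    override protected val rangeCalculator: (Int, Long, Long) => String =
      (p, from, until) => s"partition $p: [$from, $until)"
  }
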
 .../sql/kafka010/KafkaOffsetReader.scala      | 90 +++++++++++++++++++
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala | 83 +----------------
 .../kafka010/KafkaOffsetReaderConsumer.scala  | 83 +----------------
 3 files changed, 92 insertions(+), 164 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index df0c7e9c0425..bb61ea3fac23 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -19,12 +19,17 @@ package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
 
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.TopicPartition
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKeys.TOPIC_PARTITION_OFFSET
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
+import org.apache.spark.util.ArrayImplicits._
 
 /**
  * Base trait to fetch offsets from Kafka. The implementations are
@@ -167,3 +172,88 @@ private[kafka010] object KafkaOffsetReader extends Logging {
     }
   }
 }
+
+private[kafka010] abstract class KafkaOffsetReaderBase extends KafkaOffsetReader with Logging {
+  protected val rangeCalculator: KafkaOffsetRangeCalculator
+
+  private def getSortedExecutorList: Array[String] = {
+    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+      if (a.host == b.host) {
+        a.executorId > b.executorId
+      } else {
+        a.host > b.host
+      }
+    }
+
+    val bm = SparkEnv.get.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compare)
+      .map(_.toString)
+  }
+
+  override def getOffsetRangesFromResolvedOffsets(
+    fromPartitionOffsets: PartitionOffsetMap,
+    untilPartitionOffsets: PartitionOffsetMap,
+    reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
+    // Find the new partitions, and get their earliest offsets
+    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
+    val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
+    if (newPartitionInitialOffsets.keySet != newPartitions) {
+      // We cannot get from offsets for some partitions. It means they got deleted.
+      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
+      reportDataLoss(
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
+        () =>
+          KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
+    }
+    logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}")
+    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+      reportDataLoss(
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
+        () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
+    }
+
+    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
+    if (deletedPartitions.nonEmpty) {
+      val (message, config) =
+        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          (s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+            Some(ConsumerConfig.GROUP_ID_CONFIG))
+        } else {
+          (s"$deletedPartitions are gone. Some data may have been missed.", None)
+        }
+
+      reportDataLoss(
+        message,
+        () =>
+          KafkaExceptions.partitionsDeleted(deletedPartitions, config))
+    }
+
+    // Use the until partitions to calculate offset ranges to ignore partitions that have
+    // been deleted
+    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
+      // Ignore partitions that we don't know the from offsets.
+      newPartitionInitialOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
+    }.toSeq
+    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+    val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = untilPartitionOffsets
+    val ranges = topicPartitions.map { tp =>
+      val fromOffset = fromOffsets(tp)
+      val untilOffset = untilOffsets(tp)
+      if (untilOffset < fromOffset) {
+        reportDataLoss(
+          s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
+            "offset is set beyond available offset when starting query, or 2) the kafka " +
+            "topic-partition is deleted and re-created.",
+          () =>
+            KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
+      }
+      KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
+    }
+    rangeCalculator.getRanges(ranges, getSortedExecutorList.toImmutableArraySeq)
+  }
+}
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index 7420c2c1055b..a2fa6c9be921 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -52,7 +52,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
     consumerStrategy: ConsumerStrategy,
     override val driverKafkaParams: ju.Map[String, Object],
     readerOptions: CaseInsensitiveMap[String],
-    driverGroupIdPrefix: String) extends KafkaOffsetReader with Logging {
+    driverGroupIdPrefix: String) extends KafkaOffsetReaderBase {
 
   private[kafka010] val maxOffsetFetchAttempts =
     readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt
@@ -442,87 +442,6 @@ private[kafka010] class KafkaOffsetReaderAdmin(
     }
   }
 
-  private def getSortedExecutorList: Array[String] = {
-    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
-      if (a.host == b.host) {
-        a.executorId > b.executorId
-      } else {
-        a.host > b.host
-      }
-    }
-
-    val bm = SparkEnv.get.blockManager
-    bm.master.getPeers(bm.blockManagerId).toArray
-      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
-      .sortWith(compare)
-      .map(_.toString)
-  }
-
-  override def getOffsetRangesFromResolvedOffsets(
-      fromPartitionOffsets: PartitionOffsetMap,
-      untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
-    val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionInitialOffsets.keySet != newPartitions) {
-      // We cannot get from offsets for some partitions. It means they got deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
-        () =>
-          KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
-    }
-    logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}")
-    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
-        () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
-    }
-
-    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      val (message, config) =
-        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-          (s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
-            Some(ConsumerConfig.GROUP_ID_CONFIG))
-        } else {
-          (s"$deletedPartitions are gone. Some data may have been missed.", None)
-        }
-
-      reportDataLoss(
-        message,
-        () =>
-          KafkaExceptions.partitionsDeleted(deletedPartitions, config))
-    }
-
-    // Use the until partitions to calculate offset ranges to ignore partitions that have
-    // been deleted
-    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions that we don't know the from offsets.
-      newPartitionInitialOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
-    val untilOffsets = untilPartitionOffsets
-    val ranges = topicPartitions.map { tp =>
-      val fromOffset = fromOffsets(tp)
-      val untilOffset = untilOffsets(tp)
-      if (untilOffset < fromOffset) {
-        reportDataLoss(
-          s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
-            "offset is set beyond available offset when starting query, or 2) the kafka " +
-            "topic-partition is deleted and re-created.",
-          () =>
-            KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
-      }
-      KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
-    }
-    rangeCalculator.getRanges(ranges, getSortedExecutorList.toImmutableArraySeq)
-  }
-
   private def partitionsAssignedToAdmin(
       body: ju.Set[TopicPartition] => Map[TopicPartition, Long])
     : Map[TopicPartition, Long] = {
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index b7ac9a171c57..f10b9e82116c 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -52,7 +52,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
     consumerStrategy: ConsumerStrategy,
     override val driverKafkaParams: ju.Map[String, Object],
     readerOptions: CaseInsensitiveMap[String],
-    driverGroupIdPrefix: String) extends KafkaOffsetReader with Logging {
+    driverGroupIdPrefix: String) extends KafkaOffsetReaderBase {
 
   /**
    * [[UninterruptibleThreadRunner]] ensures that all
@@ -491,87 +491,6 @@ private[kafka010] class KafkaOffsetReaderConsumer(
     }
   }
 
-  private def getSortedExecutorList(): Array[String] = {
-    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
-      if (a.host == b.host) {
-        a.executorId > b.executorId
-      } else {
-        a.host > b.host
-      }
-    }
-
-    val bm = SparkEnv.get.blockManager
-    bm.master.getPeers(bm.blockManagerId).toArray
-      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
-      .sortWith(compare)
-      .map(_.toString)
-  }
-
-  override def getOffsetRangesFromResolvedOffsets(
-      fromPartitionOffsets: PartitionOffsetMap,
-      untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
-    val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionInitialOffsets.keySet != newPartitions) {
-      // We cannot get from offsets for some partitions. It means they got deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
-        () =>
-          KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
-    }
-    logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}")
-    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
-        () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
-    }
-
-    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      val (message, config) =
-        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-          (s"$deletedPartitions are gone.${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
-            Some(ConsumerConfig.GROUP_ID_CONFIG))
-        } else {
-          (s"$deletedPartitions are gone. Some data may have been missed.", None)
-        }
-
-      reportDataLoss(
-        message,
-        () =>
-          KafkaExceptions.partitionsDeleted(deletedPartitions, config))
-    }
-
-    // Use the until partitions to calculate offset ranges to ignore partitions that have
-    // been deleted
-    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions that we don't know the from offsets.
-      newPartitionInitialOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
-    val untilOffsets = untilPartitionOffsets
-    val ranges = topicPartitions.map { tp =>
-      val fromOffset = fromOffsets(tp)
-      val untilOffset = untilOffsets(tp)
-      if (untilOffset < fromOffset) {
-        reportDataLoss(
-          s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
-            "offset is set beyond available offset when starting query, or 2) the kafka " +
-            "topic-partition is deleted and re-created.",
-          () =>
-            KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
-      }
-      KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
-    }
-    rangeCalculator.getRanges(ranges, getSortedExecutorList().toImmutableArraySeq)
-  }
-
   private def partitionsAssignedToConsumer(
       body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
       fetchingEarliestOffset: Boolean = false)

From 7202be02694eb04221e0acd1520a6350fe801b75 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 30 Oct 2025 09:14:21 -0700
Subject: [PATCH 2/4] Remove unused imports

---
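Notes:

All four removed imports (SparkEnv, Logging, LogKeys.TOPIC_PARTITION_OFFSET,
ExecutorCacheTaskLocation) were referenced only by the two methods that
PATCH 1/4 moved into KafkaOffsetReaderBase. Logging now arrives through the
base class, and the LogKeys selector shrinks because the shared code is the
only remaining user of TOPIC_PARTITION_OFFSET:

  // before: the selector also served getOffsetRangesFromResolvedOffsets
  import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, TOPIC_PARTITION_OFFSET}
  // after: only the keys these files still log with
  import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS}
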
 .../apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala | 5 +----
 .../spark/sql/kafka010/KafkaOffsetReaderConsumer.scala     | 5 +----
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index a2fa6c9be921..187fbbb078ef 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -29,10 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.requests.OffsetFetchResponse
 
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, TOPIC_PARTITION_OFFSET}
-import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
 import org.apache.spark.util.ArrayImplicits._
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index f10b9e82116c..a92cc03be232 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -26,10 +26,7 @@ import scala.util.control.NonFatal
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, OffsetAndTimestamp}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, TOPIC_PARTITION_OFFSET}
-import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
 import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner}

From 885a411c611b8d72abec28b2514bc548273f581d Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 30 Oct 2025 10:43:41 -0700
Subject: [PATCH 3/4] fix variable scope

---
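Notes:

PATCH 1/4 left the rangeCalculator fields in the two subclasses declared
`private val`, but a private member cannot implement the abstract
`protected val rangeCalculator` inherited from KafkaOffsetReaderBase, so the
build does not compile without this fix. A reduced illustration (hypothetical
names):

  abstract class Base {
    protected val calculator: Int
  }

  class Impl extends Base {
    // private val calculator = 1              // error: weaker access privileges
    override protected val calculator: Int = 1 // implements Base.calculator
  }
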
 .../org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala    | 2 +-
 .../apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index 187fbbb078ef..ee674f34d9cb 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -99,7 +99,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
   private val maxRecordsPerPartition =
     readerOptions.get(KafkaSourceProvider.MAX_RECORDS_PER_PARTITION_OPTION_KEY).map(_.toLong)
 
-  private val rangeCalculator =
+  override protected val rangeCalculator =
     new KafkaOffsetRangeCalculator(minPartitions, maxRecordsPerPartition)
 
   /**
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index a92cc03be232..5196e967399c 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -98,7 +98,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
   private val maxRecordsPerPartition =
     readerOptions.get(KafkaSourceProvider.MAX_RECORDS_PER_PARTITION_OPTION_KEY).map(_.toLong)
 
-  private val rangeCalculator =
+  override protected val rangeCalculator =
     new KafkaOffsetRangeCalculator(minPartitions, maxRecordsPerPartition)
 
   private[kafka010] val offsetFetchAttemptIntervalMs =

From 7c7167fe9806816a4ba70170aa8831c404938003 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 30 Oct 2025 10:46:41 -0700
Subject: [PATCH 4/4] fix style

---
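Notes:

Two rules from the project's Scala style conventions are applied to the code
that PATCH 1/4 moved: declaration parameters are indented 4 spaces relative
to their `def`, and a short function literal stays on one line instead of
wrapping after `() =>`. Both rules in miniature (illustrative names):

  class Example {
    def resolve(
        from: Map[Int, Long],            // parameters: 4 spaces past `def`
        until: Map[Int, Long]): Int = {
      // a short thunk fits on one line, so it is not wrapped after `() =>`
      val onEmpty: () => Throwable = () => new IllegalStateException("empty")
      if (from.isEmpty && until.isEmpty) throw onEmpty()
      until.size - from.size
    }
  }
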
 .../spark/sql/kafka010/KafkaOffsetReader.scala | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index bb61ea3fac23..b8624c5ae637 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -193,9 +193,9 @@ private[kafka010] abstract class KafkaOffsetReaderBase extends KafkaOffsetReader
   }
 
   override def getOffsetRangesFromResolvedOffsets(
-    fromPartitionOffsets: PartitionOffsetMap,
-    untilPartitionOffsets: PartitionOffsetMap,
-    reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
+      fromPartitionOffsets: PartitionOffsetMap,
+      untilPartitionOffsets: PartitionOffsetMap,
+      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
     val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
@@ -204,8 +204,7 @@ private[kafka010] abstract class KafkaOffsetReaderBase extends KafkaOffsetReader
       val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
       reportDataLoss(
         s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
-        () =>
-          KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
+        () => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
     }
     logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}")
     newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
@@ -226,8 +225,7 @@ private[kafka010] abstract class KafkaOffsetReaderBase extends KafkaOffsetReader
 
       reportDataLoss(
         message,
-        () =>
-          KafkaExceptions.partitionsDeleted(deletedPartitions, config))
+        () => KafkaExceptions.partitionsDeleted(deletedPartitions, config))
     }
 
     // Use the until partitions to calculate offset ranges to ignore partitions that have
@@ -249,8 +247,7 @@ private[kafka010] abstract class KafkaOffsetReaderBase extends KafkaOffsetReader
             s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
             "offset is set beyond available offset when starting query, or 2) the kafka " +
             "topic-partition is deleted and re-created.",
-          () =>
-            KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
+          () => KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
     }
     KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
   }