KafkaOffsetReader.scala
@@ -19,12 +19,17 @@ package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkEnv
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.TOPIC_PARTITION_OFFSET
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
import org.apache.spark.util.ArrayImplicits._

/**
* Base trait to fetch offsets from Kafka. The implementations are
@@ -167,3 +172,85 @@ private[kafka010] object KafkaOffsetReader extends Logging {
}
}
}

private[kafka010] abstract class KafkaOffsetReaderBase extends KafkaOffsetReader with Logging {
protected val rangeCalculator: KafkaOffsetRangeCalculator

private def getSortedExecutorList: Array[String] = {
def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
if (a.host == b.host) {
a.executorId > b.executorId
} else {
a.host > b.host
}
}

val bm = SparkEnv.get.blockManager
bm.master.getPeers(bm.blockManagerId).toArray
.map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
.sortWith(compare)
.map(_.toString)
}

override def getOffsetRangesFromResolvedOffsets(
fromPartitionOffsets: PartitionOffsetMap,
untilPartitionOffsets: PartitionOffsetMap,
reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
// Find the new partitions, and get their earliest offsets
val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
if (newPartitionInitialOffsets.keySet != newPartitions) {
// We cannot get from offsets for some partitions. It means they got deleted.
val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
reportDataLoss(
s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
() => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
}
logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}")
newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
reportDataLoss(
s"Added partition $p starts from $o instead of 0. Some data may have been missed",
() => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
}

val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
if (deletedPartitions.nonEmpty) {
val (message, config) =
if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
(s"$deletedPartitions are gone.${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
Some(ConsumerConfig.GROUP_ID_CONFIG))
} else {
(s"$deletedPartitions are gone. Some data may have been missed.", None)
}

reportDataLoss(
message,
() => KafkaExceptions.partitionsDeleted(deletedPartitions, config))
}

// Use the until partitions to calculate offset ranges to ignore partitions that have
// been deleted
val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
// Ignore partitions that we don't know the from offsets.
newPartitionInitialOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
}.toSeq
logDebug("TopicPartitions: " + topicPartitions.mkString(", "))

val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
val untilOffsets = untilPartitionOffsets
val ranges = topicPartitions.map { tp =>
val fromOffset = fromOffsets(tp)
val untilOffset = untilOffsets(tp)
if (untilOffset < fromOffset) {
reportDataLoss(
s"Partition $tp's offset was changed from " +
s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
"offset is set beyond available offset when starting query, or 2) the kafka " +
"topic-partition is deleted and re-created.",
() => KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
}
KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
}
rangeCalculator.getRanges(ranges, getSortedExecutorList.toImmutableArraySeq)
}
}
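
Note (reviewer aside, not part of the patch): the shared getOffsetRangesFromResolvedOffsets above performs three data-loss checks before handing ranges to the KafkaOffsetRangeCalculator: newly added partitions whose earliest offsets cannot be fetched, partitions that disappeared between the from and until offsets, and partitions whose until offset fell below the from offset. The standalone Scala sketch below mirrors that bookkeeping with plain strings instead of TopicPartition and println instead of reportDataLoss; all names in it are illustrative and none of it is taken from the Spark codebase.

object OffsetRangeSketch {
  // Illustrative stand-in for the resolved-offset bookkeeping: partition names as strings,
  // offsets as Longs, data-loss reporting reduced to println.
  def resolveRanges(
      from: Map[String, Long],
      until: Map[String, Long],
      earliest: Map[String, Long]): Seq[(String, Long, Long)] = {
    // Partitions present only in `until` were added since the last batch; their start
    // offsets come from the separately fetched earliest offsets.
    val newPartitions = until.keySet.diff(from.keySet)
    val missingEarliest = newPartitions.diff(earliest.keySet)
    if (missingEarliest.nonEmpty) {
      println(s"data loss: cannot find earliest offsets of $missingEarliest")
    }

    // Partitions present only in `from` were deleted before this batch.
    val deleted = from.keySet.diff(until.keySet)
    if (deleted.nonEmpty) println(s"data loss: $deleted are gone")

    // Merge the known from-offsets with the earliest offsets of the new partitions, then
    // build (partition, from, until) triples for partitions we still know both ends of.
    val fromAll = from ++ earliest.filter { case (p, _) => newPartitions.contains(p) }
    until.keySet.filter(fromAll.contains).toSeq.sorted.map { p =>
      val (f, u) = (fromAll(p), until(p))
      // An until offset below the from offset indicates a re-created topic-partition
      // or a start offset set beyond the available data.
      if (u < f) println(s"data loss: $p offset changed from $f to $u")
      (p, f, u)
    }
  }
}

For example, resolveRanges(Map("t-0" -> 5L), Map("t-0" -> 10L, "t-1" -> 3L), Map("t-1" -> 0L)) reports nothing and returns Seq(("t-0", 5, 10), ("t-1", 0, 3)): the newly added partition t-1 picks up its earliest offset as its from offset.
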
KafkaOffsetReaderAdmin.scala
@@ -29,10 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.requests.OffsetFetchResponse

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, TOPIC_PARTITION_OFFSET}
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
import org.apache.spark.util.ArrayImplicits._
@@ -52,7 +49,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
consumerStrategy: ConsumerStrategy,
override val driverKafkaParams: ju.Map[String, Object],
readerOptions: CaseInsensitiveMap[String],
driverGroupIdPrefix: String) extends KafkaOffsetReader with Logging {
driverGroupIdPrefix: String) extends KafkaOffsetReaderBase {

private[kafka010] val maxOffsetFetchAttempts =
readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt
@@ -102,7 +99,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
private val maxRecordsPerPartition =
readerOptions.get(KafkaSourceProvider.MAX_RECORDS_PER_PARTITION_OPTION_KEY).map(_.toLong)

private val rangeCalculator =
override protected val rangeCalculator =
new KafkaOffsetRangeCalculator(minPartitions, maxRecordsPerPartition)

/**
@@ -442,87 +439,6 @@ private[kafka010] class KafkaOffsetReaderAdmin(
}
}

private def getSortedExecutorList: Array[String] = {
def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
if (a.host == b.host) {
a.executorId > b.executorId
} else {
a.host > b.host
}
}

val bm = SparkEnv.get.blockManager
bm.master.getPeers(bm.blockManagerId).toArray
.map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
.sortWith(compare)
.map(_.toString)
}

override def getOffsetRangesFromResolvedOffsets(
fromPartitionOffsets: PartitionOffsetMap,
untilPartitionOffsets: PartitionOffsetMap,
reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
// Find the new partitions, and get their earliest offsets
val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
if (newPartitionInitialOffsets.keySet != newPartitions) {
// We cannot get from offsets for some partitions. It means they got deleted.
val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
reportDataLoss(
s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
() =>
KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
}
logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}")
newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
reportDataLoss(
s"Added partition $p starts from $o instead of 0. Some data may have been missed",
() => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
}

val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
if (deletedPartitions.nonEmpty) {
val (message, config) =
if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
(s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
Some(ConsumerConfig.GROUP_ID_CONFIG))
} else {
(s"$deletedPartitions are gone. Some data may have been missed.", None)
}

reportDataLoss(
message,
() =>
KafkaExceptions.partitionsDeleted(deletedPartitions, config))
}

// Use the until partitions to calculate offset ranges to ignore partitions that have
// been deleted
val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
// Ignore partitions that we don't know the from offsets.
newPartitionInitialOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
}.toSeq
logDebug("TopicPartitions: " + topicPartitions.mkString(", "))

val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
val untilOffsets = untilPartitionOffsets
val ranges = topicPartitions.map { tp =>
val fromOffset = fromOffsets(tp)
val untilOffset = untilOffsets(tp)
if (untilOffset < fromOffset) {
reportDataLoss(
s"Partition $tp's offset was changed from " +
s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
"offset is set beyond available offset when starting query, or 2) the kafka " +
"topic-partition is deleted and re-created.",
() =>
KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
}
KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
}
rangeCalculator.getRanges(ranges, getSortedExecutorList.toImmutableArraySeq)
}

private def partitionsAssignedToAdmin(
body: ju.Set[TopicPartition] => Map[TopicPartition, Long])
: Map[TopicPartition, Long] = {
KafkaOffsetReaderConsumer.scala
@@ -26,10 +26,7 @@ import scala.util.control.NonFatal
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, OffsetAndTimestamp}
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, TOPIC_PARTITION_OFFSET}
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner}
@@ -52,7 +49,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
consumerStrategy: ConsumerStrategy,
override val driverKafkaParams: ju.Map[String, Object],
readerOptions: CaseInsensitiveMap[String],
driverGroupIdPrefix: String) extends KafkaOffsetReader with Logging {
driverGroupIdPrefix: String) extends KafkaOffsetReaderBase {

/**
* [[UninterruptibleThreadRunner]] ensures that all
@@ -101,7 +98,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
private val maxRecordsPerPartition =
readerOptions.get(KafkaSourceProvider.MAX_RECORDS_PER_PARTITION_OPTION_KEY).map(_.toLong)

private val rangeCalculator =
override protected val rangeCalculator =
new KafkaOffsetRangeCalculator(minPartitions, maxRecordsPerPartition)

private[kafka010] val offsetFetchAttemptIntervalMs =
@@ -491,87 +488,6 @@ private[kafka010] class KafkaOffsetReaderConsumer(
}
}

private def getSortedExecutorList(): Array[String] = {
def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
if (a.host == b.host) {
a.executorId > b.executorId
} else {
a.host > b.host
}
}

val bm = SparkEnv.get.blockManager
bm.master.getPeers(bm.blockManagerId).toArray
.map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
.sortWith(compare)
.map(_.toString)
}

override def getOffsetRangesFromResolvedOffsets(
fromPartitionOffsets: PartitionOffsetMap,
untilPartitionOffsets: PartitionOffsetMap,
reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
// Find the new partitions, and get their earliest offsets
val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
if (newPartitionInitialOffsets.keySet != newPartitions) {
// We cannot get from offsets for some partitions. It means they got deleted.
val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
reportDataLoss(
s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
() =>
KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
}
logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}")
newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
reportDataLoss(
s"Added partition $p starts from $o instead of 0. Some data may have been missed",
() => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
}

val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
if (deletedPartitions.nonEmpty) {
val (message, config) =
if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
(s"$deletedPartitions are gone.${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
Some(ConsumerConfig.GROUP_ID_CONFIG))
} else {
(s"$deletedPartitions are gone. Some data may have been missed.", None)
}

reportDataLoss(
message,
() =>
KafkaExceptions.partitionsDeleted(deletedPartitions, config))
}

// Use the until partitions to calculate offset ranges to ignore partitions that have
// been deleted
val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
// Ignore partitions that we don't know the from offsets.
newPartitionInitialOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
}.toSeq
logDebug("TopicPartitions: " + topicPartitions.mkString(", "))

val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
val untilOffsets = untilPartitionOffsets
val ranges = topicPartitions.map { tp =>
val fromOffset = fromOffsets(tp)
val untilOffset = untilOffsets(tp)
if (untilOffset < fromOffset) {
reportDataLoss(
s"Partition $tp's offset was changed from " +
s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
"offset is set beyond available offset when starting query, or 2) the kafka " +
"topic-partition is deleted and re-created.",
() =>
KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
}
KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
}
rangeCalculator.getRanges(ranges, getSortedExecutorList().toImmutableArraySeq)
}

private def partitionsAssignedToConsumer(
body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
fetchingEarliestOffset: Boolean = false)