@@ -823,6 +823,7 @@ public enum LogKeys implements LogKey {
TIMEOUT,
TIMER,
TIMESTAMP,
TIMESTAMP_COLUMN_NAME,
TIME_UNITS,
TIP,
TOKEN,
@@ -19,15 +19,19 @@ package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.kafka.common.record.TimestampType

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.SupportsRealTimeRead
import org.apache.spark.sql.connector.read.streaming.SupportsRealTimeRead.RecordStatus
import org.apache.spark.sql.execution.streaming.runtime.{MicroBatchExecution, StreamExecution}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
import org.apache.spark.sql.kafka010.consumer.{KafkaDataConsumer, KafkaDataConsumerIterator}

/** A [[InputPartition]] for reading Kafka data in a batch based streaming query. */
private[kafka010] case class KafkaBatchInputPartition(
@@ -67,7 +71,8 @@ private case class KafkaBatchPartitionReader(
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
includeHeaders: Boolean) extends PartitionReader[InternalRow] with Logging {
includeHeaders: Boolean)
extends SupportsRealTimeRead[InternalRow] with Logging {

private val consumer = KafkaDataConsumer.acquire(offsetRange.topicPartition, executorKafkaParams)

@@ -77,6 +82,12 @@

private var nextOffset = rangeToRead.fromOffset
private var nextRow: UnsafeRow = _
private var iteratorForRealTimeMode: Option[KafkaDataConsumerIterator] = None

// Boolean flag that indicates whether we have logged the type of timestamp (i.e. create time,
// log-append time, etc.) for the Kafka source. We log upon reading the first record, and we
// then skip logging for subsequent records.
private var timestampTypeLogged = false

override def next(): Boolean = {
if (nextOffset < rangeToRead.untilOffset) {
@@ -93,6 +104,38 @@
}
}

override def nextWithTimeout(timeoutMs: java.lang.Long): RecordStatus = {
if (iteratorForRealTimeMode.isEmpty) {
logInfo(s"Getting a new Kafka consumer iterator for ${offsetRange.topicPartition} " +
s"starting from ${nextOffset}, timeoutMs ${timeoutMs}")
iteratorForRealTimeMode = Some(consumer.getIterator(nextOffset))
}
assert(iteratorForRealTimeMode.isDefined)
val nextRecord = iteratorForRealTimeMode.get.nextWithTimeout(timeoutMs)
nextRecord.map { record =>
nextRow = unsafeRowProjector(record)
nextOffset = record.offset + 1
if (record.timestampType() == TimestampType.LOG_APPEND_TIME ||
record.timestampType() == TimestampType.CREATE_TIME) {
if (!timestampTypeLogged) {
logInfo(log"Kafka source record timestamp type is " +
log"${MDC(LogKeys.TIMESTAMP_COLUMN_NAME, record.timestampType())}")
timestampTypeLogged = true
}
Comment on lines +119 to +125

Member: Could you explain more on this logging behavior? Why do we need this logging?

Contributor Author: This tells us the semantics of the timestamp column of a Kafka record: whether the timestamp for records from this topic is set to the log-append time (when the record is persisted by the Kafka brokers) or to the create time, which is either when the record is produced by a Kafka producer or user-defined. This information is used when calculating latency, to understand which journey we are actually measuring.
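For reference, a minimal illustrative sketch (not code from this PR) of the latency measurement being referred to; what the computed interval means depends on the record's TimestampType:

// Illustrative only: `record` is a Kafka ConsumerRecord, as in the surrounding reader code.
// CREATE_TIME     -> timestamp set by the producer (or user): measures the end-to-end journey.
// LOG_APPEND_TIME -> timestamp set when the broker persists the record: measures broker-to-consumer only.
val arrivalLatencyMs = System.currentTimeMillis() - record.timestamp()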


RecordStatus.newStatusWithArrivalTimeMs(record.timestamp())
} else {
RecordStatus.newStatusWithoutArrivalTime(true)
}
}.getOrElse(RecordStatus.newStatusWithoutArrivalTime(false))
}

override def getOffset(): KafkaSourcePartitionOffset = {
KafkaSourcePartitionOffset(offsetRange.topicPartition, nextOffset)
}

override def get(): UnsafeRow = {
assert(nextRow != null)
nextRow
@@ -60,7 +60,11 @@ private[kafka010] class KafkaMicroBatchStream(
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
extends SupportsTriggerAvailableNow with ReportsSourceMetrics with MicroBatchStream with Logging {
extends SupportsTriggerAvailableNow
with SupportsRealTimeMode
with ReportsSourceMetrics
with MicroBatchStream
with Logging {

private[kafka010] val pollTimeoutMs = options.getLong(
KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -93,6 +97,11 @@

private var isTriggerAvailableNow: Boolean = false

private var inRealTimeMode = false
override def prepareForRealTimeMode(): Unit = {
inRealTimeMode = true
}

/**
* Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
* called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -218,6 +227,93 @@
}.toArray
}

override def planInputPartitions(start: Offset): Array[InputPartition] = {
// This function is used for real time mode. Trigger restrictions won't be supported.
if (maxOffsetsPerTrigger.isDefined) {
throw new UnsupportedOperationException(
"maxOffsetsPerTrigger is not compatible with real time mode")
}
if (minOffsetPerTrigger.isDefined) {
throw new UnsupportedOperationException(
"minOffsetsPerTrigger is not compatible with real time mode"
)
}
if (options.containsKey(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY)) {
throw new UnsupportedOperationException(
"minpartitions is not compatible with real time mode"
)
}
if (options.containsKey(KafkaSourceProvider.ENDING_TIMESTAMP_OPTION_KEY)) {
throw new UnsupportedOperationException(
"endingtimestamp is not compatible with real time mode"
)
}
if (options.containsKey(KafkaSourceProvider.MAX_TRIGGER_DELAY)) {
throw new UnsupportedOperationException(
"maxtriggerdelay is not compatible with real time mode"
)
}

// This function is used by Low Latency Mode, where we expect 1:1 mapping between a
Member: Suggested change:
- // This function is used by Low Latency Mode, where we expect 1:1 mapping between a
+ // This function is used by real time mode, where we expect 1:1 mapping between a

// topic partition and an input partition.
// We are skipping partition range check for performance reason. We can always try to do
// it in tasks if needed.
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets

// Here we compare the previous topic partitions with the latest partition offsets to see
// whether the partition list needs to be updated. The list does not have to be absolutely
// up to date, because there may already be minutes of delay since a new partition was
// created, and latestPartitionOffsets should have been fetched recently anyway.
// If the topic partitions change, we fetch the earliest offsets for all new partitions
// and add them to the list.
assert(latestPartitionOffsets != null, "latestPartitionOffsets should be set in latestOffset")
val latestTopicPartitions = latestPartitionOffsets.keySet
val newStartPartitionOffsets = if (startPartitionOffsets.keySet == latestTopicPartitions) {
startPartitionOffsets
} else {
val newPartitions = latestTopicPartitions.diff(startPartitionOffsets.keySet)
// Instead of fetching the earliest offsets, we could fill in offset 0 here and avoid this
// extra admin call. But new partitions are rare, and fetching the earliest offsets aligns
// with what we do in micro-batch mode and can potentially enable more sanity checks on the
// executor side.
val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
Member: KafkaMicroBatchStream's existing planInputPartitions calls kafkaOffsetReader.getOffsetRangesFromResolvedOffsets to handle partition offsets. It handles the deleted-partitions case, but this new planInputPartitions doesn't; should we also do the same?

Contributor Author: Kafka doesn't support deleting partitions, so I am not sure that case is worth checking. If the topic was deleted and recreated, the offsets would not be valid and we would fail in that case anyway.

Member: Hmm, this is what getOffsetRangesFromResolvedOffsets, called by KafkaMicroBatchStream.planInputPartitions, currently does:

if (newPartitionInitialOffsets.keySet != newPartitions) {
  // We cannot get from offsets for some partitions. It means they got deleted.
  val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
  reportDataLoss(
    s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
    () => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
}

The behavior of reportDataLoss is configurable: it can be a failure like what you did here, or a log warning. I would suggest following the existing behavior instead of having two different behaviors.

Contributor Author: ok
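To make the suggestion concrete, a hypothetical sketch (not the final change) of how this planInputPartitions could route missing earliest offsets through the same reportDataLoss path, assuming reportDataLoss and KafkaExceptions.initialOffsetNotFoundForPartitions are reachable here as they are in getOffsetRangesFromResolvedOffsets:

// Hypothetical sketch: surface partitions whose earliest offsets could not be fetched
// through the same configurable reportDataLoss path used in micro-batch planning.
if (newPartitionOffsets.keySet != newPartitions) {
  val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
  reportDataLoss(
    s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
    () => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
}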


assert(
newPartitionOffsets.keys.forall(!startPartitionOffsets.contains(_)),
"startPartitionOffsets should not contain any key in newPartitionOffsets")

// Filter out new partition offsets that are not 0 and log a warning
val nonZeroNewPartitionOffsets = newPartitionOffsets.filter {
case (_, offset) => offset != 0
}
// Log the non-zero new partition offsets
if (nonZeroNewPartitionOffsets.nonEmpty) {
logWarning(log"new partitions should start from offset 0: " +
log"${MDC(OFFSETS, nonZeroNewPartitionOffsets)}")
}
Comment on lines 286 to 300

Member: For the non-zero-offset new-partitions case, getOffsetRangesFromResolvedOffsets delegates to the reportDataLoss closure. Should we do the same?

Contributor Author: Let me add that and make the behavior consistent.


logInfo(log"Added new partition offsets: ${MDC(OFFSETS, newPartitionOffsets)}")
startPartitionOffsets ++ newPartitionOffsets
}

newStartPartitionOffsets.keySet.toSeq.map { tp =>
val fromOffset = newStartPartitionOffsets(tp)
KafkaBatchInputPartition(
KafkaOffsetRange(tp, fromOffset, Long.MaxValue, preferredLoc = None),
executorKafkaParams,
pollTimeoutMs,
failOnDataLoss,
includeHeaders)
}.toArray
}

override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
val mergedMap = offsets.map {
case KafkaSourcePartitionOffset(p, o) => (p, o)
}.toMap
KafkaSourceOffset(mergedMap)
}

override def createReaderFactory(): PartitionReaderFactory = {
KafkaBatchReaderFactory
}
@@ -235,7 +331,30 @@
override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"

override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
KafkaMicroBatchStream.metrics(latestConsumedOffset, latestPartitionOffsets)
var rtmFetchLatestOffsetsTimeMs = Option.empty[Long]
val reCalculatedLatestPartitionOffsets =
if (inRealTimeMode) {
if (!latestConsumedOffset.isPresent) {
// this means a batch has no end offsets, which should not happen
None
} else {
Some {
val startTime = System.currentTimeMillis()
val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(
Some(latestConsumedOffset.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets))
val endTime = System.currentTimeMillis()
rtmFetchLatestOffsetsTimeMs = Some(endTime - startTime)
Member: Hmm, I'm not sure if I missed something. rtmFetchLatestOffsetsTimeMs is assigned here, but it is not used anywhere?

Contributor Author: Let me remove this variable; it is not necessary.

latestOffsets
}
}
} else {
// If we are in micro-batch mode, we need to get the latest partition offsets at the
// start of the batch and recalculate the latest offsets at the end for backlog
// estimation.
Member: This comment is not needed, right?

Contributor Author: Will revert this code based on this thread:
https://github.com/apache/spark/pull/52729/files#r2482113567
So this is not needed.

Some(kafkaOffsetReader.fetchLatestOffsets(Some(latestPartitionOffsets)))
Member: This changes the original behavior? Previously it just used latestPartitionOffsets without fetching the latest offsets again.

Contributor Author: This is actually fixing an issue with non-RTM queries using Kafka. The calculation is not correct here and will always result in the backlog metrics being zero. latestPartitionOffsets is computed when latestOffset is called at the beginning of a batch; it is essentially the offset this batch will read up to, so for non-RTM streaming queries latestConsumedOffset will be the same as latestPartitionOffsets, resulting in zero backlog. What we should be doing is fetch the latest offsets from the source Kafka topic after the batch is processed, i.e. when metrics() is called, to calculate a useful backlog metric. I know this is not really related to RTM, so let me know if I should just create a separate PR for this.

Member: Yeah, let's focus on RTM in this PR and not change/fix existing behavior here. Please open a separate PR to fix it if you think it is an issue.

Contributor Author: ok
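A worked toy example (hypothetical offsets, not from this PR) of the backlog difference described above:

// Offsets the batch actually consumed:
val consumed = Map("tp-0" -> 100L)
// Snapshot taken when the batch was planned (latestPartitionOffsets); the batch reads up to it,
// so comparing against it always yields zero backlog for non-RTM queries:
val planningSnapshot = Map("tp-0" -> 100L)
planningSnapshot.map { case (tp, o) => tp -> (o - consumed(tp)) }  // Map(tp-0 -> 0)
// Offsets re-fetched when metrics() runs reflect the lag that accumulated during processing:
val refetched = Map("tp-0" -> 150L)
refetched.map { case (tp, o) => tp -> (o - consumed(tp)) }         // Map(tp-0 -> 50)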

}

KafkaMicroBatchStream.metrics(latestConsumedOffset, reCalculatedLatestPartitionOffsets)
}

/**
@@ -386,13 +505,14 @@ object KafkaMicroBatchStream extends Logging {
*/
def metrics(
latestConsumedOffset: Optional[Offset],
latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
latestAvailablePartitionOffsets: Option[PartitionOffsetMap]): ju.Map[String, String] = {
val offset = Option(latestConsumedOffset.orElse(null))

if (offset.nonEmpty && latestAvailablePartitionOffsets != null) {
if (offset.nonEmpty && latestAvailablePartitionOffsets.isDefined) {
val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
val offsetsBehindLatest = latestAvailablePartitionOffsets
.map(partitionOffset => partitionOffset._2 - consumedPartitionOffsets(partitionOffset._1))
val offsetsBehindLatest = latestAvailablePartitionOffsets.get
.map(partitionOffset => partitionOffset._2 -
consumedPartitionOffsets.getOrElse(partitionOffset._1, 0L))
if (offsetsBehindLatest.nonEmpty) {
val avgOffsetBehindLatest = offsetsBehindLatest.sum.toDouble / offsetsBehindLatest.size
return Map[String, String](
@@ -63,6 +63,13 @@ private[kafka010] class InternalKafkaConsumer(
private[consumer] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
private val consumer = createConsumer()

def poll(pollTimeoutMs: Long): ju.List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
val p = consumer.poll(Duration.ofMillis(pollTimeoutMs))
val r = p.records(topicPartition)
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
r
}

/**
* Poll messages from Kafka starting from `offset` and returns a pair of "list of consumer record"
* and "offset after poll". The list of consumer record may be empty if the Kafka consumer fetches
@@ -131,7 +138,7 @@
c
}

private def seek(offset: Long): Unit = {
def seek(offset: Long): Unit = {
logDebug(s"Seeking to $groupId $topicPartition $offset")
consumer.seek(topicPartition, offset)
}
@@ -228,6 +235,19 @@ private[consumer] case class FetchedRecord(
}
}

/**
* This iterator keeps returning the next record. If no new record is available, it keeps
* polling until the timeout. It is used by KafkaBatchPartitionReader.nextWithTimeout() to
* reduce seeking overhead in real-time mode.
*/
private[sql] trait KafkaDataConsumerIterator {
/**
* Return the next record
* @return None if no new record is available after `timeoutMs`.
*/
def nextWithTimeout(timeoutMs: Long): Option[ConsumerRecord[Array[Byte], Array[Byte]]]
}
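A minimal usage sketch of this trait, with a hypothetical caller (real usage is KafkaBatchPartitionReader.nextWithTimeout()):

// Hypothetical helper; ConsumerRecord is already imported in this file.
def readOne(it: KafkaDataConsumerIterator, timeoutMs: Long): Unit = {
  it.nextWithTimeout(timeoutMs) match {
    case Some(record) => println(s"offset=${record.offset} timestamp=${record.timestamp}")
    case None => println(s"no record arrived within ${timeoutMs} ms")
  }
}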

/**
* This class helps caller to read from Kafka leveraging consumer pool as well as fetched data pool.
* This class throws error when data loss is detected while reading from Kafka.
@@ -272,6 +292,82 @@ private[kafka010] class KafkaDataConsumer(
// Starting timestamp when the consumer is created.
private var startTimestampNano: Long = System.nanoTime()

/**
* Get an iterator that can return the next entry. It is used exclusively for real-time
* mode.
*
* It is called by KafkaBatchPartitionReader.nextWithTimeout(). Unlike get(), there is no
* out-of-bound check in this function. Since there is no endOffset given, we assume any
* record at or after `offset` is valid to return.
*
* @param startOffsets, the starting positions to read from, inclusive.
*/
def getIterator(offset: Long): KafkaDataConsumerIterator = {
Member: The param doc says startOffsets instead of offset. Suggested change:
- def getIterator(offset: Long): KafkaDataConsumerIterator = {
+ def getIterator(startOffsets: Long): KafkaDataConsumerIterator = {

new KafkaDataConsumerIterator {
private var fetchedRecordList
: Option[ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]]] = None
private val consumer = getOrRetrieveConsumer()
private var firstRecord = true
private var _currentOffset: Long = offset - 1

private def fetchedRecordListHasNext(): Boolean = {
fetchedRecordList.map(_.hasNext).getOrElse(false)
}

override def nextWithTimeout(
timeoutMs: Long): Option[ConsumerRecord[Array[Byte], Array[Byte]]] = {
var timeLeftMs = timeoutMs

def timeAndDeductFromTimeLeftMs[T](body: => T): Unit = {
// To avoid timing the same operation twice, we reuse the timing result both for
// totalTimeReadNanos and for deducting from timeLeftMs.
val prevTime = totalTimeReadNanos
timeNanos {
body
}
timeLeftMs -= (totalTimeReadNanos - prevTime) / 1000000
}

if (firstRecord) {
timeAndDeductFromTimeLeftMs {
consumer.seek(offset)
firstRecord = false
}
}
while (!fetchedRecordListHasNext() && timeLeftMs > 0) {
timeAndDeductFromTimeLeftMs {
try {
val records = consumer.poll(timeLeftMs)
numPolls += 1
if (!records.isEmpty) {
numRecordsPolled += records.size
fetchedRecordList = Some(records.listIterator)
}
} catch {
case ex: OffsetOutOfRangeException =>
if (_currentOffset != -1) {
throw ex
} else {
Thread.sleep(10) // retry until the source partition is populated
assert(offset == 0)
consumer.seek(offset)
}
}
}
}
if (fetchedRecordListHasNext()) {
totalRecordsRead += 1
val nextRecord = fetchedRecordList.get.next()
assert(nextRecord.offset > _currentOffset, "Kafka offset should be incremental.")
_currentOffset = nextRecord.offset
Some(nextRecord)
} else {
None
}
}
}
}

/**
* Get the record for the given offset if available.
*