Skip to content

Commit

Permalink
v1
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaschat-db committed Feb 28, 2025
1 parent 9e17c07 commit 0258090
Show file tree
Hide file tree
Showing 4 changed files with 367 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ trait DeltaLogKeysBase {
case object END_OFFSET extends LogKeyShims
case object END_VERSION extends LogKeyShims
case object ERROR extends LogKeyShims
// Tags a caught exception in a structured log message, e.g. `MDC(DeltaLogKeys.EXCEPTION, e)`.
case object EXCEPTION extends LogKeyShims
case object EXECUTOR_ID extends LogKeyShims
case object EXPR extends LogKeyShims
case object FILE_INDEX extends LogKeyShims
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,15 @@ trait DeltaSQLConfBase {
.checkValue(_ >= 0, "DVTombstoneCountThreshold must not be negative.")
.createWithDefault(10000)

// Internal boolean flag, enabled by default. `DeltaSource.validateProtocolAt` reads it and, when
// it is disabled, returns false without attempting any protocol validation.
val FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL =
buildConf("tableFeatures.dev.fastDropFeature.alwaysValidateProtocolInStreaming.enabled")
.internal()
.doc(
"""Whether to validate the protocol when starting a stream from arbitrary
|versions.""".stripMargin)
.booleanConf
.createWithDefault(true)

val DELTA_MAX_SNAPSHOT_LINEAGE_LENGTH =
buildConf("maxSnapshotLineageLength")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.FileNotFoundException
import java.sql.Timestamp

import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import scala.util.matching.Regex

import org.apache.spark.sql.delta._
Expand Down Expand Up @@ -1360,9 +1361,15 @@ case class DeltaSource(
case StartingVersionLatest =>
deltaLog.update().version + 1
case StartingVersion(version) =>
// when starting from a given version, we don't need the snapshot of this version. So
// `mustBeRecreatable` is set to `false`.
deltaLog.history.checkVersionExists(version, mustBeRecreatable = false, allowOutOfRange)
if (!DeltaSource.validateProtocolAt(spark, deltaLog, version)) {
// When starting from a given version, we don't require that the snapshot of this
// version can be reconstructed, even though the input table is technically in an
// inconsistent state. If the snapshot cannot be reconstructed, then the protocol
// check is skipped, so this is technically not safe, but we keep it this way for
// historical reasons.
deltaLog.history.checkVersionExists(
version, mustBeRecreatable = false, allowOutOfRange)
}
version
}
Some(v)
Expand All @@ -1384,7 +1391,33 @@ case class DeltaSource(

}

object DeltaSource {
object DeltaSource extends DeltaLogging {
/**
 * Validates the protocol at the given version by attempting to reconstruct the snapshot at that
 * version.
 *
 * If snapshot reconstruction fails for any reason other than a
 * [[DeltaUnsupportedTableFeatureException]], the failure is logged and suppressed. This allows
 * callers to fall back to the previous behavior, where the starting version/timestamp was not
 * required to point to a reconstructable snapshot.
 *
 * @param spark    session whose SQL conf decides whether validation is attempted at all
 * @param deltaLog log of the table whose snapshot is reconstructed
 * @param version  table version to validate
 * @return true when the validation was performed and succeeded; false when validation is
 *         disabled by configuration or when snapshot reconstruction failed with a suppressed
 *         error
 * @throws DeltaUnsupportedTableFeatureException rethrown unchanged when snapshot reconstruction
 *                                               raises it
 */
def validateProtocolAt(spark: SparkSession, deltaLog: DeltaLog, version: Long): Boolean = {
  val alwaysValidateProtocol = spark.sessionState.conf.getConf(
    DeltaSQLConf.FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL)

  if (!alwaysValidateProtocol) {
    // Validation is switched off: report that no validation took place.
    false
  } else {
    try {
      // We attempt to construct a snapshot at the startingVersion in order to validate the
      // protocol. If snapshot reconstruction fails, fall back to the old behavior where the
      // only requirement was for the commit to exist.
      deltaLog.getSnapshotAt(version)
      true
    } catch {
      // Table feature errors must reach the caller; they are the reason for this check.
      case e: DeltaUnsupportedTableFeatureException => throw e
      // Suppress all remaining non-fatal errors; fatal ones still propagate.
      case NonFatal(e) =>
        logWarning(log"Protocol validation failed with '${MDC(DeltaLogKeys.EXCEPTION, e)}'.")
        false
    }
  }
}

/**
* - If a commit version exactly matches the provided timestamp, we return it.
Expand Down Expand Up @@ -1414,6 +1447,7 @@ object DeltaSource {
mustBeRecreatable = false,
canReturnEarliestCommit = true)
if (commit.timestamp >= timestamp.getTime) {
validateProtocolAt(spark, deltaLog, commit.version)
// Find the commit at the `timestamp` or the earliest commit
commit.version
} else {
Expand All @@ -1423,7 +1457,9 @@ object DeltaSource {
//
// Note2: In the [[CDCReader]] use case, the timestamp passed in can exceed the latest commit
// timestamp; the caller doesn't expect an exception and can handle the non-existent version.
if (commit.version + 1 <= deltaLog.unsafeVolatileSnapshot.version || canExceedLatest) {
val latestNotExceeded = commit.version + 1 <= deltaLog.unsafeVolatileSnapshot.version
if (latestNotExceeded || canExceedLatest) {
if (latestNotExceeded) validateProtocolAt(spark, deltaLog, commit.version + 1)
commit.version + 1
} else {
val commitTs = new Timestamp(commit.timestamp)
Expand Down
Loading

0 comments on commit 0258090

Please sign in to comment.