table redirect core
kamcheungting-db committed Feb 27, 2025
1 parent 0fbe9b9 commit ca4e2b9
Showing 12 changed files with 634 additions and 81 deletions.
20 changes: 19 additions & 1 deletion spark/src/main/resources/error/delta-error-classes.json
@@ -2127,6 +2127,12 @@
],
"sqlState" : "2201B"
},
"DELTA_RELATION_PATH_MISMATCH" : {
"message" : [
"Relation path '<relation>' mismatches with <targetType>'s path '<targetPath>'."
],
"sqlState" : "2201B"
},
"DELTA_REMOVE_FILE_CDC_MISSING_EXTENDED_METADATA" : {
"message" : [
"RemoveFile created without extended metadata is ineligible for CDC:",
@@ -2480,14 +2486,20 @@
"Unable to update table redirection state: Invalid state transition attempted.",
"The Delta table '<table>' cannot change from '<oldState>' to '<newState>'."
],
"sqlState" : "KD007"
"sqlState" : "22023"
},
"DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT" : {
"message" : [
"Unable to remove table redirection for <table> due to its invalid state: <currentState>."
],
"sqlState" : "KD007"
},
"DELTA_TABLE_INVALID_SET_UNSET_REDIRECT" : {
"message" : [
"Unable to SET or UNSET redirect property on <table>: current property '<currentProperty>' mismatches with new property '<newProperty>'."
],
"sqlState" : "22023"
},
"DELTA_TABLE_LOCATION_MISMATCH" : {
"message" : [
"The location of the existing table <tableName> is <existingTableLocation>. It doesn't match the specified location <tableLocation>."
@@ -2512,6 +2524,12 @@
],
"sqlState" : "0AKDD"
},
"DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC" : {
"message" : [
"The Delta log contains unrecognized table redirect spec '<spec>'."
],
"sqlState" : "42704"
},
"DELTA_TARGET_TABLE_FINAL_SCHEMA_EMPTY" : {
"message" : [
"Target table final schema is empty."
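Placeholders such as <relation> in these message templates are filled, in order, from the messageParameters array passed by the corresponding DeltaErrors builder (added in DeltaErrors.scala below). With illustrative values ("catalog.db.tbl", "table", "catalog.db.other"), DELTA_RELATION_PATH_MISMATCH would render as:

    Relation path 'catalog.db.tbl' does not match table's path 'catalog.db.other'.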
44 changes: 38 additions & 6 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.hooks.AutoCompactType
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.NoRedirectRule
import org.apache.spark.sql.delta.redirect.RedirectSpec
import org.apache.spark.sql.delta.redirect.RedirectState
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -357,26 +358,57 @@ trait DeltaErrorsBase
)
}

def deltaRelationPathMismatch(
relationPath: Seq[String],
targetType: String,
targetPath: Seq[String]
): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_RELATION_PATH_MISMATCH",
messageParameters = Array(
relationPath.mkString("."),
targetType,
targetPath.mkString(".")
)
)
}

def unrecognizedRedirectSpec(spec: RedirectSpec): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC",
messageParameters = Array(spec.toString)
)
}

def invalidSetUnSetRedirectCommand(
table: String,
newProperty: String,
existingProperty: String): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_SET_UNSET_REDIRECT",
messageParameters = Array(table, existingProperty, newProperty)
)
}

def invalidRedirectStateTransition(
table: String,
oldState: RedirectState,
- newState: RedirectState): Unit = {
+ newState: RedirectState): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION",
- messageParameters = Array(
-   table, table, oldState.name, newState.name)
+ messageParameters = Array(table, oldState.name, newState.name)
)
}

- def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Unit = {
+ def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT",
messageParameters = Array(table, table, currentState.name)
)
}

def invalidCommitIntermediateRedirectState(state: RedirectState): Throwable = {
- throw new DeltaIllegalStateException (
+ new DeltaIllegalStateException (
errorClass = "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE",
messageParameters = Array(state.name)
)
@@ -385,7 +417,7 @@ trait DeltaErrorsBase
def noRedirectRulesViolated(
op: DeltaOperations.Operation,
noRedirectRules: Set[NoRedirectRule]): Throwable = {
- throw new DeltaIllegalStateException (
+ new DeltaIllegalStateException (
errorClass = "DELTA_NO_REDIRECT_RULES_VIOLATED",
messageParameters =
Array(op.name, noRedirectRules.map("\"" + _ + "\"").mkString("[", ",\n", "]"))
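Note that these builders return the exception rather than throwing it (which is what the Unit-to-Throwable signature fixes above make usable), so call sites are expected to throw the result. A minimal sketch of such a call site, compiled against delta-spark; transitionOrFail and the isAllowed predicate are hypothetical stand-ins for the real transition rules:

    import org.apache.spark.sql.delta.DeltaErrors
    import org.apache.spark.sql.delta.redirect.RedirectState

    // Hypothetical call site: the builder constructs the Throwable, the caller throws it.
    def transitionOrFail(
        table: String,
        oldState: RedirectState,
        newState: RedirectState,
        isAllowed: (RedirectState, RedirectState) => Boolean): Unit = {
      if (!isAllowed(oldState, newState)) {
        throw DeltaErrors.invalidRedirectStateTransition(table, oldState, newState)
      }
    }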
87 changes: 74 additions & 13 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.RedirectFeature
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.sources._
import org.apache.spark.sql.delta.storage.LogStoreProvider
@@ -80,7 +81,7 @@ class DeltaLog private(
val options: Map[String, String],
val allOptions: Map[String, String],
val clock: Clock,
- initialCatalogTable: Option[CatalogTable]
+ val initialCatalogTable: Option[CatalogTable]
) extends Checkpoints
with MetadataCleanup
with LogStoreProvider
@@ -160,6 +161,7 @@
/** The unique identifier for this table. */
def tableId: String = unsafeVolatileMetadata.id // safe because table id never changes

def getInitialCatalogTable: Option[CatalogTable] = initialCatalogTable
/**
* Combines the tableId with the path of the table to ensure uniqueness. Normally `tableId`
* should be globally unique, but nothing stops users from copying a Delta table directly to
@@ -891,6 +893,29 @@ object DeltaLog extends DeltaLogging {
(deltaLog, Some(table))
}

/**
* Helper method for transforming a given Delta log path into a consistent, fully qualified format.
*/
def formalizeDeltaPath(
spark: SparkSession,
options: Map[String, String],
rootPath: Path): Path = {
val fileSystemOptions: Map[String, String] =
if (spark.sessionState.conf.getConf(
DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) {
options.filterKeys { k =>
DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
}.toMap
} else {
Map.empty
}
// scalastyle:off deltahadoopconfiguration
val hadoopConf = spark.sessionState.newHadoopConfWithOptions(fileSystemOptions)
// scalastyle:on deltahadoopconfiguration
val fs = rootPath.getFileSystem(hadoopConf)
fs.makeQualified(rootPath)
}
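A quick usage sketch for the helper above; the path is illustrative and spark is assumed to be an active SparkSession:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.delta.DeltaLog

    // Illustrative: qualify a raw, scheme-less path so that cache keys built
    // from it are consistent across call sites.
    val raw = new Path("/tmp/delta/events")
    val qualified = DeltaLog.formalizeDeltaPath(spark, Map.empty, raw)
    // On a local filesystem this yields e.g. file:/tmp/delta/events.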

/**
* Helper function to be used with the forTableWithSnapshot calls. Thunk is a
* partially applied DeltaLog.forTable call, which we can then wrap around with a
@@ -929,14 +954,14 @@
// scalastyle:on deltahadoopconfiguration
val fs = rawPath.getFileSystem(hadoopConf)
val path = fs.makeQualified(rawPath)
- def createDeltaLog(): DeltaLog = recordDeltaOperation(
+ def createDeltaLog(tablePath: Path = path): DeltaLog = recordDeltaOperation(
null,
"delta.log.create",
- Map(TAG_TAHOE_PATH -> path.getParent.toString)) {
+ Map(TAG_TAHOE_PATH -> tablePath.getParent.toString)) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
new DeltaLog(
- logPath = path,
- dataPath = path.getParent,
+ logPath = tablePath,
+ dataPath = tablePath.getParent,
options = fileSystemOptions,
allOptions = options,
clock = clock,
@@ -966,15 +991,51 @@
}
}

- val deltaLog = getDeltaLogFromCache
- if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) {
-   // Invalid the cached `DeltaLog` and create a new one because the `SparkContext` of the cached
-   // `DeltaLog` has been stopped.
-   getOrCreateCache(spark.sessionState.conf).invalidate(cacheKey)
-   getDeltaLogFromCache
- } else {
-   deltaLog
+ def initializeDeltaLog(): DeltaLog = {
+   val deltaLog = getDeltaLogFromCache
+   if (Option(deltaLog.sparkContext.get).forall(_.isStopped)) {
+     // Invalidate the cached `DeltaLog` and create a new one because the `SparkContext` of
+     // the cached `DeltaLog` has been stopped.
+     getOrCreateCache(spark.sessionState.conf).invalidate(path -> fileSystemOptions)
+     getDeltaLogFromCache
+   } else {
+     deltaLog
+   }
+ }

val deltaLog = initializeDeltaLog()
// The deltaLog object may be cached while another session updates the table redirect
// property. To avoid this potential race condition, a validation inside the deltaLog.update
// method ensures deltaLog points to the correct location after the snapshot is updated.
val redirectConfigOpt = RedirectFeature.needDeltaLogRedirect(
spark,
deltaLog,
initialCatalogTable
)
redirectConfigOpt.map { redirectConfig =>
val (redirectLoc, catalogTableOpt) = RedirectFeature
.getRedirectLocationAndTable(spark, deltaLog, redirectConfig)
val formalizedPath = formalizeDeltaPath(spark, options, redirectLoc)
// Prefix the cache key with the redirect prefix to prevent interference between
// redirection and normal access.
val redirectKey = new Path(RedirectFeature.DELTALOG_PREFIX, redirectLoc)
val deltaLogCacheKey = DeltaLogCacheKey(
redirectKey,
fileSystemOptions)
getOrCreateCache(spark.sessionState.conf).get(
deltaLogCacheKey,
() => {
var redirectedDeltaLog = new DeltaLog(
logPath = formalizedPath,
dataPath = formalizedPath.getParent,
options = fileSystemOptions,
allOptions = options,
clock = clock,
initialCatalogTable = catalogTableOpt
)
redirectedDeltaLog
}
)
}.getOrElse(deltaLog)
}
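The net effect of the block above: DeltaLog.forTable transparently follows a configured redirect, and the destination log is cached under a DELTALOG_PREFIX-qualified key so it cannot collide with a direct lookup of the same path. A hedged sketch of the caller's view; the table path is illustrative:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.delta.DeltaLog

    // Illustrative: if this table's snapshot carries a redirect configuration,
    // the DeltaLog returned below already points at the redirect destination.
    val log = DeltaLog.forTable(spark, new Path("/tmp/delta/source_tbl"))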

/** Invalidate the cached DeltaLog object for the given `dataPath`. */
@@ -1276,6 +1276,13 @@ trait OptimisticTransactionImpl extends DeltaTransaction
op: DeltaOperations.Operation,
redirectConfig: TableRedirectConfiguration
): Unit = {
// If this transaction commits to the redirect destination location, then there is no
// need to validate the subsequent no-redirect rules.
val configuration = deltaLog.newDeltaHadoopConf()
val dataPath = snapshot.deltaLog.dataPath.toUri.getPath
val catalog = spark.sessionState.catalog
val isRedirectDest = redirectConfig.spec.isRedirectDest(catalog, configuration, dataPath)
if (isRedirectDest) return
// Find all rules that match the current application name.
// If appName is not present, its no-redirect rules are included.
// If appName is present, its no-redirect rules are included only when appName
// matches the current application name.
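In code form, the filtering this comment describes typically looks like the following hedged sketch; the Rule shape here is hypothetical, as the real NoRedirectRule definition lives in org.apache.spark.sql.delta.redirect:

    // Hypothetical rule shape, for illustration only.
    case class Rule(appName: Option[String], allowedOperations: Set[String])

    // Keep rules with no appName, plus rules whose appName matches the current app.
    def matchingRules(rules: Set[Rule], currentApp: String): Set[Rule] =
      rules.filter(r => r.appName.forall(_ == currentApp))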
@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.RedirectFeature
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.stats.StatisticsCollection
@@ -651,6 +652,17 @@
case s: SetProperty if s.property() == "location" => classOf[SetLocation]
case c => c.getClass
}
// Determine whether this DDL SETs or UNSETs the table redirect property. If it does, the
// table redirect feature should be disabled to ensure the DDL can be applied to the source
// or destination table properly.
val isUpdateTableRedirectDDL = grouped.map {
case (t, s: Seq[RemoveProperty]) if t == classOf[RemoveProperty] =>
s.map { prop => prop.property() }.exists(RedirectFeature.isRedirectProperty)
case (t, s: Seq[SetProperty]) if t == classOf[SetProperty] =>
RedirectFeature.hasRedirectConfig(s.map(prop => prop.property() -> prop.value()).toMap)
case (_, _) => false
}.toSeq.exists(a => a)
RedirectFeature.withUpdateTableRedirectDDL(isUpdateTableRedirectDDL) {
val table = loadTable(ident) match {
case deltaTable: DeltaTableV2 => deltaTable
case _ if changes.exists(_.isInstanceOf[ClusterBy]) =>
@@ -884,6 +896,7 @@ class DeltaCatalog extends DelegatingCatalogExtension

loadTable(ident)
}
}

// We want our catalog to handle Delta, therefore for other data sources that want to be
// created, we just have this wrapper StagedTable to only drop the table if the commit fails.
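For context, the redirect-DDL detection in the hunk above can be pictured with Spark's TableChange factories. A simplified sketch (the real logic groups changes by class and also consults RedirectFeature.hasRedirectConfig); the property key below is hypothetical, since the real keys are defined by RedirectFeature:

    import org.apache.spark.sql.connector.catalog.TableChange
    import org.apache.spark.sql.delta.redirect.RedirectFeature

    // Illustrative: does this ALTER TABLE touch a redirect property?
    val changes: Seq[TableChange] = Seq(
      TableChange.setProperty("delta.redirectReaderWriter-preview", "{...}")) // hypothetical key
    val isRedirectDDL = changes.exists {
      case r: TableChange.RemoveProperty => RedirectFeature.isRedirectProperty(r.property)
      case s: TableChange.SetProperty => RedirectFeature.isRedirectProperty(s.property)
      case _ => false
    }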