diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index 3749045beb3..0541b6cc4f0 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -2127,6 +2127,12 @@
     ],
     "sqlState" : "2201B"
   },
+  "DELTA_RELATION_PATH_MISMATCH" : {
+    "message" : [
+      "Relation path '<relationPath>' mismatches with <targetType>'s path '<targetPath>'."
+    ],
+    "sqlState" : "2201B"
+  },
   "DELTA_REMOVE_FILE_CDC_MISSING_EXTENDED_METADATA" : {
     "message" : [
       "RemoveFile created without extended metadata is ineligible for CDC:",
@@ -2480,7 +2486,7 @@
       "Unable to update table redirection state: Invalid state transition attempted.",
       "The Delta table '<table>' cannot change from '<oldState>' to '<newState>'."
     ],
-    "sqlState" : "KD007"
+    "sqlState" : "22023"
   },
   "DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT" : {
     "message" : [
@@ -2488,6 +2494,12 @@
     ],
     "sqlState" : "KD007"
   },
+  "DELTA_TABLE_INVALID_SET_UNSET_REDIRECT" : {
+    "message" : [
+      "Unable to SET or UNSET redirect property on <table>: current property '<currentProperty>' mismatches with new property '<newProperty>'."
+    ],
+    "sqlState" : "22023"
+  },
   "DELTA_TABLE_LOCATION_MISMATCH" : {
     "message" : [
       "The location of the existing table <tableName> is <existingTableLocation>. It doesn't match the specified location <location>."
     ],
@@ -2512,6 +2524,12 @@
     ],
     "sqlState" : "0AKDD"
   },
+  "DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC" : {
+    "message" : [
+      "The Delta log contains unrecognized table redirect spec '<spec>'."
+    ],
+    "sqlState" : "42704"
+  },
   "DELTA_TARGET_TABLE_FINAL_SCHEMA_EMPTY" : {
     "message" : [
       "Target table final schema is empty."
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 57bdec6c070..271667b8d7c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.hooks.AutoCompactType
 import org.apache.spark.sql.delta.hooks.PostCommitHook
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.redirect.NoRedirectRule
+import org.apache.spark.sql.delta.redirect.RedirectSpec
 import org.apache.spark.sql.delta.redirect.RedirectState
 import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -357,18 +358,49 @@ trait DeltaErrorsBase
     )
   }
 
+  def deltaRelationPathMismatch(
+      relationPath: Seq[String],
+      targetType: String,
+      targetPath: Seq[String]
+  ): Throwable = {
+    new DeltaIllegalStateException(
+      errorClass = "DELTA_RELATION_PATH_MISMATCH",
+      messageParameters = Array(
+        relationPath.mkString("."),
+        targetType,
+        targetPath.mkString(".")
+      )
+    )
+  }
+
+  def unrecognizedRedirectSpec(spec: RedirectSpec): Throwable = {
+    new DeltaIllegalStateException(
+      errorClass = "DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC",
+      messageParameters = Array(spec.toString)
+    )
+  }
+
+  def invalidSetUnSetRedirectCommand(
+      table: String,
+      newProperty: String,
+      existingProperty: String): Throwable = {
+    new DeltaIllegalStateException(
+      errorClass = "DELTA_TABLE_INVALID_SET_UNSET_REDIRECT",
+      messageParameters = Array(table, existingProperty, newProperty)
+    )
+  }
+
   def invalidRedirectStateTransition(
       table: String,
       oldState: RedirectState,
-      newState: RedirectState): Unit = {
+      newState: RedirectState): Throwable = {
     new DeltaIllegalStateException(
       errorClass = "DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION",
-      messageParameters = Array(
-        table, table, oldState.name, newState.name)
+      messageParameters = Array(table, oldState.name, newState.name)
     )
   }
 
-  def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Unit = {
+  def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Throwable = {
     new DeltaIllegalStateException(
       errorClass = "DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT",
       messageParameters = Array(table, table, currentState.name)
@@ -376,7 +408,7 @@ trait DeltaErrorsBase
   }
 
   def invalidCommitIntermediateRedirectState(state: RedirectState): Throwable = {
-    throw new DeltaIllegalStateException (
+    new DeltaIllegalStateException(
       errorClass = "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE",
       messageParameters = Array(state.name)
     )
@@ -385,7 +417,7 @@ trait DeltaErrorsBase
   def noRedirectRulesViolated(
       op: DeltaOperations.Operation,
       noRedirectRules: Set[NoRedirectRule]): Throwable = {
-    throw new DeltaIllegalStateException (
+    new DeltaIllegalStateException(
"DELTA_NO_REDIRECT_RULES_VIOLATED", messageParameters = Array(op.name, noRedirectRules.map("\"" + _ + "\"").mkString("[", ",\n", "]")) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index 1260c348f87..dbeafc20ffc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.commands.WriteIntoDelta import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex} import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.redirect.RedirectFeature import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} import org.apache.spark.sql.delta.sources._ import org.apache.spark.sql.delta.storage.LogStoreProvider @@ -80,7 +81,7 @@ class DeltaLog private( val options: Map[String, String], val allOptions: Map[String, String], val clock: Clock, - initialCatalogTable: Option[CatalogTable] + val initialCatalogTable: Option[CatalogTable] ) extends Checkpoints with MetadataCleanup with LogStoreProvider @@ -160,6 +161,7 @@ class DeltaLog private( /** The unique identifier for this table. */ def tableId: String = unsafeVolatileMetadata.id // safe because table id never changes + def getInitialCatalogTable: Option[CatalogTable] = initialCatalogTable /** * Combines the tableId with the path of the table to ensure uniqueness. Normally `tableId` * should be globally unique, but nothing stops users from copying a Delta table directly to @@ -891,6 +893,29 @@ object DeltaLog extends DeltaLogging { (deltaLog, Some(table)) } + /** + * Helper method for transforming a given delta log path to the consistent formal path format. + */ + def formalizeDeltaPath( + spark: SparkSession, + options: Map[String, String], + rootPath: Path): Path = { + val fileSystemOptions: Map[String, String] = + if (spark.sessionState.conf.getConf( + DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) { + options.filterKeys { k => + DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith) + }.toMap + } else { + Map.empty + } + // scalastyle:off deltahadoopconfiguration + val hadoopConf = spark.sessionState.newHadoopConfWithOptions(fileSystemOptions) + // scalastyle:on deltahadoopconfiguration + val fs = rootPath.getFileSystem(hadoopConf) + fs.makeQualified(rootPath) + } + /** * Helper function to be used with the forTableWithSnapshot calls. 
Thunk is a * partially applied DeltaLog.forTable call, which we can then wrap around with a @@ -929,14 +954,14 @@ object DeltaLog extends DeltaLogging { // scalastyle:on deltahadoopconfiguration val fs = rawPath.getFileSystem(hadoopConf) val path = fs.makeQualified(rawPath) - def createDeltaLog(): DeltaLog = recordDeltaOperation( + def createDeltaLog(tablePath: Path = path): DeltaLog = recordDeltaOperation( null, "delta.log.create", - Map(TAG_TAHOE_PATH -> path.getParent.toString)) { + Map(TAG_TAHOE_PATH -> tablePath.getParent.toString)) { AnalysisHelper.allowInvokingTransformsInAnalyzer { new DeltaLog( - logPath = path, - dataPath = path.getParent, + logPath = tablePath, + dataPath = tablePath.getParent, options = fileSystemOptions, allOptions = options, clock = clock, @@ -966,15 +991,51 @@ object DeltaLog extends DeltaLogging { } } - val deltaLog = getDeltaLogFromCache - if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) { - // Invalid the cached `DeltaLog` and create a new one because the `SparkContext` of the cached - // `DeltaLog` has been stopped. - getOrCreateCache(spark.sessionState.conf).invalidate(cacheKey) - getDeltaLogFromCache - } else { - deltaLog + def initializeDeltaLog(): DeltaLog = { + val deltaLog = getDeltaLogFromCache + if (Option(deltaLog.sparkContext.get).forall(_.isStopped)) { + // Invalid the cached `DeltaLog` and create a new one because the `SparkContext` of the + // cached `DeltaLog` has been stopped. + getOrCreateCache(spark.sessionState.conf).invalidate(path -> fileSystemOptions) + getDeltaLogFromCache + } else { + deltaLog + } } + + val deltaLog = initializeDeltaLog() + // The deltaLog object may be cached while other session updates table redirect property. + // To avoid this potential race condition, we would add a validation inside deltaLog.update + // method to ensure deltaLog points to correct place after snapshot is updated. + val redirectConfigOpt = RedirectFeature.needDeltaLogRedirect( + spark, + deltaLog, + initialCatalogTable + ) + redirectConfigOpt.map { redirectConfig => + val (redirectLoc, catalogTableOpt) = RedirectFeature + .getRedirectLocationAndTable(spark, deltaLog, redirectConfig) + val formalizedPath = formalizeDeltaPath(spark, options, redirectLoc) + // with redirect prefix to prevent interference between redirection and normal access. + val redirectKey = new Path(RedirectFeature.DELTALOG_PREFIX, redirectLoc) + val deltaLogCacheKey = DeltaLogCacheKey( + redirectKey, + fileSystemOptions) + getOrCreateCache(spark.sessionState.conf).get( + deltaLogCacheKey, + () => { + var redirectedDeltaLog = new DeltaLog( + logPath = formalizedPath, + dataPath = formalizedPath.getParent, + options = fileSystemOptions, + allOptions = options, + clock = clock, + initialCatalogTable = catalogTableOpt + ) + redirectedDeltaLog + } + ) + }.getOrElse(deltaLog) } /** Invalidate the cached DeltaLog object for the given `dataPath`. 
   */
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 48062bf9e2a..4643b0a856d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1276,6 +1276,13 @@ trait OptimisticTransactionImpl extends DeltaTransaction
       op: DeltaOperations.Operation,
       redirectConfig: TableRedirectConfiguration
   ): Unit = {
+    // If this transaction commits to the redirect destination location, then there is no
+    // need to validate the subsequent no-redirect rules.
+    val configuration = deltaLog.newDeltaHadoopConf()
+    val dataPath = snapshot.deltaLog.dataPath.toUri.getPath
+    val catalog = spark.sessionState.catalog
+    val isRedirectDest = redirectConfig.spec.isRedirectDest(catalog, configuration, dataPath)
+    if (isRedirectDest) return
     // Find all rules that match with the current application name.
     // If appName is not present, its no-redirect-rule are included.
     // If appName is present, includes its no-redirect-rule only when appName
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
index 6d827928b69..0106135dbc8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.commands._
 import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.redirect.RedirectFeature
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, DeltaSQLConf}
 import org.apache.spark.sql.delta.stats.StatisticsCollection
@@ -651,6 +652,17 @@ class DeltaCatalog extends DelegatingCatalogExtension
       case s: SetProperty if s.property() == "location" => classOf[SetLocation]
       case c => c.getClass
     }
+    // Determines whether this DDL SETs or UNSETs the table redirect property. If so, the
+    // table redirect feature should be disabled to ensure the DDL can be applied onto the
+    // source or destination table properly.
+    val isUpdateTableRedirectDDL = grouped.map {
+      case (t, s: Seq[RemoveProperty]) if t == classOf[RemoveProperty] =>
+        s.map { prop => prop.property() }.exists(RedirectFeature.isRedirectProperty)
+      case (t, s: Seq[SetProperty]) if t == classOf[SetProperty] =>
+        RedirectFeature.hasRedirectConfig(s.map(prop => prop.property() -> prop.value()).toMap)
+      case (_, _) => false
+    }.toSeq.exists(identity)
+    RedirectFeature.withUpdateTableRedirectDDL(isUpdateTableRedirectDDL) {
     val table = loadTable(ident) match {
       case deltaTable: DeltaTableV2 => deltaTable
       case _ if changes.exists(_.isInstanceOf[ClusterBy]) =>
@@ -884,6 +896,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
       loadTable(ident)
     }
+  }
 
   // We want our catalog to handle Delta, therefore for other data sources that want to be
   // created, we just have this wrapper StagedTable to only drop the table if the commit fails.
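For illustration, a minimal self-contained sketch of the SET/UNSET detection performed in DeltaCatalog.alterTable above, using plain property maps in place of Spark's TableChange objects. The key strings below are illustrative stand-ins for DeltaConfigs.REDIRECT_READER_WRITER.key and REDIRECT_WRITER_ONLY.key, not the real values:

object RedirectDdlDetectionSketch {
  // Assumed stand-ins for the real Delta redirect config keys.
  val redirectReaderWriterKey = "delta.redirectReaderWriter"
  val redirectWriterOnlyKey = "delta.redirectWriterOnly"

  def isRedirectProperty(name: String): Boolean =
    name == redirectReaderWriterKey || name == redirectWriterOnlyKey

  // True iff the DDL sets or unsets a redirect property (mirrors isUpdateTableRedirectDDL).
  def isUpdateTableRedirectDDL(setProps: Map[String, String], unsetProps: Seq[String]): Boolean =
    setProps.keys.exists(isRedirectProperty) || unsetProps.exists(isRedirectProperty)

  def main(args: Array[String]): Unit = {
    assert(isUpdateTableRedirectDDL(Map(redirectReaderWriterKey -> "{...}"), Nil))
    assert(isUpdateTableRedirectDDL(Map.empty, Seq(redirectWriterOnlyKey)))
    assert(!isUpdateTableRedirectDDL(Map("delta.appendOnly" -> "true"), Nil))
  }
}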
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index a4bb58700e2..cafa3ec2d99 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils} import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.sources.DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession} @@ -37,7 +38,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedTable} import org.apache.spark.sql.catalyst.analysis.UnresolvedTableImplicits._ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableCatalog, V2TableWithV1Fallback} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -58,27 +59,41 @@ import org.apache.spark.util.{Clock, SystemClock} * @param path The path to the table * @param tableIdentifier The table identifier for this table */ -case class DeltaTableV2( - spark: SparkSession, - path: Path, - catalogTable: Option[CatalogTable] = None, - tableIdentifier: Option[String] = None, - timeTravelOpt: Option[DeltaTimeTravelSpec] = None, - options: Map[String, String] = Map.empty) +class DeltaTableV2 private[delta]( + val spark: SparkSession, + val path: Path, + val catalogTable: Option[CatalogTable], + val tableIdentifier: Option[String], + val timeTravelOpt: Option[DeltaTimeTravelSpec], + val options: Map[String, String]) extends Table with SupportsWrite with V2TableWithV1Fallback with DeltaLogging { - private lazy val (rootPath, partitionFilters, timeTravelByPath) = { + case class PathInfo( + rootPath: Path, + private[delta] var partitionFilters: Seq[(String, String)], + private[delta] var timeTravelByPath: Option[DeltaTimeTravelSpec] + ) + + private lazy val pathInfo: PathInfo = { if (catalogTable.isDefined) { // Fast path for reducing path munging overhead - (new Path(catalogTable.get.location), Nil, None) + PathInfo(new Path(catalogTable.get.location), Seq.empty, None) } else { - DeltaDataSource.parsePathIdentifier(spark, path.toString, options) + val (rootPath, filters, timeTravel) = + DeltaDataSource.parsePathIdentifier(spark, path.toString, options) + PathInfo(rootPath, filters, timeTravel) } } + private def rootPath = pathInfo.rootPath + + private def partitionFilters = pathInfo.partitionFilters + + private def timeTravelByPath = pathInfo.timeTravelByPath + def hasPartitionFilters: Boolean = partitionFilters.nonEmpty @@ -363,9 +378,86 @@ case class DeltaTableV2( None } } + + def copy( + spark: SparkSession = this.spark, + path: Path = this.path, + catalogTable: Option[CatalogTable] = this.catalogTable, + tableIdentifier: Option[String] = this.tableIdentifier, + timeTravelOpt: Option[DeltaTimeTravelSpec] 
= this.timeTravelOpt,
+      options: Map[String, String] = this.options
+  ): DeltaTableV2 = {
+    val deltaTableV2 =
+      new DeltaTableV2(spark, path, catalogTable, tableIdentifier, timeTravelOpt, options)
+    deltaTableV2.pathInfo.timeTravelByPath = timeTravelByPath
+    deltaTableV2.pathInfo.partitionFilters = partitionFilters
+    deltaTableV2
+  }
+
+  override def toString: String =
+    s"DeltaTableV2($spark,$path,$catalogTable,$tableIdentifier,$timeTravelOpt,$options)"
 }
 
 object DeltaTableV2 {
+  def unapply(deltaTable: DeltaTableV2): Option[(
+      SparkSession,
+      Path,
+      Option[CatalogTable],
+      Option[String],
+      Option[DeltaTimeTravelSpec],
+      Map[String, String])
+  ] = {
+    Some((
+      deltaTable.spark,
+      deltaTable.path,
+      deltaTable.catalogTable,
+      deltaTable.tableIdentifier,
+      deltaTable.timeTravelOpt,
+      deltaTable.options)
+    )
+  }
+
+  def apply(
+      spark: SparkSession,
+      path: Path,
+      catalogTable: Option[CatalogTable] = None,
+      tableIdentifier: Option[String] = None,
+      options: Map[String, String] = Map.empty[String, String],
+      timeTravelOpt: Option[DeltaTimeTravelSpec] = None
+  ): DeltaTableV2 = {
+    val deltaTable = new DeltaTableV2(
+      spark,
+      path,
+      catalogTable = catalogTable,
+      tableIdentifier = tableIdentifier,
+      timeTravelOpt = timeTravelOpt,
+      options = options
+    )
+    if (spark == null || spark.sessionState == null ||
+        !spark.sessionState.conf.getConf(ENABLE_TABLE_REDIRECT_FEATURE)) {
+      return deltaTable
+    }
+    // The following code ensures that the path and catalogTable of the redirected table
+    // object are passed through.
+    // Note: a DeltaTableV2 can only be created using this method.
+    AnalysisHelper.allowInvokingTransformsInAnalyzer {
+      val deltaLog = deltaTable.deltaLog
+      val rootDeltaLogPath = DeltaLog.logPathFor(deltaTable.rootPath.toString)
+      val finalDeltaLogPath = DeltaLog.formalizeDeltaPath(spark, options, rootDeltaLogPath)
+      val catalogTableOpt = if (finalDeltaLogPath == deltaLog.logPath) {
+        // If there is no redirection, use the existing catalogTable.
+        catalogTable
+      } else {
+        // If there is redirection, use the catalogTable of deltaLog.
+        deltaLog.getInitialCatalogTable
+      }
+      val tableIdentifier = catalogTableOpt.map(_.identifier.identifier)
+      val newPath = new Path(deltaLog.dataPath.toUri.getPath)
+      deltaTable.copy(
+        path = newPath, catalogTable = catalogTableOpt, tableIdentifier = tableIdentifier
+      )
+    }
+  }
+
   /** Resolves a path into a DeltaTableV2, leveraging standard v2 table resolution.
    */
   def apply(spark: SparkSession, tablePath: Path, options: Map[String, String], cmd: String)
     : DeltaTableV2 = {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index f684714c39f..f51d13a4a13 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
 import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
 import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
+import org.apache.spark.sql.delta.redirect.RedirectFeature
 import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.schema.SchemaUtils.transformSchema
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -166,6 +167,8 @@ case class AlterTableSetPropertiesDeltaCommand(
       CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
         metadata.configuration, filteredConfs)
 
+      // If the table redirect property is being updated, validate it.
+      RedirectFeature.validateTableRedirect(txn.snapshot, table.catalogTable, configuration)
       val newMetadata = metadata.copy(
         description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description),
         configuration = metadata.configuration ++ filteredConfs)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala b/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala
index c9f64c656d9..3f889955396 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala
@@ -16,8 +16,12 @@
 
 package org.apache.spark.sql.delta.redirect
 
+import java.util.UUID
+
 import scala.reflect.ClassTag
+import scala.util.DynamicVariable
 
+// scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta.{
   DeltaConfig,
   DeltaConfigs,
@@ -28,14 +32,22 @@ import org.apache.spark.sql.delta.{
   RedirectWriterOnlyFeature,
   Snapshot
 }
+import org.apache.spark.sql.delta.DeltaLog.logPathFor
 import org.apache.spark.sql.delta.actions.Metadata
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.sources.DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE
 import org.apache.spark.sql.delta.util.JsonUtils
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.fasterxml.jackson.annotation.JsonProperty
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
 /**
  * The table redirection feature includes specific states that manage the behavior of Delta clients
@@ -106,7 +118,12 @@ case object DropRedirectInProgress extends RedirectState {
  * This is the abstract class of the redirect specification, which stores the information
  * of accessing the redirect destination table.
  */
-abstract class RedirectSpec()
+abstract class RedirectSpec() {
+  /** Determine whether `dataPath` is the redirect destination location. */
+  def isRedirectDest(catalog: SessionCatalog, config: Configuration, dataPath: String): Boolean
+  /** Determine whether `dataPath` is the redirect source location. */
+  def isRedirectSource(dataPath: String): Boolean
+}
 
 /**
  * The default redirect spec that is used for OSS delta.
@@ -115,12 +132,24 @@
  * {
  *   ......
  *   "spec": {
- *     "tablePath": "s3://<bucket>/tables/<table>"
+ *     "sourcePath": "s3://<bucket>/tables/<source-table>",
+ *     "destPath": "s3://<bucket>/tables/<dest-table>"
  *   }
 * }
- * @param tablePath this is the path where stores the redirect destination table's location.
+ *
+ * @param sourcePath the location of the redirect source table.
+ * @param destPath the location of the redirect destination table.
 */
-class PathBasedRedirectSpec(val tablePath: String) extends RedirectSpec
+class PathBasedRedirectSpec(
+    val sourcePath: String,
+    val destPath: String
+) extends RedirectSpec {
+  def isRedirectDest(catalog: SessionCatalog, config: Configuration, dataPath: String): Boolean = {
+    destPath == dataPath
+  }
+
+  def isRedirectSource(dataPath: String): Boolean = sourcePath == dataPath
+}
 
 object PathBasedRedirectSpec {
   /**
@@ -221,6 +250,35 @@ case class TableRedirectConfiguration(
   val isInProgressState: Boolean = {
     redirectState == EnableRedirectInProgress || redirectState == DropRedirectInProgress
   }
+
+  /** Determines whether the current application fulfills the no-redirect rules. */
+  private def isNoRedirectApp(spark: SparkSession): Boolean = {
+    noRedirectRules.exists { rule =>
+      // If rule.appName is None, the rule applies to every application; otherwise it applies
+      // only when it matches the current "spark.app.name".
+      // Todo(LC-6953): The operation name should also be taken into consideration.
+      rule.appName.forall(_.equalsIgnoreCase(spark.conf.get("spark.app.name")))
+    }
+  }
+
+  /** Determines whether the current session needs to access the redirect destination location. */
+  def needRedirect(spark: SparkSession, logPath: Path): Boolean = {
+    !isNoRedirectApp(spark) &&
+      redirectState == RedirectReady &&
+      spec.isRedirectSource(logPath.toUri.getPath)
+  }
+
+  /**
+   * Get the redirect destination location from the `deltaLog` object.
+   */
+  def getRedirectLocation(deltaLog: DeltaLog, spark: SparkSession): Path = {
+    spec match {
+      case spec: PathBasedRedirectSpec =>
+        val location = new Path(spec.destPath)
+        val fs = location.getFileSystem(deltaLog.newDeltaHadoopConf())
+        fs.makeQualified(logPathFor(location))
+      case other => throw DeltaErrors.unrecognizedRedirectSpec(other)
+    }
+  }
 }
 
 /**
@@ -238,9 +296,7 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) {
    */
   def getRedirectConfiguration(deltaLogMetadata: Metadata): Option[TableRedirectConfiguration] = {
     config.fromMetaData(deltaLogMetadata).map { propertyValue =>
-      val mapper = new ObjectMapper()
-      mapper.registerModule(DefaultScalaModule)
-      mapper.readValue(propertyValue, classOf[TableRedirectConfiguration])
+      RedirectFeature.parseRedirectConfiguration(propertyValue)
     }
   }
 
@@ -289,23 +345,12 @@
     }
     // There should be an existing table redirect configuration.
     if (currentConfigOpt.isEmpty) {
-      DeltaErrors.invalidRedirectStateTransition(tableIdent, NoRedirect, state)
+      throw DeltaErrors.invalidRedirectStateTransition(tableIdent, NoRedirect, state)
     }
     val currentConfig = currentConfigOpt.get
     val redirectState = currentConfig.redirectState
-    state match {
-      case RedirectReady =>
-        if (redirectState != EnableRedirectInProgress && redirectState != RedirectReady) {
-          DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
-        }
-      case DropRedirectInProgress =>
-        if (redirectState != RedirectReady) {
-          DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
-        }
-      case _ =>
-        DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
-    }
+    RedirectFeature.validateStateTransition(tableIdent, redirectState, state)
     val properties = generateRedirectMetadata(currentConfig.`type`, state, spec, noRedirectRules)
     val newConfigs = txn.metadata.configuration ++ properties
     val newMetadata = txn.metadata.copy(configuration = newConfigs)
@@ -333,7 +378,7 @@
     val txn = deltaLog.startTransaction(catalogTableOpt)
     val snapshot = txn.snapshot
     getRedirectConfiguration(snapshot.metadata).foreach { currentConfig =>
-      DeltaErrors.invalidRedirectStateTransition(
+      throw DeltaErrors.invalidRedirectStateTransition(
         catalogTableOpt.map(_.identifier.quotedString).getOrElse {
           s"delta.`${deltaLog.dataPath.toString}`"
         },
@@ -406,6 +451,52 @@ object RedirectFeature {
     RedirectWriterOnly.isFeatureSupported(snapshot)
   }
 
+  private def getRedirectConfigurationFromDeltaLog(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      initialCatalogTable: Option[CatalogTable]
+  ): Option[TableRedirectConfiguration] = {
+    val snapshot = deltaLog.update(
+      catalogTableOpt = initialCatalogTable
+    )
+    getRedirectConfiguration(snapshot.getProperties.toMap)
+  }
+
+  /**
+   * This is the main method that redirects `deltaLog` to the destination location.
+   */
+  def getRedirectLocationAndTable(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      redirectConfig: TableRedirectConfiguration
+  ): (Path, Option[CatalogTable]) = {
+    // Try to get the catalogTable object for the redirect destination table.
+    val catalogTableOpt = redirectConfig.spec match {
+      case pathRedirect: PathBasedRedirectSpec =>
+        withUpdateTableRedirectDDL(updateTableRedirectDDL = true) {
+          import spark.sessionState.analyzer.CatalogAndIdentifier
+          val CatalogAndIdentifier(catalog, ident) = Seq("delta", pathRedirect.destPath)
+          catalog.asTableCatalog.loadTable(ident).asInstanceOf[DeltaTableV2].catalogTable
+        }
+    }
+    // Get the redirect destination location.
+    val redirectLocation = redirectConfig.getRedirectLocation(deltaLog, spark)
+    (redirectLocation, catalogTableOpt)
+  }
+
+  def parseRedirectConfiguration(configString: String): TableRedirectConfiguration = {
+    val mapper = new ObjectMapper()
+    mapper.registerModule(DefaultScalaModule)
+    mapper.readValue(configString, classOf[TableRedirectConfiguration])
+  }
+
+  def getRedirectConfiguration(
+      properties: Map[String, String]): Option[TableRedirectConfiguration] = {
+    properties.get(DeltaConfigs.REDIRECT_READER_WRITER.key)
+      .orElse(properties.get(DeltaConfigs.REDIRECT_WRITER_ONLY.key))
+      .map(parseRedirectConfiguration)
+  }
+
   /**
    * Determine whether the operation `op` updates the existing redirect-reader-writer or
    * redirect-writer-only table property of a table with `snapshot`.
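For illustration, a hedged round-trip sketch of the spec JSON that parseRedirectConfiguration and PathBasedRedirectSpec rely on. It uses a stand-in case class with the same field names rather than the real class; the JSON shape is grounded in the expectedSpecValue assertion in TableRedirectSuite further below:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Stand-in mirroring PathBasedRedirectSpec's field names; not the real class.
case class PathSpecSketch(sourcePath: String, destPath: String)

object SpecJsonRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    val spec = PathSpecSketch("/tables/src", "/tables/dst")
    // Serializes to {"sourcePath":"/tables/src","destPath":"/tables/dst"},
    // the same shape the suite asserts for the real spec value.
    val json = mapper.writeValueAsString(spec)
    val parsed = mapper.readValue(json, classOf[PathSpecSketch])
    assert(parsed == spec)
  }
}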
@@ -431,4 +522,136 @@
         RedirectReaderWriter.getRedirectConfiguration(snapshot.metadata)
       }
   }
+
+  /** Determines whether `configs` contains a redirect configuration. */
+  def hasRedirectConfig(configs: Map[String, String]): Boolean =
+    getRedirectConfiguration(configs).isDefined
+
+  /** Determines whether the property `name` is a redirect property. */
+  def isRedirectProperty(name: String): Boolean = {
+    name == DeltaConfigs.REDIRECT_READER_WRITER.key || name == DeltaConfigs.REDIRECT_WRITER_ONLY.key
+  }
+
+  // Helper method to validate state transitions.
+  def validateStateTransition(
+      identifier: String,
+      currentState: RedirectState,
+      newState: RedirectState
+  ): Unit = {
+    (currentState, newState) match {
+      case (state, RedirectReady) =>
+        if (state == DropRedirectInProgress) {
+          throw DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
+        }
+      case (state, DropRedirectInProgress) =>
+        if (state != RedirectReady) {
+          throw DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
+        }
+      case (state, _) =>
+        throw DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
+    }
+  }
+
+  /**
+   * Determine whether the current `deltaLog` needs to be redirected; returns the redirect
+   * configuration if so, and None otherwise.
+   */
+  def needDeltaLogRedirect(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      initialCatalogTable: Option[CatalogTable]
+  ): Option[TableRedirectConfiguration] = {
+    // Redirect can be skipped if any of the following conditions holds:
+    // - the redirect feature is not enabled,
+    // - the current command is a DDL that updates the table redirect property, or
+    // - deltaLog doesn't point to a valid table.
+    val canSkipTableRedirect = !spark.conf.get(ENABLE_TABLE_REDIRECT_FEATURE) ||
+      isUpdateTableRedirectDDL.value ||
+      !deltaLog.tableExists
+    if (canSkipTableRedirect) return None
+
+    val redirectConfigOpt = getRedirectConfigurationFromDeltaLog(
+      spark,
+      deltaLog,
+      initialCatalogTable
+    )
+    val needRedirectToDest = redirectConfigOpt.exists { redirectConfig =>
+      // If the current deltaLog already points to the destination, there is no need to
+      // redirect it.
+      redirectConfig.needRedirect(spark, deltaLog.dataPath)
+    }
+    if (needRedirectToDest) redirectConfigOpt else None
+  }
+
+  def validateTableRedirect(
+      snapshot: Snapshot,
+      catalogTable: Option[CatalogTable],
+      configs: Map[String, String]
+  ): Unit = {
+    val identifier = catalogTable
+      .map(_.identifier.quotedString)
+      .getOrElse(s"delta.`${snapshot.deltaLog.logPath.toString}`")
+    if (configs.contains(DeltaConfigs.REDIRECT_READER_WRITER.key)) {
+      if (RedirectWriterOnly.isFeatureSet(snapshot.metadata)) {
+        throw DeltaErrors.invalidSetUnSetRedirectCommand(
+          identifier,
+          DeltaConfigs.REDIRECT_READER_WRITER.key,
+          DeltaConfigs.REDIRECT_WRITER_ONLY.key
+        )
+      }
+    } else if (configs.contains(DeltaConfigs.REDIRECT_WRITER_ONLY.key)) {
+      if (RedirectReaderWriter.isFeatureSet(snapshot.metadata)) {
+        throw DeltaErrors.invalidSetUnSetRedirectCommand(
+          identifier,
+          DeltaConfigs.REDIRECT_WRITER_ONLY.key,
+          DeltaConfigs.REDIRECT_READER_WRITER.key
+        )
+      }
+    } else {
+      return
+    }
+    val currentRedirectConfigOpt = getRedirectConfiguration(snapshot)
+    val newRedirectConfigOpt = getRedirectConfiguration(configs)
+    newRedirectConfigOpt.foreach { newRedirectConfig =>
+      val newState = newRedirectConfig.redirectState
+      // Validate state transitions based on the current and new states.
+      currentRedirectConfigOpt match {
+        case Some(currentRedirectConfig) =>
+          validateStateTransition(identifier, currentRedirectConfig.redirectState, newState)
+        case None if newState == DropRedirectInProgress =>
+          throw DeltaErrors.invalidRedirectStateTransition(
+            identifier, NoRedirect, newState
+          )
+        case _ => // No action required for valid transitions
+      }
+    }
+  }
+
+  val DELTALOG_PREFIX = "redirect-delta-log://"
+
+  /**
+   * The thread-local variable indicating whether the current command is a DDL that updates
+   * the table redirect property.
+   */
+  @SuppressWarnings(
+    Array(
+      "BadMethodCall-DynamicVariable",
+      """
+      Reason: The redirect feature implementation requires a thread-local variable to control
+      enable/disable states during SET and UNSET operations. This approach is necessary because:
+      - Parameter Passing Limitation: The call stack cannot propagate this state via method
+        parameters, as the feature is triggered through an external open-source API interface
+        that does not expose this configurability.
+      - Concurrency Constraints: A global variable (without thread-local isolation) would allow
+        unintended cross-thread interference, risking undefined behavior in concurrent
+        transactions. We cannot use a lock because it would introduce a large critical section
+        and create performance issues.
+      By using thread-local storage, the feature ensures transaction-specific state isolation
+      while maintaining compatibility with the third-party API's design."""
+    )
+  )
+  private val isUpdateTableRedirectDDL = new DynamicVariable[Boolean](false)
+
+  /**
+   * Execute `thunk` while `isUpdateTableRedirectDDL` is set to `updateTableRedirectDDL`.
+ */ + def withUpdateTableRedirectDDL[T](updateTableRedirectDDL: Boolean)(thunk: => T): T = { + isUpdateTableRedirectDDL.withValue(updateTableRedirectDDL) { thunk } + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index d7cf7aec852..06a09d8dbba 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -2327,6 +2327,13 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(false) + val ENABLE_TABLE_REDIRECT_FEATURE = + buildConf("enableTableRedirectFeature") + .doc("True if enabling the table redirect feature.") + .internal() + .booleanConf + .createWithDefault(false) + val DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS = buildConf("optimizeWrite.maxShufflePartitions") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala index f82f132b4c3..f263d0852ab 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala @@ -168,7 +168,7 @@ trait CloneTableSuiteBase extends QueryTest } else { None } - val deltaTable = DeltaTableV2(spark, sourceLog.dataPath, None, None, timeTravelSpec) + val deltaTable = DeltaTableV2(spark, sourceLog.dataPath, timeTravelOpt = timeTravelSpec) val sourceData = Dataset.ofRows( spark, LogicalRelation(sourceLog.createRelation( diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala index 6ededce0533..3c6b8de6cb7 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta import java.io.File +// scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite import org.apache.spark.sql.delta.redirect.{ DropRedirectInProgress, @@ -26,12 +27,16 @@ import org.apache.spark.sql.delta.redirect.{ PathBasedRedirectSpec, RedirectReaderWriter, RedirectReady, + RedirectSpec, RedirectState, RedirectWriterOnly, - TableRedirect + TableRedirect, + TableRedirectConfiguration } import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} +import org.apache.spark.sql.delta.util.JsonUtils +import org.apache.commons.text.StringEscapeUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.{QueryTest, SaveMode, SparkSession} @@ -49,6 +54,7 @@ class TableRedirectSuite extends QueryTest private def validateState( deltaLog: DeltaLog, redirectState: RedirectState, + sourceTablePath: File, destTablePath: File, feature: TableRedirect ): Unit = { @@ -65,10 +71,13 @@ class TableRedirectSuite extends QueryTest } assert(redirectConfig.redirectState == redirectState) assert(redirectConfig.`type` == PathBasedRedirectSpec.REDIRECT_TYPE) - val expectedSpecValue = s"""{"tablePath":"${destTablePath.getCanonicalPath}"}""" + val srcPath = sourceTablePath.getCanonicalPath + val dstPath = destTablePath.getCanonicalPath + val expectedSpecValue = s"""{"sourcePath":"$srcPath","destPath":"$dstPath"}""" assert(redirectConfig.specValue == expectedSpecValue) val 
redirectSpec = redirectConfig.spec.asInstanceOf[PathBasedRedirectSpec] - assert(redirectSpec.tablePath == destTablePath.getCanonicalPath) + assert(redirectSpec.sourcePath == srcPath) + assert(redirectSpec.destPath == dstPath) } private def validateRemovedState(deltaLog: DeltaLog, feature: TableRedirect): Unit = { @@ -84,15 +93,19 @@ class TableRedirectSuite extends QueryTest } } - def redirectTest(label: String)(f: (DeltaLog, File, File, CatalogTable) => Unit): Unit = { + def redirectTest( + label: String, enableRedirect: Boolean + )(f: (DeltaLog, File, File, CatalogTable) => Unit): Unit = { test(s"basic table redirect: $label") { withTempDir { sourceTablePath => withTempDir { destTablePath => - withTable("t1") { - sql(s"CREATE external TABLE t1(c0 long)USING delta LOCATION '$sourceTablePath';") - val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath)) - f(deltaLog, sourceTablePath, destTablePath, catalogTable) + withSQLConf(DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE.key -> enableRedirect.toString) { + withTable("t1", "t2") { + sql(s"CREATE external TABLE t1(c0 long) USING delta LOCATION '$sourceTablePath';") + val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath)) + f(deltaLog, sourceTablePath, destTablePath, catalogTable) + } } } } @@ -103,30 +116,37 @@ class TableRedirectSuite extends QueryTest val featureName = feature.config.key Seq(true, false).foreach { hasCatalogTable => redirectTest(s"basic redirect: $featureName - " + - s"hasCatalogTable: $hasCatalogTable") { case (deltaLog, _, dest, catalogTable) => + s"hasCatalogTable: $hasCatalogTable", enableRedirect = false) { + case (deltaLog, source, dest, catalogTable) => val snapshot = deltaLog.update() assert(!feature.isFeatureSet(snapshot.metadata)) - val redirectSpec = new PathBasedRedirectSpec(dest.getCanonicalPath) + val redirectSpec = new PathBasedRedirectSpec( + source.getCanonicalPath, + dest.getCanonicalPath + ) val catalogTableOpt = if (hasCatalogTable) Some(catalogTable) else None val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE // Step-1: Initiate table redirection and set to EnableRedirectInProgress state. feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) - validateState(deltaLog, EnableRedirectInProgress, dest, feature) + validateState(deltaLog, EnableRedirectInProgress, source, dest, feature) // Step-2: Complete table redirection and set to RedirectReady state. feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec) - validateState(deltaLog, RedirectReady, dest, feature) + validateState(deltaLog, RedirectReady, source, dest, feature) // Step-3: Start dropping table redirection and set to DropRedirectInProgress state. feature.update(deltaLog, catalogTableOpt, DropRedirectInProgress, redirectSpec) - validateState(deltaLog, DropRedirectInProgress, dest, feature) + validateState(deltaLog, DropRedirectInProgress, source, dest, feature) // Step-4: Finish dropping table redirection and remove the property completely. feature.remove(deltaLog, catalogTableOpt) validateRemovedState(deltaLog, feature) // Step-5: Initiate table redirection and set to EnableRedirectInProgress state one // more time. 
withTempDir { destTablePath2 => - val redirectSpec = new PathBasedRedirectSpec(destTablePath2.getCanonicalPath) + val redirectSpec = new PathBasedRedirectSpec( + source.getCanonicalPath, + destTablePath2.getCanonicalPath + ) feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) - validateState(deltaLog, EnableRedirectInProgress, destTablePath2, feature) + validateState(deltaLog, EnableRedirectInProgress, source, destTablePath2, feature) // Step-6: Finish dropping table redirection and remove the property completely. feature.remove(deltaLog, catalogTableOpt) validateRemovedState(deltaLog, feature) @@ -134,16 +154,19 @@ class TableRedirectSuite extends QueryTest } redirectTest(s"Redirect $featureName: empty no redirect rules - " + - s"hasCatalogTable: $hasCatalogTable") { + s"hasCatalogTable: $hasCatalogTable", enableRedirect = false) { case (deltaLog, source, dest, catalogTable) => val snapshot = deltaLog.update() assert(!feature.isFeatureSet(snapshot.metadata)) - val redirectSpec = new PathBasedRedirectSpec(dest.getCanonicalPath) + val redirectSpec = new PathBasedRedirectSpec( + source.getCanonicalPath, + dest.getCanonicalPath + ) val catalogTableOpt = if (hasCatalogTable) Some(catalogTable) else None val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE // 0. Initialize table redirection by setting table to EnableRedirectInProgress state. feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) - validateState(deltaLog, EnableRedirectInProgress, dest, feature) + validateState(deltaLog, EnableRedirectInProgress, source, dest, feature) // 1. INSERT should hit DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE because table is in // EnableRedirectInProgress, which doesn't allow any DML and DDL. @@ -164,7 +187,7 @@ class TableRedirectSuite extends QueryTest // 4. INSERT should hit DELTA_NO_REDIRECT_RULES_VIOLATED since the // no-redirect-rules is empty. - validateState(deltaLog, RedirectReady, dest, feature) + validateState(deltaLog, RedirectReady, source, dest, feature) val exception3 = intercept[DeltaIllegalStateException] { sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") } @@ -182,7 +205,7 @@ class TableRedirectSuite extends QueryTest // 7. INSERT should hit DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE because table is in // DropRedirectInProgress, which doesn't allow any DML and DDL. 
- validateState(deltaLog, DropRedirectInProgress, dest, feature) + validateState(deltaLog, DropRedirectInProgress, source, dest, feature) val exception5 = intercept[DeltaIllegalStateException] { sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") } @@ -197,16 +220,19 @@ class TableRedirectSuite extends QueryTest } redirectTest(s"Redirect $featureName: no redirect rules - " + - s"hasCatalogTable: $hasCatalogTable") { + s"hasCatalogTable: $hasCatalogTable", enableRedirect = false) { case (deltaLog, source, dest, catalogTable) => val snapshot = deltaLog.update() assert(!feature.isFeatureSet(snapshot.metadata)) - val redirectSpec = new PathBasedRedirectSpec(dest.getCanonicalPath) + val redirectSpec = new PathBasedRedirectSpec( + source.getCanonicalPath, + dest.getCanonicalPath + ) val catalogTableOpt = if (hasCatalogTable) Some(catalogTable) else None val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) - validateState(deltaLog, EnableRedirectInProgress, dest, feature) + validateState(deltaLog, EnableRedirectInProgress, source, dest, feature) // 1. Move table redirect to RedirectReady state with no redirect rules that // allows WRITE, DELETE, UPDATE. var noRedirectRules = Set( @@ -220,7 +246,7 @@ class TableRedirectSuite extends QueryTest ) ) feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec, noRedirectRules) - validateState(deltaLog, RedirectReady, dest, feature) + validateState(deltaLog, RedirectReady, source, dest, feature) sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") sql(s"update delta.`$source` set c0 = 100") sql(s"delete from delta.`$source` where c0 = 1") @@ -233,7 +259,7 @@ class TableRedirectSuite extends QueryTest ) ) feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec, noRedirectRules) - validateState(deltaLog, RedirectReady, dest, feature) + validateState(deltaLog, RedirectReady, source, dest, feature) // 2.1. WRITE should be aborted because no-redirect-rules only allow UPDATE. val exception1 = intercept[DeltaIllegalStateException] { sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") @@ -264,7 +290,7 @@ class TableRedirectSuite extends QueryTest ) ) feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec, noRedirectRules) - validateState(deltaLog, RedirectReady, dest, feature) + validateState(deltaLog, RedirectReady, source, dest, feature) // 3.1. The WRITE of appName "dummy" would be aborted because no-redirect-rules // only allow WRITE on application "etl". 
@@ -298,5 +324,76 @@ class TableRedirectSuite extends QueryTest } } } + + def alterRedirect( + table: String, + redirectType: String, + redirectState: RedirectState, + spec: RedirectSpec, + noRedirectRules: Set[NoRedirectRule] + ): Unit = { + val enableConfig = TableRedirectConfiguration( + redirectType, + redirectState.name, + JsonUtils.toJson(spec), + noRedirectRules + ) + val enableConfigJson = StringEscapeUtils.escapeJson(JsonUtils.toJson(enableConfig)) + sql(s"alter table $table set TBLPROPERTIES('$featureName' = '$enableConfigJson')") + } + + redirectTest(s"Redirect $featureName: modify table property", enableRedirect = true) { + case (deltaLog, source, dest, catalogTable) => + val redirectSpec = new PathBasedRedirectSpec( + source.getCanonicalPath, + dest.getCanonicalPath + ) + val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE + val destPath = dest.toString + val srcPath = source.toString + sql(s"CREATE external TABLE t2(c0 long) USING delta LOCATION '$dest';") + sql(s"insert into t2 values(1),(2),(3),(4),(5)") + val destTable = s"delta.`$destPath`" + val srcTable = s"delta.`$srcPath`" + // Initialize the redirection by moving table into EnableRedirectInProgress state. + alterRedirect(srcTable, redirectType, EnableRedirectInProgress, redirectSpec, Set.empty) + alterRedirect(destTable, redirectType, EnableRedirectInProgress, redirectSpec, Set.empty) + // Delta log is cloned, then moves both redirect destination table and redirect source + // table to RedirectReady state. + alterRedirect(srcTable, redirectType, RedirectReady, redirectSpec, Set.empty) + alterRedirect(destTable, redirectType, RedirectReady, redirectSpec, Set.empty) + sql(s"insert into $srcTable values(1), (2), (3)") + sql(s"insert into $destTable values(1), (2), (3)") + sql(s"insert into t1 values(1), (2), (3)") + sql(s"insert into t2 values(1), (2), (3)") + + var result = sql("select * from t1").collect() + assert(result.length == 17) + result = sql("select * from t2").collect() + assert(result.length == 17) + result = sql(s"select * from $srcTable ").collect() + assert(result.length == 17) + result = sql(s"select * from $destTable ").collect() + assert(result.length == 17) + val root = new Path(catalogTable.location) + val fs = root.getFileSystem(deltaLog.newDeltaHadoopConf) + var files = fs.listStatus(new Path(srcPath + "/_delta_log")) + .filter(_.getPath.toString.endsWith(".json")) + assert(files.length == 3) + files = fs.listStatus(new Path(destPath + "/_delta_log")) + .filter(_.getPath.toString.endsWith(".json")) + assert(files.length == 8) + // Drop redirection by moving both redirect destination table and redirect source table to + // DropRedirectInProgress. + alterRedirect(destTable, redirectType, DropRedirectInProgress, redirectSpec, Set.empty) + alterRedirect(srcTable, redirectType, DropRedirectInProgress, redirectSpec, Set.empty) + // Remove table redirect feature from redirect source table and verify table content. 
+ sql(s"alter table $srcTable unset TBLPROPERTIES('$featureName')") + result = sql("select * from t1").collect() + assert(result.length == 0) + sql("insert into t1 values(1), (2), (3), (4)") + result = sql("select * from t1").collect() + assert(result.length == 4) + } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala index f5001f69182..0c988d5996b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala @@ -171,7 +171,7 @@ object DeltaTestImplicits { def apply(spark: SparkSession, tableDir: File, clock: Clock): DeltaTableV2 = { val tablePath = new Path(tableDir.getAbsolutePath) - new DeltaTableV2(spark, tablePath) { + new DeltaTableV2(spark, tablePath, catalogTable = None, None, None, Map.empty) { override lazy val deltaLog: DeltaLog = DeltaLog.forTable(spark, tablePath, clock) } }