Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Feb 25, 2025
1 parent fbd525c commit 30127c3
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -485,20 +485,15 @@ public static void validateKernelCanReadTheTable(Protocol protocol, String table
public static void validateKernelCanWriteToTable(
Protocol protocol, Metadata metadata, String tablePath) {

if (protocol.getMinReaderVersion() > TABLE_FEATURES_MIN_READER_VERSION) {
throw DeltaErrors.unsupportedReaderProtocol(tablePath, protocol.getMinReaderVersion());
}
validateKernelCanReadTheTable(protocol, tablePath);

if (protocol.getMinWriterVersion() > TABLE_FEATURES_MIN_WRITER_VERSION) {
throw unsupportedWriterProtocol(tablePath, protocol.getMinWriterVersion());
}

Set<TableFeature> unsupportedFeatures =
protocol.getImplicitlyAndExplicitlySupportedFeatures().stream()
.filter(
f ->
!f.hasKernelWriteSupport(metadata)
|| (f.isReaderWriterFeature() && !f.hasKernelReadSupport()))
.filter(f -> !f.hasKernelWriteSupport(metadata))
.collect(toSet());

if (!unsupportedFeatures.isEmpty()) {
Expand Down Expand Up @@ -538,11 +533,8 @@ private static Set<TableFeature> extractAutomaticallyEnabledNewFeatures(
.metadataRequiresFeatureToBeEnabled(currentProtocol, newMetadata))
.collect(toSet());

Set<TableFeature> combinedFeatures = new HashSet<>(protocolSupportedFeatures);
combinedFeatures.addAll(metadataEnabledFeatures);

// Each feature may have dependencies that are not yet enabled in the protocol.
Set<TableFeature> newFeatures = getDependencyFeatures(combinedFeatures);
Set<TableFeature> newFeatures = getDependencyFeatures(metadataEnabledFeatures);
newFeatures.removeAll(protocolSupportedFeatures);

return newFeatures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ class TableFeaturesSuite extends AnyFunSuite {
.filter(_.hasKernelWriteSupport(testMetadata()))
.collect(toList()).asScala

// checkConstraints, generatedColumns, identityColumns, invariants, changeDataFeed,
// timestampNtz are writable because the metadata has not been set the info that
// checkConstraints, generatedColumns, identityColumns, invariants and changeDataFeed
// are writable because the metadata has not been set the info that
// these features are enabled
val expected = Seq(
"columnMapping",
Expand Down Expand Up @@ -281,7 +281,42 @@ class TableFeaturesSuite extends AnyFunSuite {
}
}

// Read is supported when all table readerWriter features are supported by the Kernel,
// but the table has writeOnly table feature unknonw to Kernel
test("validateKernelCanReadTheTable: with writeOnly feature unknown to Kernel") {

// legacy reader protocol version
val protocol1 = new Protocol(1, 7, emptySet(), singleton("unknownFeature"))
validateKernelCanReadTheTable(protocol1, "/test/table")

// table feature supported reader version
val protocol2 = new Protocol(
3,
7,
Set("columnMapping", "timestampNtz").asJava,
Set("columnMapping", "timestampNtz", "unknownFeature").asJava)
validateKernelCanReadTheTable(protocol2, "/test/table")
}

test("validateKernelCanReadTheTable: reader version > 3") {
val protocol = new Protocol(4, 7, emptySet(), singleton("unknownFeature"))
val ex = intercept[KernelException] {
validateKernelCanReadTheTable(protocol, "/test/table")
}
assert(ex.getMessage.contains(
"requires reader version 4 which is unsupported by this version of Delta Kernel"))
}

// Writes
checkWriteUnsupported(
"validateKernelCanWriteToTable: protocol 8", // beyond the table feature writer version
new Protocol(3, 8))

checkWriteUnsupported(
// beyond the table feature reader/writer version
"validateKernelCanWriteToTable: protocol 4, 8",
new Protocol(4, 8))

checkWriteSupported(
"validateKernelCanWriteToTable: protocol 1",
new Protocol(1, 1),
Expand Down Expand Up @@ -471,7 +506,7 @@ class TableFeaturesSuite extends AnyFunSuite {
s"validateKernelCanWriteToTable: protocol 7 with $feature, " +
s"metadata contains $feature",
new Protocol(3, 7, singleton(feature), singleton(feature)),
testMetadata(tblProps = Map("delta.enableTypeWidening" -> "true")))
testMetadata(includeInvariant = true))
}

checkWriteSupported(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
assertCheckpointReadiness(result, expIsReadyForCheckpoint)
}

// TODO: Change this to use the table metadata and protocol and
// not rely on DESCRIBE which adds some properties based on the protocol.
def verifyTableProperties(
tablePath: String,
expProperties: ListMap[String, Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
test("create table with all supported types") {
withTempDirAndEngine { (tablePath, engine) =>
val parquetAllTypes = goldenTablePath("parquet-all-types")
val schema = removeUnsupportedTypes(tableSchema(parquetAllTypes))
val schema = removeTimestampNtzTypeColumns(tableSchema(parquetAllTypes))

val table = Table.forPath(engine, tablePath)
val txnBuilder = table.createTransactionBuilder(engine, testEngineInfo, CREATE_TABLE)
Expand Down Expand Up @@ -628,7 +628,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
test("insert into table - all supported types data") {
withTempDirAndEngine { (tblPath, engine) =>
val parquetAllTypes = goldenTablePath("parquet-all-types")
val schema = removeUnsupportedTypes(tableSchema(parquetAllTypes))
val schema = removeTimestampNtzTypeColumns(tableSchema(parquetAllTypes))

val data = readTableUsingKernel(engine, parquetAllTypes, schema).to[Seq]
val dataWithPartInfo = Seq(Map.empty[String, Literal] -> data)
Expand Down Expand Up @@ -656,10 +656,14 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa

Seq(true, false).foreach { includeTimestampNtz =>
test(s"insert into partitioned table - all supported partition column types data - " +
s"timesatmp_ntz included = $includeTimestampNtz") {
s"timestamp_ntz included = $includeTimestampNtz") {
withTempDirAndEngine { (tblPath, engine) =>
val parquetAllTypes = goldenTablePath("parquet-all-types")
val schema = removeUnsupportedTypes(tableSchema(parquetAllTypes))
val tableSchema = tableSchema(parquetAllTypes)
val schema = if (includeTimestampNtz) {
removeTimestampNtzTypeColumns(tableSchema(parquetAllTypes))
} else tableSchema

val partCols = Seq(
"byteType",
"shortType",
Expand Down Expand Up @@ -987,7 +991,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

def removeUnsupportedTypes(structType: StructType): StructType = {
def removeTimestampNtzTypeColumns(structType: StructType): StructType = {
def process(dataType: DataType): Option[DataType] = dataType match {
case a: ArrayType =>
val newElementType = process(a.getElementType)
Expand All @@ -1000,9 +1004,9 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
Some(new MapType(newKeyType, newValueType, m.isValueContainsNull))
case _ => None
}
// case _: TimestampNTZType => None // ignore
case _: TimestampNTZType => None // ignore
case s: StructType =>
val newType = removeUnsupportedTypes(s);
val newType = removeTimestampNtzTypeColumns(s);
if (newType.length() > 0) {
Some(newType)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
tablePath,
ListMap(
// appendOnly, invariants implicitly supported as the protocol is upgraded from 2 to 7
// These properties are not set in the table properties, but are generated by the
// Spark describe
IN_COMMIT_TIMESTAMPS_ENABLED.getKey -> true,
"delta.feature.appendOnly" -> "supported",
"delta.feature.inCommitTimestamp" -> "supported",
Expand Down

0 comments on commit 30127c3

Please sign in to comment.