Skip to content

Commit

Permalink
[Kernel] Handle the non-uniform value type in map[string, string] in …
Browse files Browse the repository at this point in the history
…delta commit files (#4182)

## Description
See #3888 for details.

Some versions of Delta-Spark have written tables whose commit info
contains arbitrary value types in maps declared as `string -> string`. This
has existed for a while. Update the Kernel default Delta commit file
reader to always try to parse the value as a `string` if the declared value
type is `string`. This is not ideal, but there is no other easy way to handle this.

## How was this patch tested?
UT.
  • Loading branch information
vkorukanti authored Feb 24, 2025
1 parent e628ff9 commit 9c932bf
Show file tree
Hide file tree
Showing 24 changed files with 97 additions and 1 deletion.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"txnId":"bda32d72-442d-4705-9a8c-16093eb31744","tableSizeBytes":452,"numFiles":1,"numMetadata":1,"numProtocol":1,"setTransactions":[],"domainMetadata":[],"metadata":{"id":"da00fe29-8b6e-4f3b-b91f-a3729283bc1a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["month"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1740185389028},"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["changeDataFeed","appendOnly","invariants"]},"allFiles":[{"path":"month=1/part-00000-22d25ea7-a383-44df-ad22-6b06d871b547.c000.snappy.parquet","partitionValues":{"month":"1"},"size":452,"modificationTime":1740185390672,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1740185390903,"operation":"CREATE TABLE AS SELECT","operationParameters":{"partitionBy":"[\"month\"]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.enableChangeDataFeed\":\"true\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"452"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.4.0-SNAPSHOT","txnId":"bda32d72-442d-4705-9a8c-16093eb31744"}}
{"metaData":{"id":"da00fe29-8b6e-4f3b-b91f-a3729283bc1a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["month"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1740185389028}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["changeDataFeed","appendOnly","invariants"]}}
{"add":{"path":"month=1/part-00000-22d25ea7-a383-44df-ad22-6b06d871b547.c000.snappy.parquet","partitionValues":{"month":"1"},"size":452,"modificationTime":1740185390672,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"txnId":"0d7d28b8-55c2-4d8b-b48e-88b22c90aed1","tableSizeBytes":904,"numFiles":2,"numMetadata":1,"numProtocol":1,"setTransactions":[],"domainMetadata":[],"metadata":{"id":"da00fe29-8b6e-4f3b-b91f-a3729283bc1a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["month"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1740185389028},"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["changeDataFeed","appendOnly","invariants"]},"allFiles":[{"path":"month=2/part-00000-cc2a9650-0450-4879-9757-873b7f544510.c000.snappy.parquet","partitionValues":{"month":"2"},"size":452,"modificationTime":1740185395663,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"},{"path":"month=1/part-00000-22d25ea7-a383-44df-ad22-6b06d871b547.c000.snappy.parquet","partitionValues":{"month":"1"},"size":452,"modificationTime":1740185390672,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1740185395669,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"452"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.4.0-SNAPSHOT","txnId":"0d7d28b8-55c2-4d8b-b48e-88b22c90aed1"}}
{"add":{"path":"month=2/part-00000-cc2a9650-0450-4879-9757-873b7f544510.c000.snappy.parquet","partitionValues":{"month":"2"},"size":452,"modificationTime":1740185395663,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"txnId":"79b3e3aa-82dc-4c18-b95e-8b50089b55c7","tableSizeBytes":904,"numFiles":2,"numMetadata":1,"numProtocol":1,"setTransactions":[],"domainMetadata":[],"metadata":{"id":"da00fe29-8b6e-4f3b-b91f-a3729283bc1a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["month"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1740185389028},"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["changeDataFeed","appendOnly","invariants"]},"allFiles":[{"path":"month=2/part-00000-129a0441-5f41-4e46-be33-fd0289e53614.c000.snappy.parquet","partitionValues":{"month":"2"},"size":452,"modificationTime":1740185397380,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"},{"path":"month=1/part-00000-c5babbd8-6013-484c-818f-22d546976866.c000.snappy.parquet","partitionValues":{"month":"1"},"size":452,"modificationTime":1740185397384,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"commitInfo":{"timestamp":1740185397394,"operation":"OPTIMIZE","operationParameters":{"predicate":"[]","zOrderBy":"[\"id\"]","clusterBy":"[]","auto":false},"readVersion":1,"isolationLevel":"SnapshotIsolation","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"2","numRemovedBytes":"904","p25FileSize":"452","numDeletionVectorsRemoved":"0","minFileSize":"452","numAddedFiles":"2","maxFileSize":"452","p75FileSize":"452","p50FileSize":"452","numAddedBytes":"904"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.4.0-SNAPSHOT","txnId":"79b3e3aa-82dc-4c18-b95e-8b50089b55c7"}}
{"add":{"path":"month=1/part-00000-c5babbd8-6013-484c-818f-22d546976866.c000.snappy.parquet","partitionValues":{"month":"1"},"size":452,"modificationTime":1740185397384,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}}
{"remove":{"path":"month=1/part-00000-22d25ea7-a383-44df-ad22-6b06d871b547.c000.snappy.parquet","deletionTimestamp":1740185396708,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{"month":"1"},"size":452,"stats":"{\"numRecords\":1}"}}
{"add":{"path":"month=2/part-00000-129a0441-5f41-4e46-be33-fd0289e53614.c000.snappy.parquet","partitionValues":{"month":"2"},"size":452,"modificationTime":1740185397380,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"}}
{"remove":{"path":"month=2/part-00000-cc2a9650-0450-4879-9757-873b7f544510.c000.snappy.parquet","deletionTimestamp":1740185396708,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{"month":"2"},"size":452,"stats":"{\"numRecords\":1}"}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,24 @@ class GoldenTables extends QueryTest with SharedSparkSession {
Row(0, 0) :: Nil,
schema)
}

generateGoldenTable("commit-info-containing-arbitrary-operationParams-types") { tablePath =>
  // First commit: create a partitioned table with change data feed enabled via CTAS.
  val createTableSql =
    s"""
       |CREATE TABLE delta.`$tablePath`
       |USING DELTA
       |PARTITIONED BY (month)
       |TBLPROPERTIES (delta.enableChangeDataFeed = true)
       |AS
       |SELECT 1 AS id, 1 AS month""".stripMargin
  spark.sql(createTableSql)

  // Second commit: append one more row.
  spark.sql(s"INSERT INTO delta.`$tablePath` VALUES (2, 2)")

  // Third commit: run OPTIMIZE, which generates a commitInfo whose operationParameters
  // contain arbitrary (non-string) value types.
  spark.sql(s"OPTIMIZE delta.`$tablePath` ZORDER BY id")
}
}

/** Simple two-field record pairing a `String` (`f1`) with a `Long` (`f2`). */
case class TestStruct(f1: String, f2: Long)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,23 @@ public ColumnVector getElements() {
List<Object> values = new ArrayList<>(jsonValue.size());
final Iterator<Map.Entry<String, JsonNode>> iter = jsonValue.fields();

boolean isValueOfStringType = mapType.getValueType() instanceof StringType;
while (iter.hasNext()) {
Map.Entry<String, JsonNode> entry = iter.next();
String keyParsed = entry.getKey();
Object valueParsed = decodeElement(entry.getValue(), mapType.getValueType());

Object valueParsed = null;
if (isValueOfStringType) {
// Special handling for values declared as string type. Some Delta tables written by
// Delta-Spark serialize these values using their original JSON type rather than
// as strings in the Delta commit files.
// Ex. {"key": true} instead of {"key": "true"}
if (!entry.getValue().isNull()) {
valueParsed = entry.getValue().asText();
}
} else {
valueParsed = decodeElement(entry.getValue(), mapType.getValueType());
}
if (valueParsed == null && !mapType.isValueContainsNull()) {
throw new RuntimeException(
"Map type expects no nulls in values, but " + "received `null` as value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,17 @@ class TableChangesSuite extends AnyFunSuite with TestUtils {
}.getMessage.contains("Unsupported Delta protocol reader version"))
}

// Registers a getChanges test against the golden table
// "commit-info-containing-arbitrary-operationParams-types"; `withGoldenTable` supplies the
// table location to the closure as `tablePath`.
withGoldenTable("commit-info-containing-arbitrary-operationParams-types") { tablePath =>
test("getChanges - commit info with arbitrary operationParams types") {
// Check all actions are correctly retrieved
testGetChangesVsSpark(
tablePath,
// NOTE(review): 0 and 2 are presumably the start and end versions to fetch changes for --
// confirm against the testGetChangesVsSpark signature.
0,
2,
// Request the full action set.
FULL_ACTION_SET)
}
}

//////////////////////////////////////////////////////////////////////////////////
// Helpers to compare actions returned between Kernel and Spark
//////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._

import io.delta.kernel.data.ColumnVector
import io.delta.kernel.defaults.utils.{DefaultVectorTestUtils, TestRow, TestUtils}
import io.delta.kernel.internal.actions.CommitInfo
import io.delta.kernel.internal.util.InternalUtils.singletonStringColumnVector
import io.delta.kernel.types._

Expand Down Expand Up @@ -436,4 +437,43 @@ class DefaultJsonHandlerSuite extends AnyFunSuite with TestUtils with DefaultVec
writeAndVerify(overwrite = true)
}
}

test("parse diverse type values in a map[string, string]") {
  // A commitInfo JSON whose `operationParameters` and `operationMetrics` maps mix
  // string, boolean and numeric JSON values.
  val commitInfoJson =
    """
      |{
      | "inCommitTimestamp":1740009523401,
      | "timestamp":1740009523401,
      | "engineInfo":"myengine.com",
      | "operation":"WRITE",
      | "operationParameters":
      | {"mode":"Append","statsOnLoad":false,"partitionBy":"[]"},
      | "isBlindAppend":true,
      | "txnId":"cb009f42-5da1-4e7e-b4fa-09de3332f52a",
      | "operationMetrics": {
      | "numFiles":"1",
      | "serializedAsNumber":2,
      | "serializedAsBoolean":true
      | }
      |}
      |""".stripMargin

  val parsedBatch = jsonHandler.parseJson(
    stringVector(Seq(commitInfoJson)),
    CommitInfo.FULL_SCHEMA,
    Optional.empty())
  assert(parsedBatch.getSize == 1)
  val actualRow = TestRow(parsedBatch.getRows.next)

  // Every map value is expected back in its textual form, whatever JSON type it had.
  val expectedOperationParameters =
    Map("mode" -> "Append", "statsOnLoad" -> "false", "partitionBy" -> "[]")
  val expectedOperationMetrics =
    Map("numFiles" -> "1", "serializedAsNumber" -> "2", "serializedAsBoolean" -> "true")
  val expectedRow = TestRow(
    1740009523401L,
    1740009523401L,
    "myengine.com",
    "WRITE",
    expectedOperationParameters,
    true,
    "cb009f42-5da1-4e7e-b4fa-09de3332f52a",
    expectedOperationMetrics)

  checkAnswer(Seq(actualRow), Seq(expectedRow))
}
}

0 comments on commit 9c932bf

Please sign in to comment.