Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into tfIntegration
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Feb 24, 2025
2 parents 5498ef7 + 9c932bf commit c2b6902
Show file tree
Hide file tree
Showing 24 changed files with 97 additions and 1 deletion.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"txnId":"bda32d72-442d-4705-9a8c-16093eb31744","tableSizeBytes":452,"numFiles":1,"numMetadata":1,"numProtocol":1,"setTransactions":[],"domainMetadata":[],"metadata":{"id":"da00fe29-8b6e-4f3b-b91f-a3729283bc1a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["month"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1740185389028},"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["changeDataFeed","appendOnly","invariants"]},"allFiles":[{"path":"month=1/part-00000-22d25ea7-a383-44df-ad22-6b06d871b547.c000.snappy.parquet","partitionValues":{"month":"1"},"size":452,"modificationTime":1740185390672,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1740185390903,"operation":"CREATE TABLE AS SELECT","operationParameters":{"partitionBy":"[\"month\"]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.enableChangeDataFeed\":\"true\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"452"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.4.0-SNAPSHOT","txnId":"bda32d72-442d-4705-9a8c-16093eb31744"}}
{"metaData":{"id":"da00fe29-8b6e-4f3b-b91f-a3729283bc1a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["month"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1740185389028}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["changeDataFeed","appendOnly","invariants"]}}
{"add":{"path":"month=1/part-00000-22d25ea7-a383-44df-ad22-6b06d871b547.c000.snappy.parquet","partitionValues":{"month":"1"},"size":452,"modificationTime":1740185390672,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"txnId":"0d7d28b8-55c2-4d8b-b48e-88b22c90aed1","tableSizeBytes":904,"numFiles":2,"numMetadata":1,"numProtocol":1,"setTransactions":[],"domainMetadata":[],"metadata":{"id":"da00fe29-8b6e-4f3b-b91f-a3729283bc1a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["month"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1740185389028},"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["changeDataFeed","appendOnly","invariants"]},"allFiles":[{"path":"month=2/part-00000-cc2a9650-0450-4879-9757-873b7f544510.c000.snappy.parquet","partitionValues":{"month":"2"},"size":452,"modificationTime":1740185395663,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"},{"path":"month=1/part-00000-22d25ea7-a383-44df-ad22-6b06d871b547.c000.snappy.parquet","partitionValues":{"month":"1"},"size":452,"modificationTime":1740185390672,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1740185395669,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"452"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.4.0-SNAPSHOT","txnId":"0d7d28b8-55c2-4d8b-b48e-88b22c90aed1"}}
{"add":{"path":"month=2/part-00000-cc2a9650-0450-4879-9757-873b7f544510.c000.snappy.parquet","partitionValues":{"month":"2"},"size":452,"modificationTime":1740185395663,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"txnId":"79b3e3aa-82dc-4c18-b95e-8b50089b55c7","tableSizeBytes":904,"numFiles":2,"numMetadata":1,"numProtocol":1,"setTransactions":[],"domainMetadata":[],"metadata":{"id":"da00fe29-8b6e-4f3b-b91f-a3729283bc1a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["month"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1740185389028},"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["changeDataFeed","appendOnly","invariants"]},"allFiles":[{"path":"month=2/part-00000-129a0441-5f41-4e46-be33-fd0289e53614.c000.snappy.parquet","partitionValues":{"month":"2"},"size":452,"modificationTime":1740185397380,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"},{"path":"month=1/part-00000-c5babbd8-6013-484c-818f-22d546976866.c000.snappy.parquet","partitionValues":{"month":"1"},"size":452,"modificationTime":1740185397384,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"commitInfo":{"timestamp":1740185397394,"operation":"OPTIMIZE","operationParameters":{"predicate":"[]","zOrderBy":"[\"id\"]","clusterBy":"[]","auto":false},"readVersion":1,"isolationLevel":"SnapshotIsolation","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"2","numRemovedBytes":"904","p25FileSize":"452","numDeletionVectorsRemoved":"0","minFileSize":"452","numAddedFiles":"2","maxFileSize":"452","p75FileSize":"452","p50FileSize":"452","numAddedBytes":"904"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.4.0-SNAPSHOT","txnId":"79b3e3aa-82dc-4c18-b95e-8b50089b55c7"}}
{"add":{"path":"month=1/part-00000-c5babbd8-6013-484c-818f-22d546976866.c000.snappy.parquet","partitionValues":{"month":"1"},"size":452,"modificationTime":1740185397384,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}}
{"remove":{"path":"month=1/part-00000-22d25ea7-a383-44df-ad22-6b06d871b547.c000.snappy.parquet","deletionTimestamp":1740185396708,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{"month":"1"},"size":452,"stats":"{\"numRecords\":1}"}}
{"add":{"path":"month=2/part-00000-129a0441-5f41-4e46-be33-fd0289e53614.c000.snappy.parquet","partitionValues":{"month":"2"},"size":452,"modificationTime":1740185397380,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"}}
{"remove":{"path":"month=2/part-00000-cc2a9650-0450-4879-9757-873b7f544510.c000.snappy.parquet","deletionTimestamp":1740185396708,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{"month":"2"},"size":452,"stats":"{\"numRecords\":1}"}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,24 @@ class GoldenTables extends QueryTest with SharedSparkSession {
Row(0, 0) :: Nil,
schema)
}

// Golden table with three commits (CTAS, INSERT, OPTIMIZE ZORDER) on a partitioned,
// change-data-feed enabled table.
generateGoldenTable("commit-info-containing-arbitrary-operationParams-types") { tablePath =>
  // Fully qualified, back-quoted path identifier shared by every statement below.
  val table = s"delta.`$tablePath`"

  // Commit 0: create the table from a single-row SELECT.
  spark.sql(
    s"""
      |CREATE TABLE $table
      |USING DELTA
      |PARTITIONED BY (month)
      |TBLPROPERTIES (delta.enableChangeDataFeed = true)
      |AS
      |SELECT 1 AS id, 1 AS month""".stripMargin)

  // Commit 1: append one more row.
  spark.sql(s"INSERT INTO $table VALUES (2, 2)")

  // Commit 2: OPTIMIZE is expected to write a commitInfo whose operationParameters
  // contain values of arbitrary (non-string) types.
  spark.sql(s"OPTIMIZE $table ZORDER BY id")
}
}

case class TestStruct(f1: String, f2: Long)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,23 @@ public ColumnVector getElements() {
List<Object> values = new ArrayList<>(jsonValue.size());
final Iterator<Map.Entry<String, JsonNode>> iter = jsonValue.fields();

boolean isValueOfStringType = mapType.getValueType() instanceof StringType;
while (iter.hasNext()) {
Map.Entry<String, JsonNode> entry = iter.next();
String keyParsed = entry.getKey();
Object valueParsed = decodeElement(entry.getValue(), mapType.getValueType());

Object valueParsed = null;
if (isValueOfStringType) {
// Special handling for values of type string. Delta-Spark sometimes serializes
// such values in the Delta commit files using their original JSON type rather
// than as strings.
// Ex. {"key": true} instead of {"key": "true"}
if (!entry.getValue().isNull()) {
valueParsed = entry.getValue().asText();
}
} else {
valueParsed = decodeElement(entry.getValue(), mapType.getValueType());
}
if (valueParsed == null && !mapType.isValueContainsNull()) {
throw new RuntimeException(
"Map type expects no nulls in values, but " + "received `null` as value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,17 @@ class TableChangesSuite extends AnyFunSuite with TestUtils {
}.getMessage.contains("Unsupported Delta protocol reader version"))
}

// Registers a getChanges test that runs against the golden table whose commitInfo
// carries operationParameters of arbitrary types.
withGoldenTable("commit-info-containing-arbitrary-operationParams-types") { goldenTablePath =>
  test("getChanges - commit info with arbitrary operationParams types") {
    // Every action type is requested so that all actions are compared with Spark.
    // NOTE(review): 0 and 2 are presumably the start and end versions - confirm
    // against the testGetChangesVsSpark signature.
    testGetChangesVsSpark(goldenTablePath, 0, 2, FULL_ACTION_SET)
  }
}

//////////////////////////////////////////////////////////////////////////////////
// Helpers to compare actions returned between Kernel and Spark
//////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._

import io.delta.kernel.data.ColumnVector
import io.delta.kernel.defaults.utils.{DefaultVectorTestUtils, TestRow, TestUtils}
import io.delta.kernel.internal.actions.CommitInfo
import io.delta.kernel.internal.util.InternalUtils.singletonStringColumnVector
import io.delta.kernel.types._

Expand Down Expand Up @@ -436,4 +437,43 @@ class DefaultJsonHandlerSuite extends AnyFunSuite with TestUtils with DefaultVec
writeAndVerify(overwrite = true)
}
}

test("parse diverse type values in a map[string, string]") {
  // commitInfo JSON whose map fields mix string values with a raw boolean and a raw
  // number ("statsOnLoad", "serializedAsNumber", "serializedAsBoolean").
  val commitInfoJson =
    """
      |{
      | "inCommitTimestamp":1740009523401,
      | "timestamp":1740009523401,
      | "engineInfo":"myengine.com",
      | "operation":"WRITE",
      | "operationParameters":
      | {"mode":"Append","statsOnLoad":false,"partitionBy":"[]"},
      | "isBlindAppend":true,
      | "txnId":"cb009f42-5da1-4e7e-b4fa-09de3332f52a",
      | "operationMetrics": {
      | "numFiles":"1",
      | "serializedAsNumber":2,
      | "serializedAsBoolean":true
      | }
      |}
      |""".stripMargin

  val parsedBatch = jsonHandler.parseJson(
    stringVector(Seq(commitInfoJson)),
    CommitInfo.FULL_SCHEMA,
    Optional.empty())

  // Exactly one input string was supplied, so exactly one row is expected back.
  assert(parsedBatch.getSize == 1)
  val actualRow = TestRow(parsedBatch.getRows.next())

  // Non-string JSON values are expected to surface as their textual form.
  val expectedOperationParameters =
    Map("mode" -> "Append", "statsOnLoad" -> "false", "partitionBy" -> "[]")
  val expectedOperationMetrics =
    Map("numFiles" -> "1", "serializedAsNumber" -> "2", "serializedAsBoolean" -> "true")

  val expectedRow = TestRow(
    1740009523401L,
    1740009523401L,
    "myengine.com",
    "WRITE",
    expectedOperationParameters,
    true,
    "cb009f42-5da1-4e7e-b4fa-09de3332f52a",
    expectedOperationMetrics)

  checkAnswer(Seq(actualRow), Seq(expectedRow))
}
}

0 comments on commit c2b6902

Please sign in to comment.