From 55adec5bcac03065f7dd998b7be79fac1c738c6a Mon Sep 17 00:00:00 2001
From: Carmen Kwan
Date: Fri, 28 Feb 2025 18:19:44 +0100
Subject: [PATCH] [Spark] Throw an exception on write for DV size mismatch
 (#4203)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Currently, we only log `delta.deletionVector.write.offsetMismatch`. With this PR, we throw an exception at the time of writing the DV, i.e. before we make a commit to the table. This fails the specific query, but does not put the table into a broken, unreadable state. We also change the `delta.deletionVector.write.offsetMismatch` log into a Delta assertion.

## How was this patch tested?

Existing tests pass.

## Does this PR introduce _any_ user-facing changes?

Yes. If we detect a file-size issue while writing deletion vector files, we now throw an exception at the time of writing the DV, before we make a commit to the table. This fails the specific query, but prevents the table from ending up in an unreadable state.
---
 .../sql/delta/storage/dv/DeletionVectorStore.scala | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/storage/dv/DeletionVectorStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/storage/dv/DeletionVectorStore.scala
index 51f472472c7..0f40a9dff01 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/storage/dv/DeletionVectorStore.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/storage/dv/DeletionVectorStore.scala
@@ -223,13 +223,16 @@ class HadoopFileSystemDVStore(hadoopConf: Configuration)
         checksum = DeletionVectorStore.calculateChecksum(data))
       if (writtenBytes != dvRange.offset) {
-        recordDeltaEvent(
-          deltaLog = null,
-          opType = "delta.deletionVector.write.offsetMismatch",
+        deltaAssert(
+          writtenBytes == dvRange.offset,
+          name = "dv.write.offsetMismatch",
+          msg = s"Offset mismatch while writing deletion vector to file",
           data = Map(
             "path" -> path.path.toString,
             "reportedOffset" -> dvRange.offset,
-            "calculatedOffset" -> writtenBytes))
+            "calculatedOffset" -> writtenBytes)
+        )
+        throw DeltaErrors.deletionVectorSizeMismatch()
       }
       log.debug(s"Writing DV range to file: Path=${path.path}, Range=${dvRange}")
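For illustration, the assert-then-throw pattern this patch introduces can be sketched standalone. All names below (`DvWriteCheck`, `checkOffset`, the exception class, the simplified `deltaAssert`) are hypothetical stand-ins, not the actual Delta Lake helpers; the real `deltaAssert` records a usage-log entry rather than printing:

```scala
object DvWriteCheck {
  // Hypothetical stand-in for the error raised by DeltaErrors.deletionVectorSizeMismatch().
  final case class DeletionVectorSizeMismatchException(message: String)
      extends RuntimeException(message)

  // Simplified stand-in for Delta's deltaAssert: it records diagnostics when
  // the condition fails, but deliberately does not throw by itself.
  def deltaAssert(
      condition: Boolean,
      name: String,
      msg: String,
      data: Map[String, Any]): Unit = {
    if (!condition) {
      println(s"[delta assertion failed] $name: $msg, data=$data")
    }
  }

  // Mirrors the patched write path: record the assertion, then fail the write
  // before any commit could reference the mismatched DV file.
  def checkOffset(writtenBytes: Long, reportedOffset: Long, path: String): Unit = {
    if (writtenBytes != reportedOffset) {
      deltaAssert(
        writtenBytes == reportedOffset,
        name = "dv.write.offsetMismatch",
        msg = "Offset mismatch while writing deletion vector to file",
        data = Map(
          "path" -> path,
          "reportedOffset" -> reportedOffset,
          "calculatedOffset" -> writtenBytes))
      throw DeletionVectorSizeMismatchException(
        s"DV size mismatch at $path: expected offset $reportedOffset, wrote $writtenBytes")
    }
  }
}
```

The key design point, as in the patch, is that the exception is thrown at DV write time: the query fails fast, but since no commit has been made, readers of the table are unaffected.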