Skip to content

Commit

Permalink
Use correctly formatted commit in CommitCoordinatorClientImplSuiteBase
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Feb 26, 2025
1 parent 6e9498c commit c92ae12
Showing 1 changed file with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.concurrent.duration._

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol}
import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol}
import org.apache.spark.sql.delta.storage.{LogStore, LogStoreProvider}
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
Expand Down Expand Up @@ -122,7 +122,10 @@ trait CommitCoordinatorClientImplSuiteBase extends QueryTest
}

protected def writeCommitZero(logPath: Path): Unit = {
store.write(FileNames.unsafeDeltaFile(logPath, 0), Iterator("0", "0"), overwrite = false)
val commitInfo = CommitInfo.empty(version = Some(0)).withTimestamp(0)
.copy(inCommitTimestamp = Some(0))
val actions = Iterator(commitInfo.json, Metadata().json, Protocol().json)
store.write(FileNames.unsafeDeltaFile(logPath, 0), actions, overwrite = false)
}

/**
Expand Down Expand Up @@ -163,7 +166,7 @@ trait CommitCoordinatorClientImplSuiteBase extends QueryTest
}
tableCommitCoordinatorClient.commit(
version,
Iterator(s"$version", s"$timestamp"),
Iterator(commitInfo.json),
updatedActions).getCommit
}

Expand All @@ -173,9 +176,13 @@ trait CommitCoordinatorClientImplSuiteBase extends QueryTest
timestampOpt: Option[Long] = None): Unit = {
val delta = FileNames.unsafeDeltaFile(logPath, version)
if (timestampOpt.isDefined) {
assert(store.read(delta, sessionHadoopConf) == Seq(s"$version", s"${timestampOpt.get}"))
val commitInfo = CommitInfo.empty(version = Some(version))
.withTimestamp(timestampOpt.get)
.copy(inCommitTimestamp = timestampOpt)
assert(store.read(delta, sessionHadoopConf).head == commitInfo.json)
} else {
assert(store.read(delta, sessionHadoopConf).take(1) == Seq(s"$version"))
assert(Action.fromJson(store.read(delta, sessionHadoopConf).head)
.isInstanceOf[CommitInfo])
}
}

Expand Down

0 comments on commit c92ae12

Please sign in to comment.