# [Spark] Use correctly formatted commit in CommitCoordinatorClientImplSuiteBase (#4192)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

The helpers for commit coordinator tests currently write plain strings for commits. This PR switches these helpers to write actual commit JSON instead, so that the logs they generate are correctly formatted.
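
For reference, a minimal sketch of the difference, using the same action classes the diff below touches (the `val` names and the JSON payloads in the comments are illustrative, not exact):

```scala
import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol}

// Before: each "commit" was an arbitrary string, so the generated
// _delta_log file was not a valid sequence of Delta actions.
val oldActions = Iterator("0", "0")

// After: each line is the single-line JSON form of a real action,
// which is what a Delta commit file actually contains, roughly:
//   {"commitInfo":{"timestamp":0,"inCommitTimestamp":0,...}}
//   {"metaData":{...}}
//   {"protocol":{...}}
val commitInfo = CommitInfo.empty(version = Some(0)).withTimestamp(0)
  .copy(inCommitTimestamp = Some(0))
val newActions = Iterator(commitInfo.json, Metadata().json, Protocol().json)
```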

## How was this patch tested?
Existing unit tests

## Does this PR introduce _any_ user-facing changes?
No
ctring authored Feb 27, 2025
1 parent 6e9498c commit 37996ce
Showing 1 changed file with 12 additions and 5 deletions.
```diff
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
 import org.apache.spark.sql.delta.DeltaLog
-import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol}
+import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol}
 import org.apache.spark.sql.delta.storage.{LogStore, LogStoreProvider}
 import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -122,7 +122,10 @@ trait CommitCoordinatorClientImplSuiteBase extends QueryTest
   }
 
   protected def writeCommitZero(logPath: Path): Unit = {
-    store.write(FileNames.unsafeDeltaFile(logPath, 0), Iterator("0", "0"), overwrite = false)
+    val commitInfo = CommitInfo.empty(version = Some(0)).withTimestamp(0)
+      .copy(inCommitTimestamp = Some(0))
+    val actions = Iterator(commitInfo.json, Metadata().json, Protocol().json)
+    store.write(FileNames.unsafeDeltaFile(logPath, 0), actions, overwrite = false)
   }
 
   /**
@@ -163,7 +166,7 @@ trait CommitCoordinatorClientImplSuiteBase extends QueryTest
     }
     tableCommitCoordinatorClient.commit(
       version,
-      Iterator(s"$version", s"$timestamp"),
+      Iterator(commitInfo.json),
       updatedActions).getCommit
   }
 
@@ -173,9 +176,13 @@ trait CommitCoordinatorClientImplSuiteBase extends QueryTest
       timestampOpt: Option[Long] = None): Unit = {
     val delta = FileNames.unsafeDeltaFile(logPath, version)
     if (timestampOpt.isDefined) {
-      assert(store.read(delta, sessionHadoopConf) == Seq(s"$version", s"${timestampOpt.get}"))
+      val commitInfo = CommitInfo.empty(version = Some(version))
+        .withTimestamp(timestampOpt.get)
+        .copy(inCommitTimestamp = timestampOpt)
+      assert(store.read(delta, sessionHadoopConf).head == commitInfo.json)
     } else {
-      assert(store.read(delta, sessionHadoopConf).take(1) == Seq(s"$version"))
+      assert(Action.fromJson(store.read(delta, sessionHadoopConf).head)
+        .isInstanceOf[CommitInfo])
     }
   }
 
```
