[SPARK-46179][SQL] Add CrossDbmsQueryTestSuites, which run other DBMS against golden files, starting with Postgres

### What changes were proposed in this pull request?

Create `CrossDbmsQueryTestSuite`, a trait that extends `DockerJDBCIntegrationSuite` with `SQLQueryTestHelper` and allows testing the SQL golden files against other DBMS.

`PostgreSQLQueryTestSuite` is an implementation of `CrossDbmsQueryTestSuite`. For starters, the SQL files in the `subquery` sql-tests directory are automatically opted into this suite. In this PR, all of those files except `exists-having.sql` are ignored (via the new `--ONLY_IF spark` directive); otherwise this PR would have 10K+ line changes (I would like to do that in a follow-up PR, if possible). I had to change the view-creation syntax in `exists-having.sql` slightly, which is reflected in the `analyzer-results` file, but crucially, the query output (in the `results` file) is not changed.

Note that this will not be applicable to many of the current SQL tests we have, due to:
- Incompatible SQL syntax between Spark SQL and Postgres.
- Incompatible data types.
- Differences in numerical precision with doubles.
- Missing functions in either system.
- Test files that rely on specific configs, such as ANSI mode, the count bug, etc.

### Why are the changes needed?

For correctness checking of our SQL query tests, we want to run the golden files against Postgres as a reference DBMS. This can easily be extended to other DBMS.
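
For illustration only (not part of this patch), a suite for another DBMS would mainly need to supply its database name, input-file scope, and Docker container setup. The hypothetical `MySQLQueryTestSuite` sketch below mirrors the structure of the `PostgreSQLQueryTestSuite` added here; the image name, port, and JDBC URL are assumptions, not tested code:

```scala
// Hypothetical sketch of extending CrossDbmsQueryTestSuite to another DBMS.
// Mirrors PostgreSQLQueryTestSuite; image name, port, and JDBC URL are assumptions.
@DockerTest
class MySQLQueryTestSuite extends CrossDbmsQueryTestSuite {

  val DATABASE_NAME = "mysql"
  // Reuse the same subquery input directory that the Postgres suite is scoped to.
  protected val customInputFilePath: String = new File(inputFilePath, "subquery").getAbsolutePath

  override val db = new DatabaseOnDocker {
    override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", "mysql:8.0")
    override val env = Map("MYSQL_ROOT_PASSWORD" -> "rootpass")
    override val usesIpc = false
    override val jdbcPort = 3306

    override def getJdbcUrl(ip: String, port: Int): String =
      s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
  }

  // No extra fixtures are needed for this sketch.
  override def dataPreparation(conn: java.sql.Connection): Unit = ()

  listTestCases.foreach(createScalaTestCase)
}
```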

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This is a test-only PR; it does not affect system behavior.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44084 from andylam-db/crossdbms.

Authored-by: Andy Lam <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
andylam-db authored and cloud-fan committed Jan 5, 2024
1 parent 3793c2f commit f9ca519
Showing 46 changed files with 738 additions and 479 deletions.
@@ -0,0 +1,183 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.jdbc

import java.io.File
import java.sql.ResultSet

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLQueryTestHelper
import org.apache.spark.sql.catalyst.util.fileToString

/**
 * This suite builds on the SQLQueryTestSuite infrastructure to allow us to run other DBMS against
 * the SQL test golden files (which SQLQueryTestSuite generates and tests against) to cross-check
 * correctness. Note that this is not currently run on all SQL input files by default, because of
 * incompatibilities between the SQL dialects of Spark and the other DBMS.
 *
 * This suite adds a new comment argument, --ONLY_IF. This comment is used to indicate the DBMS
 * for which the SQL file is eligible. These strings are defined in the companion object. For
 * example, if you have a SQL file named `describe.sql` and you want to indicate that Postgres is
 * incompatible, add the following comment to the input file:
 * --ONLY_IF spark
 */
trait CrossDbmsQueryTestSuite extends DockerJDBCIntegrationSuite with SQLQueryTestHelper {

  val DATABASE_NAME: String

  protected val baseResourcePath = {
    // We use a path based on Spark home for 2 reasons:
    //   1. Maven can't get correct resource directory when resources in other jars.
    //   2. We test subclasses in the hive-thriftserver module.
    getWorkspaceFilePath("sql", "core", "src", "test", "resources", "sql-tests").toFile
  }
  protected val inputFilePath = new File(baseResourcePath, "inputs").getAbsolutePath
  protected val customInputFilePath: String
  protected val goldenFilePath = new File(baseResourcePath, "results").getAbsolutePath

  protected def listTestCases: Seq[TestCase] = {
    listFilesRecursively(new File(customInputFilePath)).flatMap { file =>
      val resultFile = file.getAbsolutePath.replace(inputFilePath, goldenFilePath) + ".out"
      val absPath = file.getAbsolutePath
      val testCaseName = absPath.stripPrefix(customInputFilePath).stripPrefix(File.separator)
      RegularTestCase(testCaseName, absPath, resultFile) :: Nil
    }.sortBy(_.name)
  }

  def createScalaTestCase(testCase: TestCase): Unit = {
    testCase match {
      case _: RegularTestCase =>
        // Create a test case to run this case.
        test(testCase.name) {
          runSqlTestCase(testCase, listTestCases)
        }
      case _ =>
        ignore(s"Ignoring test cases that are not [[RegularTestCase]] for now") {
          log.debug(s"${testCase.name} is not a RegularTestCase and is ignored.")
        }
    }
  }

  protected def runSqlTestCase(testCase: TestCase, listTestCases: Seq[TestCase]): Unit = {
    val input = fileToString(new File(testCase.inputFile))
    val (comments, code) = splitCommentsAndCodes(input)
    val queries = getQueries(code, comments, listTestCases)

    val dbmsConfig = comments.filter(_.startsWith(CrossDbmsQueryTestSuite.ONLY_IF_ARG))
      .map(_.substring(CrossDbmsQueryTestSuite.ONLY_IF_ARG.length))
    // If `--ONLY_IF` is found, check if the DBMS being used is allowed.
    if (dbmsConfig.nonEmpty && !dbmsConfig.contains(DATABASE_NAME)) {
      log.info(s"This test case (${testCase.name}) is ignored because it indicates that it is " +
        s"not eligible with $DATABASE_NAME.")
    } else {
      runQueriesAndCheckAgainstGoldenFile(queries, testCase)
    }
  }

  protected def runQueriesAndCheckAgainstGoldenFile(
      queries: Seq[String], testCase: TestCase): Unit = {
    // The local Spark session is needed because we use Spark analyzed plan to check if the query
    // result is already semantically sorted, below.
    val localSparkSession = spark.newSession()
    val conn = getConnection()
    val stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

    val outputs: Seq[QueryTestOutput] = queries.map { sql =>
      val output = {
        try {
          val sparkDf = localSparkSession.sql(sql)
          val isResultSet = stmt.execute(sql)
          val rows = ArrayBuffer[Row]()
          if (isResultSet) {
            val rs = stmt.getResultSet
            val metadata = rs.getMetaData
            while (rs.next()) {
              val row = Row.fromSeq((1 to metadata.getColumnCount).map(i => {
                val value = rs.getObject(i)
                if (value == null) {
                  "NULL"
                } else {
                  value
                }
              }))
              rows.append(row)
            }
          }
          val output = rows.map(_.mkString("\t")).toSeq
          if (isSemanticallySorted(sparkDf.queryExecution.analyzed)) {
            output
          } else {
            // Sort the answer manually if it isn't sorted.
            output.sorted
          }
        } catch {
          case NonFatal(e) => Seq(e.getClass.getName, e.getMessage)
        }
      }

      ExecutionOutput(
        sql = sql,
        // Don't care about the schema for this test. Only care about correctness.
        schema = None,
        output = output.mkString("\n"))
    }
    conn.close()

    // Read back the golden files.
    var curSegment = 0
    val expectedOutputs: Seq[QueryTestOutput] = {
      val goldenOutput = fileToString(new File(testCase.resultFile))
      val segments = goldenOutput.split("-- !query.*\n")
      outputs.map { output =>
        val result =
          ExecutionOutput(
            segments(curSegment + 1).trim, // SQL
            None, // Schema
            normalizeTestResults(segments(curSegment + 3))) // Output
        // Assume that the golden file always has all 3 segments.
        curSegment += 3
        result
      }
    }

    // Compare results.
    assertResult(expectedOutputs.size, s"Number of queries should be ${expectedOutputs.size}") {
      outputs.size
    }

    outputs.zip(expectedOutputs).zipWithIndex.foreach { case ((output, expected), i) =>
      assertResult(expected.sql, s"SQL query did not match for query #$i\n${expected.sql}") {
        output.sql
      }
      assertResult(expected.output, s"Result did not match" +
        s" for query #$i\n${expected.sql}") {
        output.output
      }
    }
  }

}

object CrossDbmsQueryTestSuite {

  final val POSTGRES = "postgres"
  // Argument in input files to indicate that the sql file is restricted to certain systems.
  final val ONLY_IF_ARG = "--ONLY_IF "
}
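
As a minimal sketch (not part of the patch), the golden-file parsing above assumes the layout that `SQLQueryTestSuite` writes: each query contributes three `-- !query` segments (the SQL text, the schema, and the output), which is why `curSegment` advances by 3. The file contents below are illustrative only:

```scala
// Illustrative only: the assumed golden-file layout and how the split above sees it.
val exampleGoldenFile =
  """-- Automatically generated by SQLQueryTestSuite
    |-- !query
    |SELECT 1
    |-- !query schema
    |struct<1:int>
    |-- !query output
    |1
    |""".stripMargin

val segments = exampleGoldenFile.split("-- !query.*\n")
// segments(0) is the file preamble, segments(1) the SQL text, segments(2) the schema,
// and segments(3) the query output, matching segments(curSegment + 1) and
// segments(curSegment + 3) with curSegment starting at 0.
```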
@@ -0,0 +1,73 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.jdbc

import java.io.File
import java.sql.Connection

import org.apache.spark.tags.DockerTest

/**
 * READ THIS IF YOU ADDED A NEW SQL TEST AND THIS SUITE IS FAILING:
 * Your new SQL test is automatically opted into this suite. It is likely failing because it is
 * not compatible with Postgres. You have two options:
 * 1. (Recommended) Modify your queries to be compatible with both systems. This is recommended
 *    because it will run your queries against Postgres, providing higher confidence in
 *    correctness, and you won't have to manually verify the golden files generated by your test.
 * 2. Add this line to your .sql file: --ONLY_IF spark
 *
 * Note: To run this test suite for a specific version (e.g., postgres:15.1):
 * {{{
 *   ENABLE_DOCKER_INTEGRATION_TESTS=1 POSTGRES_DOCKER_IMAGE_NAME=postgres:15.1
 *     ./build/sbt -Pdocker-integration-tests
 *     "testOnly org.apache.spark.sql.jdbc.PostgreSQLQueryTestSuite"
 * }}}
 */
@DockerTest
class PostgreSQLQueryTestSuite extends CrossDbmsQueryTestSuite {

  val DATABASE_NAME = CrossDbmsQueryTestSuite.POSTGRES
  // Scope to only subquery directory for now.
  protected val customInputFilePath: String = new File(inputFilePath, "subquery").getAbsolutePath

  override val db = new DatabaseOnDocker {
    override val imageName = sys.env.getOrElse("POSTGRES_DOCKER_IMAGE_NAME", "postgres:15.1-alpine")
    override val env = Map(
      "POSTGRES_PASSWORD" -> "rootpass"
    )
    override val usesIpc = false
    override val jdbcPort = 5432

    override def getJdbcUrl(ip: String, port: Int): String =
      s"jdbc:postgresql://$ip:$port/postgres?user=postgres&password=rootpass"
  }

  override def dataPreparation(conn: Connection): Unit = {
    conn.prepareStatement(
      // Custom function `double` to imitate Spark's function, so that more tests are covered.
      """
        |CREATE OR REPLACE FUNCTION double(numeric_value numeric) RETURNS double precision
        | AS 'select CAST($1 AS double precision);'
        | LANGUAGE SQL
        | IMMUTABLE
        | RETURNS NULL ON NULL INPUT;
        |""".stripMargin
    ).executeUpdate()
  }

  listTestCases.foreach(createScalaTestCase)
}
