diff --git a/common/variant/README.md b/common/variant/README.md index 391815dabf99f..a66d708da75bf 100644 --- a/common/variant/README.md +++ b/common/variant/README.md @@ -322,8 +322,6 @@ Each `array_val` and `object_val` must contain exactly `num_elements + 1` values The "short string" basic type may be used as an optimization to fold string length into the type byte for strings less than 64 bytes. It is semantically identical to the "string" primitive type. -String and binary values may also be represented as an index into the metadata dictionary. (See “string from metadata” and “binary from metadata” in the “Primitive Types” table) Writers may choose to use this mechanism to avoid repeating identical string values in a Variant object. - The Decimal type contains a scale, but no precision. The implied precision of a decimal value is `floor(log_10(val)) + 1`. # Encoding types @@ -354,8 +352,6 @@ The Decimal type contains a scale, but no precision. The implied precision of a | float | `14` | FLOAT | IEEE little-endian | | binary | `15` | BINARY | 4 byte little-endian size, followed by bytes | | string | `16` | STRING | 4 byte little-endian size, followed by UTF-8 encoded bytes | -| binary from metadata | `17` | BINARY | Little-endian index into the metadata dictionary. Number of bytes is equal to the metadata `offset_size`. | -| string from metadata | `18` | STRING | Little-endian index into the metadata dictionary. Number of bytes is equal to the metadata `offset_size`. | | year-month interval | `19` | INT(32, signed)1 | 1 byte denoting start field (1 bit) and end field (1 bit) starting at LSB followed by 4-byte little-endian value. | | day-time interval | `20` | INT(64, signed)1 | 1 byte denoting start field (2 bits) and end field (2 bits) starting at LSB followed by 8-byte little-endian value. | @@ -368,6 +364,8 @@ The Decimal type contains a scale, but no precision. The implied precision of a The year-month and day-time interval types have one byte at the beginning indicating the start and end fields. In the case of the year-month interval, the least significant bit denotes the start field and the next least significant bit denotes the end field. The remaining 6 bits are unused. A field value of 0 represents YEAR and 1 represents MONTH. In the case of the day-time interval, the least significant 2 bits denote the start field and the next least significant 2 bits denote the end field. The remaining 4 bits are unused. A field value of 0 represents DAY, 1 represents HOUR, 2 represents MINUTE, and 3 represents SECOND. +Type IDs 17 and 18 were originally reserved for a prototype feature (string-from-metadata) that was never implemented. These IDs are available for use by new types. + [1] The parquet format does not have pure equivalents for the year-month and day-time interval types. Year-month intervals are usually represented using int32 values and the day-time intervals are usually represented using int64 values. However, these values don't include the start and end fields of these types. Therefore, Spark stores them in the column metadata. 
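To make the interval header layout described above concrete, here is a small illustrative Scala sketch (not part of this patch or the Variant spec) that decodes the one-byte start/end field header exactly as the text describes; the `IntervalHeader` object and its method names are hypothetical.

```scala
// Illustrative only: decode the 1-byte start/end field header described above.
object IntervalHeader {
  private val yearMonthFields = IndexedSeq("YEAR", "MONTH")
  private val dayTimeFields = IndexedSeq("DAY", "HOUR", "MINUTE", "SECOND")

  // Year-month interval: bit 0 = start field, bit 1 = end field; the remaining 6 bits are unused.
  def decodeYearMonth(header: Byte): (String, String) =
    (yearMonthFields(header & 0x1), yearMonthFields((header >> 1) & 0x1))

  // Day-time interval: bits 0-1 = start field, bits 2-3 = end field; the remaining 4 bits are unused.
  def decodeDayTime(header: Byte): (String, String) =
    (dayTimeFields(header & 0x3), dayTimeFields((header >> 2) & 0x3))
}

// Example: a day-time header byte 0x04 encodes start = DAY (0) and end = HOUR (1):
// IntervalHeader.decodeDayTime(0x04) == ("DAY", "HOUR")
```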
# Field ID order and uniqueness diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index 7001fa96deb80..432c3fa9be3ac 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -22,16 +22,18 @@ import java.io.ByteArrayOutputStream import scala.jdk.CollectionConverters._ import org.apache.avro.{Schema, SchemaBuilder} -import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder} -import org.apache.avro.io.EncoderFactory +import org.apache.avro.file.SeekableByteArrayInput +import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder} +import org.apache.avro.io.{DecoderFactory, EncoderFactory} import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.avro.{functions => Fns} import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.functions.{col, lit, struct} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{BinaryType, StructType} class AvroFunctionsSuite extends QueryTest with SharedSparkSession { import testImplicits._ @@ -371,4 +373,218 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession { stop = 138))) } } + + private def serialize(record: GenericRecord, avroSchema: String): Array[Byte] = { + val schema = new Schema.Parser().parse(avroSchema) + val datumWriter = new GenericDatumWriter[GenericRecord](schema) + var outputStream: ByteArrayOutputStream = null + var bytes: Array[Byte] = null + try { + outputStream = new ByteArrayOutputStream() + val encoder = EncoderFactory.get.binaryEncoder(outputStream, null) + datumWriter.write(record, encoder) + encoder.flush() + bytes = outputStream.toByteArray + } finally { + if (outputStream != null) { + outputStream.close() + } + } + bytes + } + + private def deserialize(bytes: Array[Byte], avroSchema: String): GenericRecord = { + val schema = new Schema.Parser().parse(avroSchema) + val datumReader = new GenericDatumReader[GenericRecord](schema) + var inputStream: SeekableByteArrayInput = null + var record: GenericRecord = null + try { + inputStream = new SeekableByteArrayInput(bytes) + val decoder = DecoderFactory.get.binaryDecoder(inputStream, null) + record = datumReader.read(null, decoder) + } finally { + if (inputStream != null) { + inputStream.close() + } + } + record + } + + // write: `GenericRecord` -> binary (by `serialize`) -> dataframe + // read: dataframe -> binary -> `GenericRecord` (by `deserialize`) + test("roundtrip in serialize and deserialize - GenericRecord") { + val avroSchema = + """ + |{ + | "type": "record", + | "name": "person", + | "fields": [ + | {"name": "name", "type": "string"}, + | {"name": "age", "type": "int"}, + | {"name": "country", "type": "string"} + | ] + |} + |""".stripMargin + val testTable = "test_avro" + withTable(testTable) { + val schema = new Schema.Parser().parse(avroSchema) + val person1 = new GenericRecordBuilder(schema) + .set("name", "sparkA") + .set("age", 18) + .set("country", "usa") + .build() + val person2 = new GenericRecordBuilder(schema) + .set("name", "sparkB") + .set("age", 19) + .set("country", "usb") + .build() + Seq(person1, 
person2) + .map(p => serialize(p, avroSchema)) + .toDF("data") + .repartition(1) + .writeTo(testTable) + .create() + + val expectedSchema = new StructType().add("data", BinaryType) + assert(spark.table(testTable).schema === expectedSchema) + + // Note that what is returned here is `Row[Array[Byte]]` + val avroDF = sql(s"SELECT data FROM $testTable") + val readbacks = avroDF + .collect() + .map(row => deserialize(row.get(0).asInstanceOf[Array[Byte]], avroSchema)) + + val readbackPerson1 = readbacks.head + assert(readbackPerson1.get(0).toString === person1.get(0)) + assert(readbackPerson1.get(1).asInstanceOf[Int] === person1.get(1).asInstanceOf[Int]) + assert(readbackPerson1.get(2).toString === person1.get(2)) + + val readbackPerson2 = readbacks(1) + assert(readbackPerson2.get(0).toString === person2.get(0)) + assert(readbackPerson2.get(1).asInstanceOf[Int] === person2.get(1).asInstanceOf[Int]) + assert(readbackPerson2.get(2).toString === person2.get(2)) + } + } + + // write: `GenericRecord` -> binary (by `serialize`) -> dataframe + // read: dataframe -> binary -> struct (by `from_avro`) -> `GenericRecord` + test("use `serialize` to write GenericRecord and `from_avro` to read GenericRecord") { + val avroSchema = + """ + |{ + | "type": "record", + | "name": "person", + | "fields": [ + | {"name": "name", "type": "string"}, + | {"name": "age", "type": "int"}, + | {"name": "country", "type": "string"} + | ] + |} + |""".stripMargin + val testTable = "test_avro" + withTable(testTable) { + val schema = new Schema.Parser().parse(avroSchema) + val person1 = new GenericRecordBuilder(schema) + .set("name", "sparkA") + .set("age", 18) + .set("country", "usa") + .build() + val person2 = new GenericRecordBuilder(schema) + .set("name", "sparkB") + .set("age", 19) + .set("country", "usb") + .build() + Seq(person1, person2) + .map(p => serialize(p, avroSchema)) + .toDF("data") + .repartition(1) + .writeTo(testTable) + .create() + + val expectedSchema = new StructType().add("data", BinaryType) + assert(spark.table(testTable).schema === expectedSchema) + + // Note that what is returned here is `Row[Struct]` + val avroDF = sql(s"SELECT from_avro(data, '$avroSchema', map()) FROM $testTable") + val readbacks = avroDF + .collect() + .map(row => + new GenericRecordBuilder(schema) + .set("name", row.getStruct(0).getString(0)) + .set("age", row.getStruct(0).getInt(1)) + .set("country", row.getStruct(0).getString(2)) + .build()) + + val readbackPerson1 = readbacks.head + assert(readbackPerson1.get(0) === person1.get(0)) + assert(readbackPerson1.get(1).asInstanceOf[Int] === person1.get(1).asInstanceOf[Int]) + assert(readbackPerson1.get(2) === person1.get(2)) + + val readbackPerson2 = readbacks(1) + assert(readbackPerson2.get(0) === person2.get(0)) + assert(readbackPerson2.get(1).asInstanceOf[Int] === person2.get(1).asInstanceOf[Int]) + assert(readbackPerson2.get(2) === person2.get(2)) + } + } + + // write: `GenericRecord` (to `struct`) -> binary (by `to_avro`) -> dataframe + // read: dataframe -> binary -> `GenericRecord` (by `deserialize`) + test("use `to_avro` to write GenericRecord and `deserialize` to read GenericRecord") { + val avroSchema = + """ + |{ + | "type": "record", + | "name": "person", + | "fields": [ + | {"name": "name", "type": "string"}, + | {"name": "age", "type": "int"}, + | {"name": "country", "type": "string"} + | ] + |} + |""".stripMargin + val testTable = "test_avro" + withTable(testTable) { + val schema = new Schema.Parser().parse(avroSchema) + val person1 = new GenericRecordBuilder(schema) + 
.set("name", "sparkA") + .set("age", 18) + .set("country", "usa") + .build() + val person2 = new GenericRecordBuilder(schema) + .set("name", "sparkB") + .set("age", 19) + .set("country", "usb") + .build() + Seq(person1, person2) + .map(p => ( + p.get(0).asInstanceOf[String], + p.get(1).asInstanceOf[Int], + p.get(2).asInstanceOf[String])) + .toDF("name", "age", "country") + .select(Fns.to_avro(struct($"name", $"age", $"country"), avroSchema).as("data")) + .repartition(1) + .writeTo(testTable) + .create() + + val expectedSchema = new StructType().add("data", BinaryType) + assert(spark.table(testTable).schema === expectedSchema) + + // Note that what is returned here is `Row[Array[Byte]]` + val avroDF = sql(s"select data from $testTable") + val readbacks = avroDF + .collect() + .map(row => row.get(0).asInstanceOf[Array[Byte]]) + .map(bytes => deserialize(bytes, avroSchema)) + + val readbackPerson1 = readbacks.head + assert(readbackPerson1.get(0).toString === person1.get(0)) + assert(readbackPerson1.get(1).asInstanceOf[Int] === person1.get(1).asInstanceOf[Int]) + assert(readbackPerson1.get(2).toString === person1.get(2)) + + val readbackPerson2 = readbacks(1) + assert(readbackPerson2.get(0).toString === person2.get(0)) + assert(readbackPerson2.get(1).asInstanceOf[Int] === person2.get(1).asInstanceOf[Int]) + assert(readbackPerson2.get(2).toString === person2.get(2)) + } + } } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index ddf4e623cb71d..6914b2cc8a0f7 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -16,11 +16,9 @@ */ package org.apache.spark.sql -import java.io.Closeable import java.math.BigInteger import java.net.URI import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.TimeUnit._ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import scala.jdk.CollectionConverters._ @@ -70,8 +68,7 @@ import org.apache.spark.util.ArrayImplicits._ class SparkSession private[sql] ( private[sql] val client: SparkConnectClient, private val planIdGenerator: AtomicLong) - extends Serializable - with Closeable + extends api.SparkSession[Dataset] with Logging { private[this] val allocator = new RootAllocator() @@ -101,35 +98,11 @@ class SparkSession private[sql] ( */ val conf: RuntimeConfig = new RuntimeConfig(client) - /** - * Executes some code block and prints to stdout the time taken to execute the block. This is - * available in Scala only and is used primarily for interactive testing and debugging. - * - * @since 3.4.0 - */ - def time[T](f: => T): T = { - val start = System.nanoTime() - val ret = f - val end = System.nanoTime() - // scalastyle:off println - println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms") - // scalastyle:on println - ret - } - - /** - * Returns a `DataFrame` with no rows or columns. - * - * @since 3.4.0 - */ + /** @inheritdoc */ @transient val emptyDataFrame: DataFrame = emptyDataset(UnboundRowEncoder) - /** - * Creates a new [[Dataset]] of type T containing zero elements. 
- * - * @since 3.4.0 - */ + /** @inheritdoc */ def emptyDataset[T: Encoder]: Dataset[T] = createDataset[T](Nil) private def createDataset[T](encoder: AgnosticEncoder[T], data: Iterator[T]): Dataset[T] = { @@ -152,104 +125,33 @@ class SparkSession private[sql] ( } } - /** - * Creates a `DataFrame` from a local Seq of Product. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def createDataFrame[A <: Product: TypeTag](data: Seq[A]): DataFrame = { createDataset(ScalaReflection.encoderFor[A], data.iterator).toDF() } - /** - * :: DeveloperApi :: Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using - * the given schema. It is important to make sure that the structure of every [[Row]] of the - * provided List matches the provided schema. Otherwise, there will be runtime exception. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = { createDataset(RowEncoder.encoderFor(schema), rows.iterator().asScala).toDF() } - /** - * Applies a schema to a List of Java Beans. - * - * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries - * will return the columns in an undefined order. - * @since 3.4.0 - */ + /** @inheritdoc */ def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = { val encoder = JavaTypeInference.encoderFor(beanClass.asInstanceOf[Class[Any]]) createDataset(encoder, data.iterator().asScala).toDF() } - /** - * Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an - * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL - * representation) that is generally created automatically through implicits from a - * `SparkSession`, or can be created explicitly by calling static methods on [[Encoders]]. - * - * ==Example== - * - * {{{ - * - * import spark.implicits._ - * case class Person(name: String, age: Long) - * val data = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19)) - * val ds = spark.createDataset(data) - * - * ds.show() - * // +-------+---+ - * // | name|age| - * // +-------+---+ - * // |Michael| 29| - * // | Andy| 30| - * // | Justin| 19| - * // +-------+---+ - * }}} - * - * @since 3.4.0 - */ + /** @inheritdoc */ def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = { createDataset(encoderFor[T], data.iterator) } - /** - * Creates a [[Dataset]] from a `java.util.List` of a given type. This method requires an - * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL - * representation) that is generally created automatically through implicits from a - * `SparkSession`, or can be created explicitly by calling static methods on [[Encoders]]. - * - * ==Java Example== - * - * {{{ - * List data = Arrays.asList("hello", "world"); - * Dataset ds = spark.createDataset(data, Encoders.STRING()); - * }}} - * - * @since 3.4.0 - */ + /** @inheritdoc */ def createDataset[T: Encoder](data: java.util.List[T]): Dataset[T] = { createDataset(data.asScala.toSeq) } - /** - * Executes a SQL query substituting positional parameters by the given arguments, returning the - * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not for SELECT queries. - * - * @param sqlText - * A SQL statement with positional parameters to execute. - * @param args - * An array of Java/Scala objects that can be converted to SQL literal expressions. See Supported Data - * Types for supported value types in Scala/Java. 
For example: 1, "Steven", - * LocalDate.of(2023, 4, 2). A value can be also a `Column` of a literal or collection - * constructor functions such as `map()`, `array()`, `struct()`, in that case it is taken as - * is. - * - * @since 3.5.0 - */ + /** @inheritdoc */ @Experimental def sql(sqlText: String, args: Array[_]): DataFrame = newDataFrame { builder => // Send the SQL once to the server and then check the output. @@ -274,45 +176,15 @@ class SparkSession private[sql] ( } } - /** - * Executes a SQL query substituting named parameters by the given arguments, returning the - * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not for SELECT queries. - * - * @param sqlText - * A SQL statement with named parameters to execute. - * @param args - * A map of parameter names to Java/Scala objects that can be converted to SQL literal - * expressions. See - * Supported Data Types for supported value types in Scala/Java. For example, map keys: - * "rank", "name", "birthdate"; map values: 1, "Steven", LocalDate.of(2023, 4, 2). Map value - * can be also a `Column` of a literal or collection constructor functions such as `map()`, - * `array()`, `struct()`, in that case it is taken as is. - * - * @since 3.4.0 - */ + /** @inheritdoc */ @Experimental def sql(sqlText: String, args: Map[String, Any]): DataFrame = { sql(sqlText, args.asJava) } - /** - * Executes a SQL query substituting named parameters by the given arguments, returning the - * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not for SELECT queries. - * - * @param sqlText - * A SQL statement with named parameters to execute. - * @param args - * A map of parameter names to Java/Scala objects that can be converted to SQL literal - * expressions. See - * Supported Data Types for supported value types in Scala/Java. For example, map keys: - * "rank", "name", "birthdate"; map values: 1, "Steven", LocalDate.of(2023, 4, 2). Map value - * can be also a `Column` of a literal or collection constructor functions such as `map()`, - * `array()`, `struct()`, in that case it is taken as is. - * - * @since 3.4.0 - */ + /** @inheritdoc */ @Experimental - def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = newDataFrame { + override def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = newDataFrame { builder => // Send the SQL once to the server and then check the output. val cmd = newCommand(b => @@ -336,13 +208,8 @@ class SparkSession private[sql] ( } } - /** - * Executes a SQL query using Spark, returning the result as a `DataFrame`. This API eagerly - * runs DDL/DML commands, but not for SELECT queries. - * - * @since 3.4.0 - */ - def sql(query: String): DataFrame = { + /** @inheritdoc */ + override def sql(query: String): DataFrame = { sql(query, Array.empty) } @@ -379,83 +246,30 @@ class SparkSession private[sql] ( */ lazy val catalog: Catalog = new CatalogImpl(this) - /** - * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch - * reading and the returned DataFrame is the batch scan query plan of this table. If it's a - * view, the returned DataFrame is simply the query plan of the view, which can either be a - * batch or streaming query plan. - * - * @param tableName - * is either a qualified or unqualified name that designates a table or view. If a database is - * specified, it identifies the table/view from the database. 
Otherwise, it first attempts to - * find a temporary view with the given name and then match the table/view from the current - * database. Note that, the global temporary view database is also valid here. - * @since 3.4.0 - */ + /** @inheritdoc */ def table(tableName: String): DataFrame = { read.table(tableName) } - /** - * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a - * range from 0 to `end` (exclusive) with step value 1. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def range(end: Long): Dataset[java.lang.Long] = range(0, end) - /** - * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a - * range from `start` to `end` (exclusive) with step value 1. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def range(start: Long, end: Long): Dataset[java.lang.Long] = { range(start, end, step = 1) } - /** - * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a - * range from `start` to `end` (exclusive) with a step value. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = { range(start, end, step, None) } - /** - * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a - * range from `start` to `end` (exclusive) with a step value, with partition number specified. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = { range(start, end, step, Option(numPartitions)) } - /** - * A collection of methods for registering user-defined functions (UDF). - * - * The following example registers a Scala closure as UDF: - * {{{ - * sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1) - * }}} - * - * The following example registers a UDF in Java: - * {{{ - * sparkSession.udf().register("myUDF", - * (Integer arg1, String arg2) -> arg2 + arg1, - * DataTypes.StringType); - * }}} - * - * @note - * The user-defined functions must be deterministic. Due to optimization, duplicate - * invocations may be eliminated or the function may even be invoked more times than it is - * present in the query. - * - * @since 3.5.0 - */ + /** @inheritdoc */ lazy val udf: UDFRegistration = new UDFRegistration(this) // scalastyle:off @@ -474,6 +288,7 @@ class SparkSession private[sql] ( object implicits extends SQLImplicits(this) with Serializable // scalastyle:on + /** @inheritdoc */ def newSession(): SparkSession = { SparkSession.builder().client(client.copy()).create() } @@ -708,13 +523,6 @@ class SparkSession private[sql] ( client.interruptOperation(operationId).getInterruptedIdsList.asScala.toSeq } - /** - * Synonym for `close()`. - * - * @since 3.4.0 - */ - def stop(): Unit = close() - /** * Close the [[SparkSession]]. 
* diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala index 7571395289967..2ab49eae8cd85 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala @@ -24,15 +24,21 @@ import jakarta.servlet.http.HttpServletRequest import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.SparkConf +import org.apache.spark.annotation.{DeveloperApi, Since, Unstable} import org.apache.spark.ui.JettyUtils._ /** + * :: DeveloperApi :: * This exposes the metrics of the given registry with Prometheus format. * * The output is consistent with /metrics/json result in terms of item ordering * and with the previous result of Spark JMX Sink + Prometheus JMX Converter combination * in terms of key string format. + * + * This is used by Spark MetricsSystem internally and Spark K8s operator. */ +@Unstable +@DeveloperApi private[spark] class PrometheusServlet( val property: Properties, val registry: MetricRegistry) extends Sink { @@ -47,7 +53,10 @@ private[spark] class PrometheusServlet( ) } - def getMetricsSnapshot(request: HttpServletRequest): String = { + def getMetricsSnapshot(request: HttpServletRequest): String = getMetricsSnapshot() + + @Since("4.0.0") + def getMetricsSnapshot(): String = { import scala.jdk.CollectionConverters._ val gaugesLabel = """{type="gauges"}""" diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala index 518c0592488fc..1966a60c1665e 100644 --- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala @@ -380,8 +380,10 @@ abstract class SparkFunSuite } else if (actual.contextType() == QueryContextType.DataFrame) { assert(actual.fragment() === expected.fragment, "Invalid code fragment of a query context. Actual:" + actual.toString) - assert(actual.callSite().matches(expected.callSitePattern), - "Invalid callSite of a query context. Actual:" + actual.toString) + if (expected.callSitePattern.nonEmpty) { + assert(actual.callSite().matches(expected.callSitePattern), + "Invalid callSite of a query context. 
Actual:" + actual.toString) + } } } } diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index b8c51764b9cf2..e93e8e94a993e 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -136,8 +136,8 @@ jersey-container-servlet/3.0.12//jersey-container-servlet-3.0.12.jar jersey-hk2/3.0.12//jersey-hk2-3.0.12.jar jersey-server/3.0.12//jersey-server-3.0.12.jar jettison/1.5.4//jettison-1.5.4.jar -jetty-util-ajax/11.0.21//jetty-util-ajax-11.0.21.jar -jetty-util/11.0.21//jetty-util-11.0.21.jar +jetty-util-ajax/11.0.23//jetty-util-ajax-11.0.23.jar +jetty-util/11.0.23//jetty-util-11.0.23.jar jjwt-api/0.12.6//jjwt-api-0.12.6.jar jline/2.14.6//jline-2.14.6.jar jline/3.25.1//jline-3.25.1.jar diff --git a/pom.xml b/pom.xml index c6730fe678bfb..7a163f3d02dd5 100644 --- a/pom.xml +++ b/pom.xml @@ -140,7 +140,7 @@ 1.14.1 2.0.2 shaded-protobuf - 11.0.21 + 11.0.23 5.0.0 4.0.1 diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index 7585448204f60..dc4329c603241 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -48,9 +48,11 @@ Conditional Functions ifnull nanvl nullif + nullifzero nvl nvl2 when + zeroifnull Predicate Functions diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py index db3680f5cd42f..ad6dbbf58e48d 100644 --- a/python/pyspark/sql/connect/functions/builtin.py +++ b/python/pyspark/sql/connect/functions/builtin.py @@ -3921,6 +3921,13 @@ def nullif(col1: "ColumnOrName", col2: "ColumnOrName") -> Column: nullif.__doc__ = pysparkfuncs.nullif.__doc__ +def nullifzero(col: "ColumnOrName") -> Column: + return _invoke_function_over_columns("nullifzero", col) + + +nullifzero.__doc__ = pysparkfuncs.nullifzero.__doc__ + + def nvl(col1: "ColumnOrName", col2: "ColumnOrName") -> Column: return _invoke_function_over_columns("nvl", col1, col2) @@ -3935,6 +3942,13 @@ def nvl2(col1: "ColumnOrName", col2: "ColumnOrName", col3: "ColumnOrName") -> Co nvl2.__doc__ = pysparkfuncs.nvl2.__doc__ +def zeroifnull(col: "ColumnOrName") -> Column: + return _invoke_function_over_columns("zeroifnull", col) + + +zeroifnull.__doc__ = pysparkfuncs.zeroifnull.__doc__ + + def aes_encrypt( input: "ColumnOrName", key: "ColumnOrName", diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index 3b472c0bf3cd4..344ba8d009ac4 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -20681,6 +20681,31 @@ def nullif(col1: "ColumnOrName", col2: "ColumnOrName") -> Column: return _invoke_function_over_columns("nullif", col1, col2) +@_try_remote_functions +def nullifzero(col: "ColumnOrName") -> Column: + """ + Returns null if `col` is equal to zero, or `col` otherwise. + + .. 
versionadded:: 4.0.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + + Examples + -------- + >>> df = spark.createDataFrame([(0,), (1,)], ["a"]) + >>> df.select(nullifzero(df.a).alias("result")).show() + +------+ + |result| + +------+ + | NULL| + | 1| + +------+ + """ + return _invoke_function_over_columns("nullifzero", col) + + @_try_remote_functions def nvl(col1: "ColumnOrName", col2: "ColumnOrName") -> Column: """ @@ -20724,6 +20749,31 @@ def nvl2(col1: "ColumnOrName", col2: "ColumnOrName", col3: "ColumnOrName") -> Co return _invoke_function_over_columns("nvl2", col1, col2, col3) +@_try_remote_functions +def zeroifnull(col: "ColumnOrName") -> Column: + """ + Returns zero if `col` is null, or `col` otherwise. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + + Examples + -------- + >>> df = spark.createDataFrame([(None,), (1,)], ["a"]) + >>> df.select(zeroifnull(df.a).alias("result")).show() + +------+ + |result| + +------+ + | 0| + | 1| + +------+ + """ + return _invoke_function_over_columns("zeroifnull", col) + + @_try_remote_functions def aes_encrypt( input: "ColumnOrName", diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index aade86a9450fb..f7f2485a43e16 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -29,6 +29,7 @@ from pyspark.sql import Row, Window, functions as F, types from pyspark.sql.avro.functions import from_avro, to_avro from pyspark.sql.column import Column +from pyspark.sql.functions.builtin import nullifzero, zeroifnull from pyspark.testing.sqlutils import ReusedSQLTestCase, SQLTestUtils from pyspark.testing.utils import have_numpy @@ -1593,6 +1594,15 @@ class IntEnum(Enum): for r, c, e in zip(result, cols, expected): self.assertEqual(r, e, str(c)) + def test_nullifzero_zeroifnull(self): + df = self.spark.createDataFrame([(0,), (1,)], ["a"]) + result = df.select(nullifzero(df.a).alias("r")).collect() + self.assertEqual([Row(r=None), Row(r=1)], result) + + df = self.spark.createDataFrame([(None,), (1,)], ["a"]) + result = df.select(zeroifnull(df.a).alias("r")).collect() + self.assertEqual([Row(r=0), Row(r=1)], result) + class FunctionsTests(ReusedSQLTestCase, FunctionsTestsMixin): pass diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 2f9ee6943fe61..393ffc5674011 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -457,7 +457,7 @@ private[spark] object Config extends Logging { .doc("Time to wait between each round of executor allocation.") .version("2.3.0") .timeConf(TimeUnit.MILLISECONDS) - .checkValue(value => value > 0, "Allocation batch delay must be a positive time value.") + .checkValue(value => value > 100, "Allocation batch delay must be greater than 0.1s.") .createWithDefaultString("1s") val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT = diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 093f5ef3bcb7e..1ad5e0af0bd73 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -150,6 +150,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { when(persistentVolumeClaimList.getItems).thenReturn(Seq.empty[PersistentVolumeClaim].asJava) } + test("SPARK-49447: Prevent small values less than 100 for batch delay") { + val m = intercept[IllegalArgumentException] { + val conf = new SparkConf().set(KUBERNETES_ALLOCATION_BATCH_DELAY.key, "1") + conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + }.getMessage + assert(m.contains("Allocation batch delay must be greater than 0.1s.")) + } + test("SPARK-41210: Window based executor failure tracking mechanism") { var _exitCode = -1 val _conf = conf.clone diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala index 6c0c88b5d8441..1982ea6b615f7 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala @@ -120,7 +120,8 @@ import org.apache.spark.util.SparkClassUtils * @since 1.6.0 */ @Stable -abstract class Dataset[T, DS[_] <: Dataset[_, DS]] extends Serializable { +abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { + def sparkSession: SparkSession[DS] val encoder: Encoder[T] diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala new file mode 100644 index 0000000000000..12a1a13619030 --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.api + +import scala.concurrent.duration.NANOSECONDS +import scala.jdk.CollectionConverters._ +import scala.reflect.runtime.universe.TypeTag + +import _root_.java.io.Closeable +import _root_.java.lang +import _root_.java.util + +import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.sql.{Encoder, Row} +import org.apache.spark.sql.types.StructType + +/** + * The entry point to programming Spark with the Dataset and DataFrame API. + * + * In environments that this has been created upfront (e.g. 
REPL, notebooks), use the builder + * to get an existing session: + * + * {{{ + * SparkSession.builder().getOrCreate() + * }}} + * + * The builder can also be used to create a new session: + * + * {{{ + * SparkSession.builder + * .master("local") + * .appName("Word Count") + * .config("spark.some.config.option", "some-value") + * .getOrCreate() + * }}} + */ +abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends Serializable with Closeable { + /** + * The version of Spark on which this application is running. + * + * @since 2.0.0 + */ + def version: String + + /** + * A collection of methods for registering user-defined functions (UDF). + * + * The following example registers a Scala closure as UDF: + * {{{ + * sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1) + * }}} + * + * The following example registers a UDF in Java: + * {{{ + * sparkSession.udf().register("myUDF", + * (Integer arg1, String arg2) -> arg2 + arg1, + * DataTypes.StringType); + * }}} + * + * @note The user-defined functions must be deterministic. Due to optimization, + * duplicate invocations may be eliminated or the function may even be invoked more times + * than it is present in the query. + * @since 2.0.0 + */ + def udf: UDFRegistration + + /** + * Start a new session with isolated SQL configurations, temporary tables, registered + * functions are isolated, but sharing the underlying `SparkContext` and cached data. + * + * @note Other than the `SparkContext`, all shared state is initialized lazily. + * This method will force the initialization of the shared state to ensure that parent + * and child sessions are set up with the same shared state. If the underlying catalog + * implementation is Hive, this will initialize the metastore, which may take some time. + * @since 2.0.0 + */ + def newSession(): SparkSession[DS] + + /* --------------------------------- * + | Methods for creating DataFrames | + * --------------------------------- */ + + /** + * Returns a `DataFrame` with no rows or columns. + * + * @since 2.0.0 + */ + @transient + def emptyDataFrame: DS[Row] + + /** + * Creates a `DataFrame` from a local Seq of Product. + * + * @since 2.0.0 + */ + def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DS[Row] + + /** + * :: DeveloperApi :: + * Creates a `DataFrame` from a `java.util.List` containing [[org.apache.spark.sql.Row]]s using + * the given schema.It is important to make sure that the structure of every + * [[org.apache.spark.sql.Row]] of the provided List matches the provided schema. Otherwise, + * there will be runtime exception. + * + * @since 2.0.0 + */ + @DeveloperApi + def createDataFrame(rows: util.List[Row], schema: StructType): DS[Row] + + /** + * Applies a schema to a List of Java Beans. + * + * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, + * SELECT * queries will return the columns in an undefined order. + * + * @since 1.6.0 + */ + def createDataFrame(data: util.List[_], beanClass: Class[_]): DS[Row] + + /* ------------------------------- * + | Methods for creating DataSets | + * ------------------------------- */ + + /** + * Creates a new [[Dataset]] of type T containing zero elements. + * + * @since 2.0.0 + */ + def emptyDataset[T: Encoder]: DS[T] + + /** + * Creates a [[Dataset]] from a local Seq of data of a given type. 
This method requires an + * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation) + * that is generally created automatically through implicits from a `SparkSession`, or can be + * created explicitly by calling static methods on `Encoders`. + * + * == Example == + * + * {{{ + * + * import spark.implicits._ + * case class Person(name: String, age: Long) + * val data = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19)) + * val ds = spark.createDataset(data) + * + * ds.show() + * // +-------+---+ + * // | name|age| + * // +-------+---+ + * // |Michael| 29| + * // | Andy| 30| + * // | Justin| 19| + * // +-------+---+ + * }}} + * + * @since 2.0.0 + */ + def createDataset[T: Encoder](data: Seq[T]): DS[T] + + /** + * Creates a [[Dataset]] from a `java.util.List` of a given type. This method requires an + * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation) + * that is generally created automatically through implicits from a `SparkSession`, or can be + * created explicitly by calling static methods on `Encoders`. + * + * == Java Example == + * + * {{{ + * List data = Arrays.asList("hello", "world"); + * Dataset ds = spark.createDataset(data, Encoders.STRING()); + * }}} + * + * @since 2.0.0 + */ + def createDataset[T: Encoder](data: util.List[T]): DS[T] + + /** + * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements + * in a range from 0 to `end` (exclusive) with step value 1. + * + * @since 2.0.0 + */ + def range(end: Long): DS[lang.Long] + + /** + * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements + * in a range from `start` to `end` (exclusive) with step value 1. + * + * @since 2.0.0 + */ + def range(start: Long, end: Long): DS[lang.Long] + + /** + * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements + * in a range from `start` to `end` (exclusive) with a step value. + * + * @since 2.0.0 + */ + def range(start: Long, end: Long, step: Long): DS[lang.Long] + + /** + * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements + * in a range from `start` to `end` (exclusive) with a step value, with partition number + * specified. + * + * @since 2.0.0 + */ + def range(start: Long, end: Long, step: Long, numPartitions: Int): DS[lang.Long] + + /* ------------------------- * + | Catalog-related methods | + * ------------------------- */ + + /** + * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch + * reading and the returned DataFrame is the batch scan query plan of this table. If it's a view, + * the returned DataFrame is simply the query plan of the view, which can either be a batch or + * streaming query plan. + * + * @param tableName is either a qualified or unqualified name that designates a table or view. + * If a database is specified, it identifies the table/view from the database. + * Otherwise, it first attempts to find a temporary view with the given name + * and then match the table/view from the current database. + * Note that, the global temporary view database is also valid here. + * @since 2.0.0 + */ + def table(tableName: String): DS[Row] + + /* ----------------- * + | Everything else | + * ----------------- */ + + /** + * Executes a SQL query substituting positional parameters by the given arguments, + * returning the result as a `DataFrame`. 
+ * This API eagerly runs DDL/DML commands, but not for SELECT queries. + * + * @param sqlText A SQL statement with positional parameters to execute. + * @param args An array of Java/Scala objects that can be converted to + * SQL literal expressions. See + * + * Supported Data Types for supported value types in Scala/Java. + * For example, 1, "Steven", LocalDate.of(2023, 4, 2). + * A value can be also a `Column` of a literal or collection constructor functions + * such as `map()`, `array()`, `struct()`, in that case it is taken as is. + * @since 3.5.0 + */ + @Experimental + def sql(sqlText: String, args: Array[_]): DS[Row] + + /** + * Executes a SQL query substituting named parameters by the given arguments, + * returning the result as a `DataFrame`. + * This API eagerly runs DDL/DML commands, but not for SELECT queries. + * + * @param sqlText A SQL statement with named parameters to execute. + * @param args A map of parameter names to Java/Scala objects that can be converted to + * SQL literal expressions. See + * + * Supported Data Types for supported value types in Scala/Java. + * For example, map keys: "rank", "name", "birthdate"; + * map values: 1, "Steven", LocalDate.of(2023, 4, 2). + * Map value can be also a `Column` of a literal or collection constructor + * functions such as `map()`, `array()`, `struct()`, in that case it is taken + * as is. + * @since 3.4.0 + */ + @Experimental + def sql(sqlText: String, args: Map[String, Any]): DS[Row] + + /** + * Executes a SQL query substituting named parameters by the given arguments, + * returning the result as a `DataFrame`. + * This API eagerly runs DDL/DML commands, but not for SELECT queries. + * + * @param sqlText A SQL statement with named parameters to execute. + * @param args A map of parameter names to Java/Scala objects that can be converted to + * SQL literal expressions. See + * + * Supported Data Types for supported value types in Scala/Java. + * For example, map keys: "rank", "name", "birthdate"; + * map values: 1, "Steven", LocalDate.of(2023, 4, 2). + * Map value can be also a `Column` of a literal or collection constructor + * functions such as `map()`, `array()`, `struct()`, in that case it is taken + * as is. + * @since 3.4.0 + */ + @Experimental + def sql(sqlText: String, args: util.Map[String, Any]): DS[Row] = { + sql(sqlText, args.asScala.toMap) + } + + /** + * Executes a SQL query using Spark, returning the result as a `DataFrame`. + * This API eagerly runs DDL/DML commands, but not for SELECT queries. + * + * @since 2.0.0 + */ + def sql(sqlText: String): DS[Row] = sql(sqlText, Map.empty[String, Any]) + + /** + * Executes some code block and prints to stdout the time taken to execute the block. This is + * available in Scala only and is used primarily for interactive testing and debugging. + * + * @since 2.1.0 + */ + def time[T](f: => T): T = { + val start = System.nanoTime() + val ret = f + val end = System.nanoTime() + // scalastyle:off println + println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms") + // scalastyle:on println + ret + } + + /** + * Synonym for `close()`. 
+ * + * @since 2.0.0 + */ + def stop(): Unit = close() +} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala index ffc198fb4cae3..e46d6c95b31ae 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala @@ -1702,7 +1702,7 @@ object functions { * @group normal_funcs * @since 1.5.0 */ - def broadcast[DS[_] <: api.Dataset[_, DS]](df: DS[_]): df.type = { + def broadcast[DS[U] <: api.Dataset[U, DS]](df: DS[_]): df.type = { df.hint("broadcast").asInstanceOf[df.type] } @@ -7834,6 +7834,14 @@ object functions { */ def nullif(col1: Column, col2: Column): Column = Column.fn("nullif", col1, col2) + /** + * Returns null if `col` is equal to zero, or `col` otherwise. + * + * @group conditional_funcs + * @since 4.0.0 + */ + def nullifzero(col: Column): Column = Column.fn("nullifzero", col) + /** * Returns `col2` if `col1` is null, or `col1` otherwise. * @@ -7850,6 +7858,14 @@ object functions { */ def nvl2(col1: Column, col2: Column, col3: Column): Column = Column.fn("nvl2", col1, col2, col3) + /** + * Returns zero if `col` is null, or `col` otherwise. + * + * @group conditional_funcs + * @since 4.0.0 + */ + def zeroifnull(col: Column): Column = Column.fn("zeroifnull", col) + // scalastyle:off line.size.limit // scalastyle:off parameter.number diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ProcedureCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ProcedureCatalog.java new file mode 100644 index 0000000000000..6eaacf340cb80 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ProcedureCatalog.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.catalog.procedures.UnboundProcedure; + +/** + * A catalog API for working with procedures. + * + * @since 4.0.0 + */ +@Evolving +public interface ProcedureCatalog extends CatalogPlugin { + /** + * Load a procedure by {@link Identifier identifier} from the catalog. 
+ * + * @param ident a procedure identifier + * @return the loaded unbound procedure + */ + UnboundProcedure loadProcedure(Identifier ident); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/BoundProcedure.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/BoundProcedure.java new file mode 100644 index 0000000000000..99f0836576f80 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/BoundProcedure.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog.procedures; + +import java.util.Iterator; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.LocalScan; +import org.apache.spark.sql.connector.read.Scan; + +/** + * A procedure that is bound to input types. + * + * @since 4.0.0 + */ +@Evolving +public interface BoundProcedure extends Procedure { + /** + * Returns parameters of this procedure. + */ + ProcedureParameter[] parameters(); + + /** + * Indicates whether this procedure is deterministic. + */ + boolean isDeterministic(); + + /** + * Executes this procedure with the given input. + *
<p>
+ * Spark validates and rearranges arguments provided in the CALL statement to ensure that + * the order and data types of the fields in {@code input} match the expected order and + * types defined by {@link #parameters() parameters}. + *
<p>
+ * Each procedure can return any number of result sets. Each result set is represented by + * a {@link Scan scan} that reports the type of records it produces and can be used to + * collect the output, if needed. If a result set is local and does not require a distributed job, + * implementations should use {@link LocalScan}. */ + Iterator<Scan> call(InternalRow input); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/Procedure.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/Procedure.java new file mode 100644 index 0000000000000..4f88d215d3197 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/Procedure.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog.procedures; + +import org.apache.spark.annotation.Evolving; + +/** + * A base interface for all procedures. + * + * @since 4.0.0 + */ +@Evolving +public interface Procedure { + /** + * Returns the name of this procedure. + */ + String name(); + + /** + * Returns the description of this procedure. + */ + default String description() { + return getClass().toString(); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/ProcedureParameter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/ProcedureParameter.java new file mode 100644 index 0000000000000..90d531ae21892 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/ProcedureParameter.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector.catalog.procedures; + +import javax.annotation.Nullable; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.internal.connector.ProcedureParameterImpl; +import org.apache.spark.sql.types.DataType; + +import static org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter.Mode.IN; + +/** + * A {@link Procedure procedure} parameter. + * + * @since 4.0.0 + */ +@Evolving +public interface ProcedureParameter { + /** + * Creates a builder for an IN procedure parameter. + * + * @param name the name of the parameter + * @param dataType the type of the parameter + * @return the constructed stored procedure parameter + */ + static Builder in(String name, DataType dataType) { + return new Builder(IN, name, dataType); + } + + /** + * Returns the mode of this parameter. + */ + Mode mode(); + + /** + * Returns the name of this parameter. + */ + String name(); + + /** + * Returns the data type of this parameter. + */ + DataType dataType(); + + /** + * Returns the SQL string (Spark SQL dialect) of the default value expression of this parameter or + * null if not provided. + */ + @Nullable + String defaultValueExpression(); + + /** + * Returns the comment of this parameter or null if not provided. + */ + @Nullable + String comment(); + + /** + * An enum representing procedure parameter modes. + */ + enum Mode { + IN, + INOUT, + OUT + } + + class Builder { + private final Mode mode; + private final String name; + private final DataType dataType; + private String defaultValueExpression; + private String comment; + + private Builder(Mode mode, String name, DataType dataType) { + this.mode = mode; + this.name = name; + this.dataType = dataType; + } + + /** + * Sets the default value expression of the parameter. + */ + public Builder defaultValue(String defaultValueExpression) { + this.defaultValueExpression = defaultValueExpression; + return this; + } + + /** + * Sets the comment of the parameter. + */ + public Builder comment(String comment) { + this.comment = comment; + return this; + } + + /** + * Builds the stored procedure parameter. + */ + public ProcedureParameter build() { + return new ProcedureParameterImpl(mode, name, dataType, defaultValueExpression, comment); + } + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/UnboundProcedure.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/UnboundProcedure.java new file mode 100644 index 0000000000000..ee9a09055243b --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/UnboundProcedure.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector.catalog.procedures; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.StructType; + +/** + * A procedure that is not bound to input types. + * + * @since 4.0.0 + */ +@Evolving +public interface UnboundProcedure extends Procedure { + /** + * Binds this procedure to input types. + *

+ * If the catalog supports procedure overloading, the implementation is expected to pick the best + * matching version of the procedure. If overloading is not supported, the implementation can + * validate if the input types are compatible while binding or delegate that to Spark. Regardless, + * Spark will always perform the final validation of the arguments and rearrange them as needed + * based on {@link BoundProcedure#parameters() reported parameters}. + * + * @param inputType the input types to bind to + * @return the bound procedure that is most suitable for the given input types + */ + BoundProcedure bind(StructType inputType); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ProcedureParameterImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ProcedureParameterImpl.scala new file mode 100644 index 0000000000000..01ea48af1537c --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ProcedureParameterImpl.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.internal.connector + +import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter +import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter.Mode +import org.apache.spark.sql.types.DataType + +case class ProcedureParameterImpl( + mode: Mode, + name: String, + dataType: DataType, + defaultValueExpression: String, + comment: String) extends ProcedureParameter diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index a236577cba438..358541c942f1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -17,9 +17,7 @@ package org.apache.spark.sql -import java.io.Closeable import java.util.{ServiceLoader, UUID} -import java.util.concurrent.TimeUnit._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.jdk.CollectionConverters._ @@ -93,7 +91,7 @@ class SparkSession private( @transient private val parentSessionState: Option[SessionState], @transient private[sql] val extensions: SparkSessionExtensions, @transient private[sql] val initialSessionOptions: Map[String, String]) - extends Serializable with Closeable with Logging { self => + extends api.SparkSession[Dataset] with Logging { self => // The call site where this SparkSession was constructed. private val creationSite: CallSite = Utils.getCallSite() @@ -122,11 +120,7 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) - /** - * The version of Spark on which this application is running. 
- * - * @since 2.0.0 - */ + /** @inheritdoc */ def version: String = SPARK_VERSION /* ----------------------- * @@ -207,27 +201,7 @@ class SparkSession private( @Unstable def experimental: ExperimentalMethods = sessionState.experimentalMethods - /** - * A collection of methods for registering user-defined functions (UDF). - * - * The following example registers a Scala closure as UDF: - * {{{ - * sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1) - * }}} - * - * The following example registers a UDF in Java: - * {{{ - * sparkSession.udf().register("myUDF", - * (Integer arg1, String arg2) -> arg2 + arg1, - * DataTypes.StringType); - * }}} - * - * @note The user-defined functions must be deterministic. Due to optimization, - * duplicate invocations may be eliminated or the function may even be invoked more times than - * it is present in the query. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def udf: UDFRegistration = sessionState.udfRegistration private[sql] def udtf: UDTFRegistration = sessionState.udtfRegistration @@ -260,17 +234,7 @@ class SparkSession private( @Unstable private[sql] def artifactManager: ArtifactManager = sessionState.artifactManager - /** - * Start a new session with isolated SQL configurations, temporary tables, registered - * functions are isolated, but sharing the underlying `SparkContext` and cached data. - * - * @note Other than the `SparkContext`, all shared state is initialized lazily. - * This method will force the initialization of the shared state to ensure that parent - * and child sessions are set up with the same shared state. If the underlying catalog - * implementation is Hive, this will initialize the metastore, which may take some time. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def newSession(): SparkSession = { new SparkSession( sparkContext, @@ -308,19 +272,11 @@ class SparkSession private( | Methods for creating DataFrames | * --------------------------------- */ - /** - * Returns a `DataFrame` with no rows or columns. - * - * @since 2.0.0 - */ + /** @inheritdoc */ @transient lazy val emptyDataFrame: DataFrame = Dataset.ofRows(self, LocalRelation()) - /** - * Creates a new [[Dataset]] of type T containing zero elements. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def emptyDataset[T: Encoder]: Dataset[T] = { val encoder = implicitly[Encoder[T]] new Dataset(self, LocalRelation(encoder.schema), encoder) @@ -336,11 +292,7 @@ class SparkSession private( Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder)) } - /** - * Creates a `DataFrame` from a local Seq of Product. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = withActive { val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] val attributeSeq = toAttributes(schema) @@ -403,14 +355,7 @@ class SparkSession private( createDataFrame(rowRDD.rdd, replaced) } - /** - * :: DeveloperApi :: - * Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using the given schema. - * It is important to make sure that the structure of every [[Row]] of the provided List matches - * the provided schema. Otherwise, there will be runtime exception. 
- * - * @since 2.0.0 - */ + /** @inheritdoc */ @DeveloperApi def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = withActive { val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] @@ -447,13 +392,7 @@ class SparkSession private( createDataFrame(rdd.rdd, beanClass) } - /** - * Applies a schema to a List of Java Beans. - * - * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, - * SELECT * queries will return the columns in an undefined order. - * @since 1.6.0 - */ + /** @inheritdoc */ def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = withActive { val attrSeq = getSchema(beanClass) val rows = SQLContext.beansToRows(data.asScala.iterator, beanClass, attrSeq) @@ -473,33 +412,7 @@ class SparkSession private( | Methods for creating DataSets | * ------------------------------- */ - /** - * Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an - * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation) - * that is generally created automatically through implicits from a `SparkSession`, or can be - * created explicitly by calling static methods on [[Encoders]]. - * - * == Example == - * - * {{{ - * - * import spark.implicits._ - * case class Person(name: String, age: Long) - * val data = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19)) - * val ds = spark.createDataset(data) - * - * ds.show() - * // +-------+---+ - * // | name|age| - * // +-------+---+ - * // |Michael| 29| - * // | Andy| 30| - * // | Justin| 19| - * // +-------+---+ - * }}} - * - * @since 2.0.0 - */ + /** @inheritdoc */ def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = { val enc = encoderFor[T] val toRow = enc.createSerializer() @@ -521,60 +434,25 @@ class SparkSession private( Dataset[T](self, ExternalRDD(data, self)) } - /** - * Creates a [[Dataset]] from a `java.util.List` of a given type. This method requires an - * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation) - * that is generally created automatically through implicits from a `SparkSession`, or can be - * created explicitly by calling static methods on [[Encoders]]. - * - * == Java Example == - * - * {{{ - * List data = Arrays.asList("hello", "world"); - * Dataset ds = spark.createDataset(data, Encoders.STRING()); - * }}} - * - * @since 2.0.0 - */ + /** @inheritdoc */ def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = { createDataset(data.asScala.toSeq) } - /** - * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements - * in a range from 0 to `end` (exclusive) with step value 1. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def range(end: Long): Dataset[java.lang.Long] = range(0, end) - /** - * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements - * in a range from `start` to `end` (exclusive) with step value 1. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def range(start: Long, end: Long): Dataset[java.lang.Long] = { range(start, end, step = 1, numPartitions = leafNodeDefaultParallelism) } - /** - * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements - * in a range from `start` to `end` (exclusive) with a step value. 
- * - * @since 2.0.0 - */ + /** @inheritdoc */ def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = { range(start, end, step, numPartitions = leafNodeDefaultParallelism) } - /** - * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements - * in a range from `start` to `end` (exclusive) with a step value, with partition number - * specified. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = { new Dataset(self, Range(start, end, step, numPartitions), Encoders.LONG) } @@ -608,19 +486,7 @@ class SparkSession private( */ @transient lazy val catalog: Catalog = new CatalogImpl(self) - /** - * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch - * reading and the returned DataFrame is the batch scan query plan of this table. If it's a view, - * the returned DataFrame is simply the query plan of the view, which can either be a batch or - * streaming query plan. - * - * @param tableName is either a qualified or unqualified name that designates a table or view. - * If a database is specified, it identifies the table/view from the database. - * Otherwise, it first attempts to find a temporary view with the given name - * and then match the table/view from the current database. - * Note that, the global temporary view database is also valid here. - * @since 2.0.0 - */ + /** @inheritdoc */ def table(tableName: String): DataFrame = { read.table(tableName) } @@ -661,22 +527,7 @@ class SparkSession private( Dataset.ofRows(self, plan, tracker) } - /** - * Executes a SQL query substituting positional parameters by the given arguments, - * returning the result as a `DataFrame`. - * This API eagerly runs DDL/DML commands, but not for SELECT queries. - * - * @param sqlText A SQL statement with positional parameters to execute. - * @param args An array of Java/Scala objects that can be converted to - * SQL literal expressions. See - * - * Supported Data Types for supported value types in Scala/Java. - * For example, 1, "Steven", LocalDate.of(2023, 4, 2). - * A value can be also a `Column` of a literal or collection constructor functions - * such as `map()`, `array()`, `struct()`, in that case it is taken as is. - * - * @since 3.5.0 - */ + /** @inheritdoc */ @Experimental def sql(sqlText: String, args: Array[_]): DataFrame = { sql(sqlText, args, new QueryPlanningTracker) @@ -714,57 +565,20 @@ class SparkSession private( Dataset.ofRows(self, plan, tracker) } - /** - * Executes a SQL query substituting named parameters by the given arguments, - * returning the result as a `DataFrame`. - * This API eagerly runs DDL/DML commands, but not for SELECT queries. - * - * @param sqlText A SQL statement with named parameters to execute. - * @param args A map of parameter names to Java/Scala objects that can be converted to - * SQL literal expressions. See - * - * Supported Data Types for supported value types in Scala/Java. - * For example, map keys: "rank", "name", "birthdate"; - * map values: 1, "Steven", LocalDate.of(2023, 4, 2). - * Map value can be also a `Column` of a literal or collection constructor functions - * such as `map()`, `array()`, `struct()`, in that case it is taken as is. 
- * - * @since 3.4.0 - */ + /** @inheritdoc */ @Experimental def sql(sqlText: String, args: Map[String, Any]): DataFrame = { sql(sqlText, args, new QueryPlanningTracker) } - /** - * Executes a SQL query substituting named parameters by the given arguments, - * returning the result as a `DataFrame`. - * This API eagerly runs DDL/DML commands, but not for SELECT queries. - * - * @param sqlText A SQL statement with named parameters to execute. - * @param args A map of parameter names to Java/Scala objects that can be converted to - * SQL literal expressions. See - * - * Supported Data Types for supported value types in Scala/Java. - * For example, map keys: "rank", "name", "birthdate"; - * map values: 1, "Steven", LocalDate.of(2023, 4, 2). - * Map value can be also a `Column` of a literal or collection constructor functions - * such as `map()`, `array()`, `struct()`, in that case it is taken as is. - * - * @since 3.4.0 - */ + /** @inheritdoc */ @Experimental - def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = { + override def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = { sql(sqlText, args.asScala.toMap) } - /** - * Executes a SQL query using Spark, returning the result as a `DataFrame`. - * This API eagerly runs DDL/DML commands, but not for SELECT queries. - * - * @since 2.0.0 - */ - def sql(sqlText: String): DataFrame = sql(sqlText, Map.empty[String, Any]) + /** @inheritdoc */ + override def sql(sqlText: String): DataFrame = sql(sqlText, Map.empty[String, Any]) /** * Execute an arbitrary string command inside an external execution engine rather than Spark. @@ -817,22 +631,6 @@ class SparkSession private( */ def readStream: DataStreamReader = new DataStreamReader(self) - /** - * Executes some code block and prints to stdout the time taken to execute the block. This is - * available in Scala only and is used primarily for interactive testing and debugging. - * - * @since 2.1.0 - */ - def time[T](f: => T): T = { - val start = System.nanoTime() - val ret = f - val end = System.nanoTime() - // scalastyle:off println - println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms") - // scalastyle:on println - ret - } - // scalastyle:off // Disable style checker so "implicits" object can start with lowercase i /** @@ -854,19 +652,12 @@ class SparkSession private( /** * Stop the underlying `SparkContext`. * - * @since 2.0.0 + * @since 2.1.0 */ - def stop(): Unit = { + override def close(): Unit = { sparkContext.stop() } - /** - * Synonym for `stop()`. - * - * @since 2.1.0 - */ - override def close(): Unit = stop() - /** * Parses the data type in our internal string representation. The data type string should * have the same format as the one generated by `toString` in scala. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 3ae76a1db22b2..5d59a48d544a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -138,9 +138,8 @@ trait HashJoin extends JoinCodegenSupport { UnsafeProjection.create(streamedBoundKeys) @transient protected[this] lazy val boundCondition = if (condition.isDefined) { - if (joinType == FullOuter && buildSide == BuildLeft) { - // Put join left side before right side. This is to be consistent with - // `ShuffledHashJoinExec.fullOuterJoin`. 
+ if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == BuildLeft) { + // Put join left side before right side. Predicate.create(condition.get, buildPlan.output ++ streamedPlan.output).eval _ } else { Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index cf8dcc0b8b2f0..739cef035c38c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -23,10 +23,10 @@ import java.sql.{Date, Timestamp} import scala.util.Random -import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkRuntimeException} +import org.apache.spark.{QueryContextType, SPARK_DOC_ROOT, SparkException, SparkRuntimeException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.FunctionRegistry -import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.Cast._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation @@ -331,6 +331,66 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { checkAnswer(df.select(nullif(lit(5), lit(5))), Seq(Row(null))) } + test("nullifzero function") { + withTable("t") { + // Here we exercise a non-nullable, non-foldable column. + sql("create table t(col int not null) using csv") + sql("insert into t values (0)") + val df = sql("select col from t") + checkAnswer(df.select(nullifzero($"col")), Seq(Row(null))) + } + // Here we exercise invalid cases including types that do not support ordering. 
+ val df = Seq((0)).toDF("a") + var expr = nullifzero(map(lit(1), lit("a"))) + checkError( + intercept[AnalysisException](df.select(expr)), + errorClass = "DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES", + parameters = Map( + "left" -> "\"MAP\"", + "right" -> "\"INT\"", + "sqlExpr" -> "\"(map(1, a) = 0)\""), + context = ExpectedContext( + contextType = QueryContextType.DataFrame, + fragment = "nullifzero", + objectType = "", + objectName = "", + callSitePattern = "", + startIndex = 0, + stopIndex = 0)) + expr = nullifzero(array(lit(1), lit(2))) + checkError( + intercept[AnalysisException](df.select(expr)), + errorClass = "DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES", + parameters = Map( + "left" -> "\"ARRAY\"", + "right" -> "\"INT\"", + "sqlExpr" -> "\"(array(1, 2) = 0)\""), + context = ExpectedContext( + contextType = QueryContextType.DataFrame, + fragment = "nullifzero", + objectType = "", + objectName = "", + callSitePattern = "", + startIndex = 0, + stopIndex = 0)) + expr = nullifzero(Literal.create(20201231, DateType)) + checkError( + intercept[AnalysisException](df.select(expr)), + errorClass = "DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES", + parameters = Map( + "left" -> "\"DATE\"", + "right" -> "\"INT\"", + "sqlExpr" -> "\"(DATE '+57279-02-03' = 0)\""), + context = ExpectedContext( + contextType = QueryContextType.DataFrame, + fragment = "nullifzero", + objectType = "", + objectName = "", + callSitePattern = "", + startIndex = 0, + stopIndex = 0)) + } + test("nvl") { val df = Seq[(Integer, Integer)]((null, 8)).toDF("a", "b") checkAnswer(df.selectExpr("nvl(a, b)"), Seq(Row(8))) @@ -349,6 +409,66 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { checkAnswer(df.select(nvl2(col("b"), col("a"), col("c"))), Seq(Row(null))) } + test("zeroifnull function") { + withTable("t") { + // Here we exercise a non-nullable, non-foldable column. + sql("create table t(col int not null) using csv") + sql("insert into t values (0)") + val df = sql("select col from t") + checkAnswer(df.select(zeroifnull($"col")), Seq(Row(0))) + } + // Here we exercise invalid cases including types that do not support ordering. 
+ val df = Seq((0)).toDF("a") + var expr = zeroifnull(map(lit(1), lit("a"))) + checkError( + intercept[AnalysisException](df.select(expr)), + errorClass = "DATATYPE_MISMATCH.DATA_DIFF_TYPES", + parameters = Map( + "functionName" -> "`coalesce`", + "dataType" -> "(\"MAP\" or \"INT\")", + "sqlExpr" -> "\"coalesce(map(1, a), 0)\""), + context = ExpectedContext( + contextType = QueryContextType.DataFrame, + fragment = "zeroifnull", + objectType = "", + objectName = "", + callSitePattern = "", + startIndex = 0, + stopIndex = 0)) + expr = zeroifnull(array(lit(1), lit(2))) + checkError( + intercept[AnalysisException](df.select(expr)), + errorClass = "DATATYPE_MISMATCH.DATA_DIFF_TYPES", + parameters = Map( + "functionName" -> "`coalesce`", + "dataType" -> "(\"ARRAY\" or \"INT\")", + "sqlExpr" -> "\"coalesce(array(1, 2), 0)\""), + context = ExpectedContext( + contextType = QueryContextType.DataFrame, + fragment = "zeroifnull", + objectType = "", + objectName = "", + callSitePattern = "", + startIndex = 0, + stopIndex = 0)) + expr = zeroifnull(Literal.create(20201231, DateType)) + checkError( + intercept[AnalysisException](df.select(expr)), + errorClass = "DATATYPE_MISMATCH.DATA_DIFF_TYPES", + parameters = Map( + "functionName" -> "`coalesce`", + "dataType" -> "(\"DATE\" or \"INT\")", + "sqlExpr" -> "\"coalesce(DATE '+57279-02-03', 0)\""), + context = ExpectedContext( + contextType = QueryContextType.DataFrame, + fragment = "zeroifnull", + objectType = "", + objectName = "", + callSitePattern = "", + startIndex = 0, + stopIndex = 0)) + } + test("misc md5 function") { val df = Seq(("ABC", Array[Byte](1, 2, 3, 4, 5, 6))).toDF("a", "b") checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index e4ea88067c7c2..7ba93ee13e182 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -26,11 +26,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint} import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.exchange.EnsureRequirements import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestData} import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType} -class OuterJoinSuite extends SparkPlanTest with SharedSparkSession { +class OuterJoinSuite extends SparkPlanTest with SharedSparkSession with SQLTestData { import testImplicits.toRichColumn + setupTestData() private val EnsureRequirements = new EnsureRequirements() @@ -326,4 +327,21 @@ class OuterJoinSuite extends SparkPlanTest with SharedSparkSession { (null, null, 7, 7.0) ) ) + + testWithWholeStageCodegenOnAndOff( + "SPARK-46037: ShuffledHashJoin build left with left outer join, codegen off") { _ => + def join(hint: String): DataFrame = { + sql( + s""" + |SELECT /*+ $hint */ * + |FROM testData t1 + |LEFT OUTER JOIN + |testData2 t2 + |ON key = a AND concat(value, b) = '12' + |""".stripMargin) + } + val df1 = join("SHUFFLE_HASH(t1)") + val df2 = join("SHUFFLE_MERGE(t1)") + checkAnswer(df1, identity, df2.collect().toSeq) + } }
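
For orientation, here is a minimal, hypothetical sketch of how a connector might implement the new `procedures` API introduced above. It assumes `BoundProcedure` declares a `parameters()` method returning `ProcedureParameter[]` (referenced from the `UnboundProcedure.bind` javadoc) alongside the `call(InternalRow)` method shown at the top of this patch; any further abstract members of `BoundProcedure` not visible in this excerpt would also need implementations. The class name `SumProcedure` and the procedure name `sum_values` are illustrative only and not part of the change.

```scala
import java.util.Collections

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure}
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.{IntegerType, StructType}

// Hypothetical connector-side procedure for illustration; not part of this patch.
class SumProcedure extends UnboundProcedure with BoundProcedure {

  override def name(): String = "sum_values"

  override def description(): String = "Adds two integer arguments"

  // No overloading in this sketch, so binding simply returns this instance.
  // A catalog that supports overloads would pick the best match for `inputType` here;
  // Spark still performs the final validation and reordering of arguments.
  override def bind(inputType: StructType): BoundProcedure = this

  // Parameters are built with the new fluent builder on ProcedureParameter.
  def parameters(): Array[ProcedureParameter] = Array(
    ProcedureParameter.in("x", IntegerType).build(),
    ProcedureParameter.in("y", IntegerType).defaultValue("0").comment("second addend").build())

  // Returns no result sets. A procedure whose output is small and local could
  // expose it through a LocalScan-backed Scan instead.
  def call(input: InternalRow): java.util.Iterator[Scan] = {
    val sum = input.getInt(0) + input.getInt(1)
    println(s"sum_values returned $sum") // stand-in for the procedure's real side effect
    Collections.emptyIterator[Scan]()
  }
}
```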
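The `nullifzero` and `zeroifnull` tests above rely on the expansions visible in their error fragments: `nullifzero(e)` rewrites to an equality check against 0 (like `nullif(e, 0)`), and `zeroifnull(e)` rewrites to `coalesce(e, 0)`. The following self-contained sketch, using a local session and toy data that are not part of the patch, puts the new functions next to their hand-written equivalents.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit, nullif, nullifzero, zeroifnull}

object NullZeroDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("null-zero-demo").getOrCreate()
    import spark.implicits._

    // Column `a` holds a zero, column `b` holds a NULL.
    val df = Seq[(Integer, Integer)]((0, null)).toDF("a", "b")

    df.select(
      nullifzero($"a").as("nullifzero_a"),      // 0 -> NULL
      nullif($"a", lit(0)).as("nullif_a_0"),    // hand-written equivalent
      zeroifnull($"b").as("zeroifnull_b"),      // NULL -> 0
      coalesce($"b", lit(0)).as("coalesce_b_0") // hand-written equivalent
    ).show()

    spark.close()
  }
}
```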