diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DatasetHolder.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DatasetHolder.scala deleted file mode 100644 index 66f591bf1fb99..0000000000000 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DatasetHolder.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql - -/** - * A container for a [[Dataset]], used for implicit conversions in Scala. - * - * To use this, import implicit conversions in SQL: - * {{{ - * val spark: SparkSession = ... - * import spark.implicits._ - * }}} - * - * @since 3.4.0 - */ -case class DatasetHolder[T] private[sql] (private val ds: Dataset[T]) { - - // This is declared with parentheses to prevent the Scala compiler from treating - // `rdd.toDS("1")` as invoking this toDS and then apply on the returned Dataset. - def toDS(): Dataset[T] = ds - - // This is declared with parentheses to prevent the Scala compiler from treating - // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. - def toDF(): DataFrame = ds.toDF() - - def toDF(colNames: String*): DataFrame = ds.toDF(colNames: _*) -} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index 7799d395d5c6a..4690253da808b 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -16,283 +16,8 @@ */ package org.apache.spark.sql -import scala.collection.Map -import scala.language.implicitConversions -import scala.reflect.classTag -import scala.reflect.runtime.universe.TypeTag - -import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders} -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ - -/** - * A collection of implicit methods for converting names and Symbols into [[Column]]s, and for - * converting common Scala objects into [[Dataset]]s. - * - * @since 3.4.0 - */ -abstract class SQLImplicits private[sql] (session: SparkSession) extends LowPrioritySQLImplicits { - - /** - * Converts $"col name" into a [[Column]]. - * - * @since 3.4.0 - */ - implicit class StringToColumn(val sc: StringContext) { - def $(args: Any*): ColumnName = { - new ColumnName(sc.s(args: _*)) - } - } - - /** - * An implicit conversion that turns a Scala `Symbol` into a [[Column]]. - * @since 3.4.0 - */ - implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name) - - /** @since 3.4.0 */ - implicit val newIntEncoder: Encoder[Int] = PrimitiveIntEncoder - - /** @since 3.4.0 */ - implicit val newLongEncoder: Encoder[Long] = PrimitiveLongEncoder - - /** @since 3.4.0 */ - implicit val newDoubleEncoder: Encoder[Double] = PrimitiveDoubleEncoder - - /** @since 3.4.0 */ - implicit val newFloatEncoder: Encoder[Float] = PrimitiveFloatEncoder - - /** @since 3.4.0 */ - implicit val newByteEncoder: Encoder[Byte] = PrimitiveByteEncoder - - /** @since 3.4.0 */ - implicit val newShortEncoder: Encoder[Short] = PrimitiveShortEncoder - - /** @since 3.4.0 */ - implicit val newBooleanEncoder: Encoder[Boolean] = PrimitiveBooleanEncoder - - /** @since 3.4.0 */ - implicit val newStringEncoder: Encoder[String] = StringEncoder - - /** @since 3.4.0 */ - implicit val newJavaDecimalEncoder: Encoder[java.math.BigDecimal] = - AgnosticEncoders.DEFAULT_JAVA_DECIMAL_ENCODER - - /** @since 3.4.0 */ - implicit val newScalaDecimalEncoder: Encoder[scala.math.BigDecimal] = - AgnosticEncoders.DEFAULT_SCALA_DECIMAL_ENCODER - - /** @since 3.4.0 */ - implicit val newDateEncoder: Encoder[java.sql.Date] = AgnosticEncoders.STRICT_DATE_ENCODER - - /** @since 3.4.0 */ - implicit val newLocalDateEncoder: Encoder[java.time.LocalDate] = - AgnosticEncoders.STRICT_LOCAL_DATE_ENCODER - - /** @since 3.4.0 */ - implicit val newLocalDateTimeEncoder: Encoder[java.time.LocalDateTime] = - AgnosticEncoders.LocalDateTimeEncoder - - /** @since 3.4.0 */ - implicit val newTimeStampEncoder: Encoder[java.sql.Timestamp] = - AgnosticEncoders.STRICT_TIMESTAMP_ENCODER - - /** @since 3.4.0 */ - implicit val newInstantEncoder: Encoder[java.time.Instant] = - AgnosticEncoders.STRICT_INSTANT_ENCODER - - /** @since 3.4.0 */ - implicit val newDurationEncoder: Encoder[java.time.Duration] = DayTimeIntervalEncoder - - /** @since 3.4.0 */ - implicit val newPeriodEncoder: Encoder[java.time.Period] = YearMonthIntervalEncoder - - /** @since 3.4.0 */ - implicit def newJavaEnumEncoder[A <: java.lang.Enum[_]: TypeTag]: Encoder[A] = { - ScalaReflection.encoderFor[A] - } - - // Boxed primitives - - /** @since 3.4.0 */ - implicit val newBoxedIntEncoder: Encoder[java.lang.Integer] = BoxedIntEncoder - - /** @since 3.4.0 */ - implicit val newBoxedLongEncoder: Encoder[java.lang.Long] = BoxedLongEncoder - - /** @since 3.4.0 */ - implicit val newBoxedDoubleEncoder: Encoder[java.lang.Double] = BoxedDoubleEncoder - - /** @since 3.4.0 */ - implicit val newBoxedFloatEncoder: Encoder[java.lang.Float] = BoxedFloatEncoder - - /** @since 3.4.0 */ - implicit val newBoxedByteEncoder: Encoder[java.lang.Byte] = BoxedByteEncoder - - /** @since 3.4.0 */ - implicit val newBoxedShortEncoder: Encoder[java.lang.Short] = BoxedShortEncoder - - /** @since 3.4.0 */ - implicit val newBoxedBooleanEncoder: Encoder[java.lang.Boolean] = BoxedBooleanEncoder - - // Seqs - private def newSeqEncoder[E](elementEncoder: AgnosticEncoder[E]): AgnosticEncoder[Seq[E]] = { - IterableEncoder( - classTag[Seq[E]], - elementEncoder, - elementEncoder.nullable, - elementEncoder.lenientSerialization) - } - - /** - * @since 3.4.0 - * @deprecated - * use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - val newIntSeqEncoder: Encoder[Seq[Int]] = newSeqEncoder(PrimitiveIntEncoder) - - /** - * @since 3.4.0 - * @deprecated - * use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - val newLongSeqEncoder: Encoder[Seq[Long]] = newSeqEncoder(PrimitiveLongEncoder) - - /** - * @since 3.4.0 - * @deprecated - * use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - val newDoubleSeqEncoder: Encoder[Seq[Double]] = newSeqEncoder(PrimitiveDoubleEncoder) - - /** - * @since 3.4.0 - * @deprecated - * use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - val newFloatSeqEncoder: Encoder[Seq[Float]] = newSeqEncoder(PrimitiveFloatEncoder) - - /** - * @since 3.4.0 - * @deprecated - * use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - val newByteSeqEncoder: Encoder[Seq[Byte]] = newSeqEncoder(PrimitiveByteEncoder) - - /** - * @since 3.4.0 - * @deprecated - * use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - val newShortSeqEncoder: Encoder[Seq[Short]] = newSeqEncoder(PrimitiveShortEncoder) - - /** - * @since 3.4.0 - * @deprecated - * use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - val newBooleanSeqEncoder: Encoder[Seq[Boolean]] = newSeqEncoder(PrimitiveBooleanEncoder) - - /** - * @since 3.4.0 - * @deprecated - * use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - val newStringSeqEncoder: Encoder[Seq[String]] = newSeqEncoder(StringEncoder) - - /** - * @since 3.4.0 - * @deprecated - * use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - def newProductSeqEncoder[A <: Product: TypeTag]: Encoder[Seq[A]] = - newSeqEncoder(ScalaReflection.encoderFor[A]) - - /** @since 3.4.0 */ - implicit def newSequenceEncoder[T <: Seq[_]: TypeTag]: Encoder[T] = - ScalaReflection.encoderFor[T] - - // Maps - /** @since 3.4.0 */ - implicit def newMapEncoder[T <: Map[_, _]: TypeTag]: Encoder[T] = ScalaReflection.encoderFor[T] - - /** - * Notice that we serialize `Set` to Catalyst array. The set property is only kept when - * manipulating the domain objects. The serialization format doesn't keep the set property. When - * we have a Catalyst array which contains duplicated elements and convert it to - * `Dataset[Set[T]]` by using the encoder, the elements will be de-duplicated. - * - * @since 3.4.0 - */ - implicit def newSetEncoder[T <: Set[_]: TypeTag]: Encoder[T] = ScalaReflection.encoderFor[T] - - // Arrays - private def newArrayEncoder[E]( - elementEncoder: AgnosticEncoder[E]): AgnosticEncoder[Array[E]] = { - ArrayEncoder(elementEncoder, elementEncoder.nullable) - } - - /** @since 3.4.0 */ - implicit val newIntArrayEncoder: Encoder[Array[Int]] = newArrayEncoder(PrimitiveIntEncoder) - - /** @since 3.4.0 */ - implicit val newLongArrayEncoder: Encoder[Array[Long]] = newArrayEncoder(PrimitiveLongEncoder) - - /** @since 3.4.0 */ - implicit val newDoubleArrayEncoder: Encoder[Array[Double]] = - newArrayEncoder(PrimitiveDoubleEncoder) - - /** @since 3.4.0 */ - implicit val newFloatArrayEncoder: Encoder[Array[Float]] = newArrayEncoder( - PrimitiveFloatEncoder) - - /** @since 3.4.0 */ - implicit val newByteArrayEncoder: Encoder[Array[Byte]] = BinaryEncoder - - /** @since 3.4.0 */ - implicit val newShortArrayEncoder: Encoder[Array[Short]] = newArrayEncoder( - PrimitiveShortEncoder) - - /** @since 3.4.0 */ - implicit val newBooleanArrayEncoder: Encoder[Array[Boolean]] = - newArrayEncoder(PrimitiveBooleanEncoder) - - /** @since 3.4.0 */ - implicit val newStringArrayEncoder: Encoder[Array[String]] = newArrayEncoder(StringEncoder) - - /** @since 3.4.0 */ - implicit def newProductArrayEncoder[A <: Product: TypeTag]: Encoder[Array[A]] = { - newArrayEncoder(ScalaReflection.encoderFor[A]) - } - - /** - * Creates a [[Dataset]] from a local Seq. - * @since 3.4.0 - */ - implicit def localSeqToDatasetHolder[T: Encoder](s: Seq[T]): DatasetHolder[T] = { - DatasetHolder(session.createDataset(s)) - } -} - -/** - * Lower priority implicit methods for converting Scala objects into [[Dataset]]s. Conflicting - * implicits are placed here to disambiguate resolution. - * - * Reasons for including specific implicits: newProductEncoder - to disambiguate for `List`s which - * are both `Seq` and `Product` - */ -trait LowPrioritySQLImplicits { - - /** @since 3.4.0 */ - implicit def newProductEncoder[T <: Product: TypeTag]: Encoder[T] = - ScalaReflection.encoderFor[T] +/** @inheritdoc */ +abstract class SQLImplicits private[sql] (override val session: SparkSession) + extends api.SQLImplicits { + type DS[U] = Dataset[U] } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 04f8eeb5c6d46..0663f0186888e 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -252,19 +252,8 @@ class SparkSession private[sql] ( lazy val udf: UDFRegistration = new UDFRegistration(this) // scalastyle:off - // Disable style checker so "implicits" object can start with lowercase i - /** - * (Scala-specific) Implicit methods available in Scala for converting common names and Symbols - * into [[Column]]s, and for converting common Scala objects into DataFrame`s. - * - * {{{ - * val sparkSession = SparkSession.builder.getOrCreate() - * import sparkSession.implicits._ - * }}} - * - * @since 3.4.0 - */ - object implicits extends SQLImplicits(this) with Serializable + /** @inheritdoc */ + object implicits extends SQLImplicits(this) // scalastyle:on /** @inheritdoc */ diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index ece4504395f12..6e3fd50d0afd4 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -183,6 +183,17 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryStatus"), + + // SPARK-49415: Shared SQLImplicits. + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DatasetHolder"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DatasetHolder$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.LowPrioritySQLImplicits"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SQLContext$implicits$"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SQLImplicits"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLImplicits.StringToColumn"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.this"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLImplicits$StringToColumn"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SparkSession$implicits$"), ) // Default exclude rules diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala b/sql/api/src/main/scala/org/apache/spark/sql/DatasetHolder.scala similarity index 80% rename from sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala rename to sql/api/src/main/scala/org/apache/spark/sql/DatasetHolder.scala index 1c4ffefb897ea..84c68c39e7996 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/DatasetHolder.scala @@ -18,9 +18,10 @@ package org.apache.spark.sql import org.apache.spark.annotation.Stable +import org.apache.spark.sql.api.Dataset /** - * A container for a [[Dataset]], used for implicit conversions in Scala. + * A container for a [[api.Dataset]], used for implicit conversions in Scala. * * To use this, import implicit conversions in SQL: * {{{ @@ -31,15 +32,15 @@ import org.apache.spark.annotation.Stable * @since 1.6.0 */ @Stable -case class DatasetHolder[T] private[sql](private val ds: Dataset[T]) { +class DatasetHolder[T, DS[U] <: Dataset[U]](ds: DS[T]) { // This is declared with parentheses to prevent the Scala compiler from treating // `rdd.toDS("1")` as invoking this toDS and then apply on the returned Dataset. - def toDS(): Dataset[T] = ds + def toDS(): DS[T] = ds // This is declared with parentheses to prevent the Scala compiler from treating // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. - def toDF(): DataFrame = ds.toDF() + def toDF(): DS[Row] = ds.toDF().asInstanceOf[DS[Row]] - def toDF(colNames: String*): DataFrame = ds.toDF(colNames : _*) + def toDF(colNames: String*): DS[Row] = ds.toDF(colNames: _*).asInstanceOf[DS[Row]] } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala new file mode 100644 index 0000000000000..f6b44e168390a --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.api + +import scala.collection.Map +import scala.language.implicitConversions +import scala.reflect.classTag +import scala.reflect.runtime.universe.TypeTag + +import _root_.java + +import org.apache.spark.sql.{ColumnName, DatasetHolder, Encoder, Encoders} +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, DEFAULT_SCALA_DECIMAL_ENCODER, IterableEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, StringEncoder} + +/** + * A collection of implicit methods for converting common Scala objects into + * [[org.apache.spark.sql.api.Dataset]]s. + * + * @since 1.6.0 + */ +abstract class SQLImplicits extends LowPrioritySQLImplicits with Serializable { + type DS[U] <: Dataset[U] + + protected def session: SparkSession + + /** + * Converts $"col name" into a [[org.apache.spark.sql.Column]]. + * + * @since 2.0.0 + */ + implicit class StringToColumn(val sc: StringContext) { + def $(args: Any*): ColumnName = { + new ColumnName(sc.s(args: _*)) + } + } + + // Primitives + + /** @since 1.6.0 */ + implicit def newIntEncoder: Encoder[Int] = Encoders.scalaInt + + /** @since 1.6.0 */ + implicit def newLongEncoder: Encoder[Long] = Encoders.scalaLong + + /** @since 1.6.0 */ + implicit def newDoubleEncoder: Encoder[Double] = Encoders.scalaDouble + + /** @since 1.6.0 */ + implicit def newFloatEncoder: Encoder[Float] = Encoders.scalaFloat + + /** @since 1.6.0 */ + implicit def newByteEncoder: Encoder[Byte] = Encoders.scalaByte + + /** @since 1.6.0 */ + implicit def newShortEncoder: Encoder[Short] = Encoders.scalaShort + + /** @since 1.6.0 */ + implicit def newBooleanEncoder: Encoder[Boolean] = Encoders.scalaBoolean + + /** @since 1.6.0 */ + implicit def newStringEncoder: Encoder[String] = Encoders.STRING + + /** @since 2.2.0 */ + implicit def newJavaDecimalEncoder: Encoder[java.math.BigDecimal] = Encoders.DECIMAL + + /** @since 2.2.0 */ + implicit def newScalaDecimalEncoder: Encoder[scala.math.BigDecimal] = + DEFAULT_SCALA_DECIMAL_ENCODER + + /** @since 2.2.0 */ + implicit def newDateEncoder: Encoder[java.sql.Date] = Encoders.DATE + + /** @since 3.0.0 */ + implicit def newLocalDateEncoder: Encoder[java.time.LocalDate] = Encoders.LOCALDATE + + /** @since 3.4.0 */ + implicit def newLocalDateTimeEncoder: Encoder[java.time.LocalDateTime] = Encoders.LOCALDATETIME + + /** @since 2.2.0 */ + implicit def newTimeStampEncoder: Encoder[java.sql.Timestamp] = Encoders.TIMESTAMP + + /** @since 3.0.0 */ + implicit def newInstantEncoder: Encoder[java.time.Instant] = Encoders.INSTANT + + /** @since 3.2.0 */ + implicit def newDurationEncoder: Encoder[java.time.Duration] = Encoders.DURATION + + /** @since 3.2.0 */ + implicit def newPeriodEncoder: Encoder[java.time.Period] = Encoders.PERIOD + + /** @since 3.2.0 */ + implicit def newJavaEnumEncoder[A <: java.lang.Enum[_]: TypeTag]: Encoder[A] = + ScalaReflection.encoderFor[A] + + // Boxed primitives + + /** @since 2.0.0 */ + implicit def newBoxedIntEncoder: Encoder[java.lang.Integer] = Encoders.INT + + /** @since 2.0.0 */ + implicit def newBoxedLongEncoder: Encoder[java.lang.Long] = Encoders.LONG + + /** @since 2.0.0 */ + implicit def newBoxedDoubleEncoder: Encoder[java.lang.Double] = Encoders.DOUBLE + + /** @since 2.0.0 */ + implicit def newBoxedFloatEncoder: Encoder[java.lang.Float] = Encoders.FLOAT + + /** @since 2.0.0 */ + implicit def newBoxedByteEncoder: Encoder[java.lang.Byte] = Encoders.BYTE + + /** @since 2.0.0 */ + implicit def newBoxedShortEncoder: Encoder[java.lang.Short] = Encoders.SHORT + + /** @since 2.0.0 */ + implicit def newBoxedBooleanEncoder: Encoder[java.lang.Boolean] = Encoders.BOOLEAN + + // Seqs + private def newSeqEncoder[E](elementEncoder: AgnosticEncoder[E]): AgnosticEncoder[Seq[E]] = { + IterableEncoder( + classTag[Seq[E]], + elementEncoder, + elementEncoder.nullable, + elementEncoder.lenientSerialization) + } + + /** + * @since 1.6.1 + * @deprecated + * use [[newSequenceEncoder]] + */ + @deprecated("Use newSequenceEncoder instead", "2.2.0") + val newIntSeqEncoder: Encoder[Seq[Int]] = newSeqEncoder(PrimitiveIntEncoder) + + /** + * @since 1.6.1 + * @deprecated + * use [[newSequenceEncoder]] + */ + @deprecated("Use newSequenceEncoder instead", "2.2.0") + val newLongSeqEncoder: Encoder[Seq[Long]] = newSeqEncoder(PrimitiveLongEncoder) + + /** + * @since 1.6.1 + * @deprecated + * use [[newSequenceEncoder]] + */ + @deprecated("Use newSequenceEncoder instead", "2.2.0") + val newDoubleSeqEncoder: Encoder[Seq[Double]] = newSeqEncoder(PrimitiveDoubleEncoder) + + /** + * @since 1.6.1 + * @deprecated + * use [[newSequenceEncoder]] + */ + @deprecated("Use newSequenceEncoder instead", "2.2.0") + val newFloatSeqEncoder: Encoder[Seq[Float]] = newSeqEncoder(PrimitiveFloatEncoder) + + /** + * @since 1.6.1 + * @deprecated + * use [[newSequenceEncoder]] + */ + @deprecated("Use newSequenceEncoder instead", "2.2.0") + val newByteSeqEncoder: Encoder[Seq[Byte]] = newSeqEncoder(PrimitiveByteEncoder) + + /** + * @since 1.6.1 + * @deprecated + * use [[newSequenceEncoder]] + */ + @deprecated("Use newSequenceEncoder instead", "2.2.0") + val newShortSeqEncoder: Encoder[Seq[Short]] = newSeqEncoder(PrimitiveShortEncoder) + + /** + * @since 1.6.1 + * @deprecated + * use [[newSequenceEncoder]] + */ + @deprecated("Use newSequenceEncoder instead", "2.2.0") + val newBooleanSeqEncoder: Encoder[Seq[Boolean]] = newSeqEncoder(PrimitiveBooleanEncoder) + + /** + * @since 1.6.1 + * @deprecated + * use [[newSequenceEncoder]] + */ + @deprecated("Use newSequenceEncoder instead", "2.2.0") + val newStringSeqEncoder: Encoder[Seq[String]] = newSeqEncoder(StringEncoder) + + /** + * @since 1.6.1 + * @deprecated + * use [[newSequenceEncoder]] + */ + @deprecated("Use newSequenceEncoder instead", "2.2.0") + def newProductSeqEncoder[A <: Product: TypeTag]: Encoder[Seq[A]] = + newSeqEncoder(ScalaReflection.encoderFor[A]) + + /** @since 2.2.0 */ + implicit def newSequenceEncoder[T <: Seq[_]: TypeTag]: Encoder[T] = + ScalaReflection.encoderFor[T] + + // Maps + /** @since 2.3.0 */ + implicit def newMapEncoder[T <: Map[_, _]: TypeTag]: Encoder[T] = ScalaReflection.encoderFor[T] + + /** + * Notice that we serialize `Set` to Catalyst array. The set property is only kept when + * manipulating the domain objects. The serialization format doesn't keep the set property. When + * we have a Catalyst array which contains duplicated elements and convert it to + * `Dataset[Set[T]]` by using the encoder, the elements will be de-duplicated. + * + * @since 2.3.0 + */ + implicit def newSetEncoder[T <: Set[_]: TypeTag]: Encoder[T] = ScalaReflection.encoderFor[T] + + // Arrays + private def newArrayEncoder[E]( + elementEncoder: AgnosticEncoder[E]): AgnosticEncoder[Array[E]] = { + ArrayEncoder(elementEncoder, elementEncoder.nullable) + } + + /** @since 1.6.1 */ + implicit val newIntArrayEncoder: Encoder[Array[Int]] = newArrayEncoder(PrimitiveIntEncoder) + + /** @since 1.6.1 */ + implicit val newLongArrayEncoder: Encoder[Array[Long]] = newArrayEncoder(PrimitiveLongEncoder) + + /** @since 1.6.1 */ + implicit val newDoubleArrayEncoder: Encoder[Array[Double]] = + newArrayEncoder(PrimitiveDoubleEncoder) + + /** @since 1.6.1 */ + implicit val newFloatArrayEncoder: Encoder[Array[Float]] = + newArrayEncoder(PrimitiveFloatEncoder) + + /** @since 1.6.1 */ + implicit val newByteArrayEncoder: Encoder[Array[Byte]] = Encoders.BINARY + + /** @since 1.6.1 */ + implicit val newShortArrayEncoder: Encoder[Array[Short]] = + newArrayEncoder(PrimitiveShortEncoder) + + /** @since 1.6.1 */ + implicit val newBooleanArrayEncoder: Encoder[Array[Boolean]] = + newArrayEncoder(PrimitiveBooleanEncoder) + + /** @since 1.6.1 */ + implicit val newStringArrayEncoder: Encoder[Array[String]] = + newArrayEncoder(StringEncoder) + + /** @since 1.6.1 */ + implicit def newProductArrayEncoder[A <: Product: TypeTag]: Encoder[Array[A]] = + newArrayEncoder(ScalaReflection.encoderFor[A]) + + /** + * Creates a [[Dataset]] from a local Seq. + * @since 1.6.0 + */ + implicit def localSeqToDatasetHolder[T: Encoder](s: Seq[T]): DatasetHolder[T, DS] = { + new DatasetHolder(session.createDataset(s).asInstanceOf[DS[T]]) + } + + /** + * An implicit conversion that turns a Scala `Symbol` into a [[org.apache.spark.sql.Column]]. + * @since 1.3.0 + */ + implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name) +} + +/** + * Lower priority implicit methods for converting Scala objects into + * [[org.apache.spark.sql.api.Dataset]]s. Conflicting implicits are placed here to disambiguate + * resolution. + * + * Reasons for including specific implicits: newProductEncoder - to disambiguate for `List`s which + * are both `Seq` and `Product` + */ +trait LowPrioritySQLImplicits { + + /** @since 1.6.0 */ + implicit def newProductEncoder[T <: Product: TypeTag]: Encoder[T] = Encoders.product[T] +} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala index 41d16b16ab1c5..2623db4060ee6 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala @@ -505,6 +505,19 @@ abstract class SparkSession extends Serializable with Closeable { */ def read: DataFrameReader + /** + * (Scala-specific) Implicit methods available in Scala for converting common Scala objects into + * `DataFrame`s. + * + * {{{ + * val sparkSession = SparkSession.builder.getOrCreate() + * import sparkSession.implicits._ + * }}} + * + * @since 2.0.0 + */ + val implicits: SQLImplicits + /** * Executes some code block and prints to stdout the time taken to execute the block. This is * available in Scala only and is used primarily for interactive testing and debugging. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ffcc0b923f2cb..636899a7acb06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -251,8 +251,8 @@ class SQLContext private[sql](val sparkSession: SparkSession) * @group basic * @since 1.3.0 */ - object implicits extends SQLImplicits with Serializable { - protected override def session: SparkSession = self.sparkSession + object implicits extends SQLImplicits { + override protected def session: SparkSession = sparkSession } // scalastyle:on diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index a657836aafbea..1bc7e3ee98e76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -17,259 +17,21 @@ package org.apache.spark.sql -import scala.collection.Map import scala.language.implicitConversions -import scala.reflect.runtime.universe.TypeTag import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -/** - * A collection of implicit methods for converting common Scala objects into [[Dataset]]s. - * - * @since 1.6.0 - */ -abstract class SQLImplicits extends LowPrioritySQLImplicits { +/** @inheritdoc */ +abstract class SQLImplicits extends api.SQLImplicits { + type DS[U] = Dataset[U] protected def session: SparkSession - /** - * Converts $"col name" into a [[Column]]. - * - * @since 2.0.0 - */ - implicit class StringToColumn(val sc: StringContext) { - def $(args: Any*): ColumnName = { - new ColumnName(sc.s(args: _*)) - } - } - - // Primitives - - /** @since 1.6.0 */ - implicit def newIntEncoder: Encoder[Int] = Encoders.scalaInt - - /** @since 1.6.0 */ - implicit def newLongEncoder: Encoder[Long] = Encoders.scalaLong - - /** @since 1.6.0 */ - implicit def newDoubleEncoder: Encoder[Double] = Encoders.scalaDouble - - /** @since 1.6.0 */ - implicit def newFloatEncoder: Encoder[Float] = Encoders.scalaFloat - - /** @since 1.6.0 */ - implicit def newByteEncoder: Encoder[Byte] = Encoders.scalaByte - - /** @since 1.6.0 */ - implicit def newShortEncoder: Encoder[Short] = Encoders.scalaShort - - /** @since 1.6.0 */ - implicit def newBooleanEncoder: Encoder[Boolean] = Encoders.scalaBoolean - - /** @since 1.6.0 */ - implicit def newStringEncoder: Encoder[String] = Encoders.STRING - - /** @since 2.2.0 */ - implicit def newJavaDecimalEncoder: Encoder[java.math.BigDecimal] = Encoders.DECIMAL - - /** @since 2.2.0 */ - implicit def newScalaDecimalEncoder: Encoder[scala.math.BigDecimal] = ExpressionEncoder() - - /** @since 2.2.0 */ - implicit def newDateEncoder: Encoder[java.sql.Date] = Encoders.DATE - - /** @since 3.0.0 */ - implicit def newLocalDateEncoder: Encoder[java.time.LocalDate] = Encoders.LOCALDATE - - /** @since 3.4.0 */ - implicit def newLocalDateTimeEncoder: Encoder[java.time.LocalDateTime] = Encoders.LOCALDATETIME - - /** @since 2.2.0 */ - implicit def newTimeStampEncoder: Encoder[java.sql.Timestamp] = Encoders.TIMESTAMP - - /** @since 3.0.0 */ - implicit def newInstantEncoder: Encoder[java.time.Instant] = Encoders.INSTANT - - /** @since 3.2.0 */ - implicit def newDurationEncoder: Encoder[java.time.Duration] = Encoders.DURATION - - /** @since 3.2.0 */ - implicit def newPeriodEncoder: Encoder[java.time.Period] = Encoders.PERIOD - - /** @since 3.2.0 */ - implicit def newJavaEnumEncoder[A <: java.lang.Enum[_] : TypeTag]: Encoder[A] = - ExpressionEncoder() - - // Boxed primitives - - /** @since 2.0.0 */ - implicit def newBoxedIntEncoder: Encoder[java.lang.Integer] = Encoders.INT - - /** @since 2.0.0 */ - implicit def newBoxedLongEncoder: Encoder[java.lang.Long] = Encoders.LONG - - /** @since 2.0.0 */ - implicit def newBoxedDoubleEncoder: Encoder[java.lang.Double] = Encoders.DOUBLE - - /** @since 2.0.0 */ - implicit def newBoxedFloatEncoder: Encoder[java.lang.Float] = Encoders.FLOAT - - /** @since 2.0.0 */ - implicit def newBoxedByteEncoder: Encoder[java.lang.Byte] = Encoders.BYTE - - /** @since 2.0.0 */ - implicit def newBoxedShortEncoder: Encoder[java.lang.Short] = Encoders.SHORT - - /** @since 2.0.0 */ - implicit def newBoxedBooleanEncoder: Encoder[java.lang.Boolean] = Encoders.BOOLEAN - - // Seqs - - /** - * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder() - - /** - * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - def newLongSeqEncoder: Encoder[Seq[Long]] = ExpressionEncoder() - - /** - * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - def newDoubleSeqEncoder: Encoder[Seq[Double]] = ExpressionEncoder() - - /** - * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - def newFloatSeqEncoder: Encoder[Seq[Float]] = ExpressionEncoder() - - /** - * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - def newByteSeqEncoder: Encoder[Seq[Byte]] = ExpressionEncoder() - - /** - * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - def newShortSeqEncoder: Encoder[Seq[Short]] = ExpressionEncoder() - - /** - * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - def newBooleanSeqEncoder: Encoder[Seq[Boolean]] = ExpressionEncoder() - - /** - * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - def newStringSeqEncoder: Encoder[Seq[String]] = ExpressionEncoder() - - /** - * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] - */ - @deprecated("Use newSequenceEncoder instead", "2.2.0") - def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder() - - /** @since 2.2.0 */ - implicit def newSequenceEncoder[T <: Seq[_] : TypeTag]: Encoder[T] = ExpressionEncoder() - - // Maps - /** @since 2.3.0 */ - implicit def newMapEncoder[T <: Map[_, _] : TypeTag]: Encoder[T] = ExpressionEncoder() - - /** - * Notice that we serialize `Set` to Catalyst array. The set property is only kept when - * manipulating the domain objects. The serialization format doesn't keep the set property. - * When we have a Catalyst array which contains duplicated elements and convert it to - * `Dataset[Set[T]]` by using the encoder, the elements will be de-duplicated. - * - * @since 2.3.0 - */ - implicit def newSetEncoder[T <: Set[_] : TypeTag]: Encoder[T] = ExpressionEncoder() - - // Arrays - - /** @since 1.6.1 */ - implicit def newIntArrayEncoder: Encoder[Array[Int]] = ExpressionEncoder() - - /** @since 1.6.1 */ - implicit def newLongArrayEncoder: Encoder[Array[Long]] = ExpressionEncoder() - - /** @since 1.6.1 */ - implicit def newDoubleArrayEncoder: Encoder[Array[Double]] = ExpressionEncoder() - - /** @since 1.6.1 */ - implicit def newFloatArrayEncoder: Encoder[Array[Float]] = ExpressionEncoder() - - /** @since 1.6.1 */ - implicit def newByteArrayEncoder: Encoder[Array[Byte]] = Encoders.BINARY - - /** @since 1.6.1 */ - implicit def newShortArrayEncoder: Encoder[Array[Short]] = ExpressionEncoder() - - /** @since 1.6.1 */ - implicit def newBooleanArrayEncoder: Encoder[Array[Boolean]] = ExpressionEncoder() - - /** @since 1.6.1 */ - implicit def newStringArrayEncoder: Encoder[Array[String]] = ExpressionEncoder() - - /** @since 1.6.1 */ - implicit def newProductArrayEncoder[A <: Product : TypeTag]: Encoder[Array[A]] = - ExpressionEncoder() - /** * Creates a [[Dataset]] from an RDD. * * @since 1.6.0 */ - implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { - DatasetHolder(session.createDataset(rdd)) - } - - /** - * Creates a [[Dataset]] from a local Seq. - * @since 1.6.0 - */ - implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { - DatasetHolder(session.createDataset(s)) - } - - /** - * An implicit conversion that turns a Scala `Symbol` into a [[Column]]. - * @since 1.3.0 - */ - implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name) - -} - -/** - * Lower priority implicit methods for converting Scala objects into [[Dataset]]s. - * Conflicting implicits are placed here to disambiguate resolution. - * - * Reasons for including specific implicits: - * newProductEncoder - to disambiguate for `List`s which are both `Seq` and `Product` - */ -trait LowPrioritySQLImplicits { - /** @since 1.6.0 */ - implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T] - + implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T, Dataset] = + new DatasetHolder(session.createDataset(rdd)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 137dbaed9f00a..938df206b9792 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -752,19 +752,8 @@ class SparkSession private( // scalastyle:off // Disable style checker so "implicits" object can start with lowercase i - /** - * (Scala-specific) Implicit methods available in Scala for converting - * common Scala objects into `DataFrame`s. - * - * {{{ - * val sparkSession = SparkSession.builder.getOrCreate() - * import sparkSession.implicits._ - * }}} - * - * @since 2.0.0 - */ - object implicits extends SQLImplicits with Serializable { - protected override def session: SparkSession = SparkSession.this + object implicits extends SQLImplicits { + override protected def session: SparkSession = self } // scalastyle:on diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala index 6277f8b459248..8d17edd42442e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala @@ -39,11 +39,6 @@ object typed { // For example, avg in the Scala version returns Scala primitive Double, whose bytecode // signature is just a java.lang.Object; avg in the Java version returns java.lang.Double. - // TODO: This is pretty hacky. Maybe we should have an object for implicit encoders. - private val implicits = new SQLImplicits { - override protected def session: SparkSession = null - } - /** * Average aggregate function. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala index d7c00b68828c4..90432dea3a017 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala @@ -35,7 +35,7 @@ private[sql] trait SQLTestData { self => // Helper object to import SQL implicits without a concrete SparkSession private object internalImplicits extends SQLImplicits { - protected override def session: SparkSession = self.spark + override protected def session: SparkSession = self.spark } import internalImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 54d6840eb5775..fe5a0f8ee257a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -240,7 +240,7 @@ private[sql] trait SQLTestUtilsBase * but the implicits import is needed in the constructor. */ protected object testImplicits extends SQLImplicits { - protected override def session: SparkSession = self.spark + override protected def session: SparkSession = self.spark implicit def toRichColumn(c: Column): SparkSession#RichColumn = session.RichColumn(c) }