[SPARK-49307][CONNECT][SQL] Add Kryo serialization to agnostic encoder framework

### What changes were proposed in this pull request?
This PR adds support for Kryo-based serialization to the AgnosticEncoder framework. Please note that Kryo serialization is only supported for Spark Classic.
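For context, a minimal usage sketch on Spark Classic (the `SensorReading` class and session setup are illustrative, not part of this change):

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Illustrative payload type; any class Kryo can handle works here.
case class SensorReading(id: Int, payload: String)

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// The Kryo encoder stores each object in a single binary column.
implicit val enc: Encoder[SensorReading] = Encoders.kryo[SensorReading]
val ds = spark.createDataset(Seq(SensorReading(1, "a"), SensorReading(2, "b")))

ds.printSchema()
// root
//  |-- value: binary (nullable = true)
```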

### Why are the changes needed?
This PR is a stepping stone for centralizing the Encoders object in sql/api.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47983 from hvanhovell/SPARK-49307.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
hvanhovell authored and dongjoon-hyun committed Sep 6, 2024
1 parent 23bea28 commit e49e31d
Showing 7 changed files with 117 additions and 5 deletions.
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -434,6 +434,12 @@
],
"sqlState" : "42846"
},
"CANNOT_USE_KRYO" : {
"message" : [
"Cannot load Kryo serialization codec. Kryo serialization cannot be used in the Spark Connect client. Use Java serialization, provide a custom Codec, or use Spark Classic instead."
],
"sqlState" : "22KD3"
},
"CANNOT_WRITE_STATE_STORE" : {
"message" : [
"Error writing state store files for provider <providerClass>."
Encoders.scala
@@ -20,7 +20,7 @@ import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
-import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, JavaSerializationCodec, RowEncoder => RowEncoderFactory}
+import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, JavaSerializationCodec, KryoSerializationCodec, RowEncoder => RowEncoderFactory}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
import org.apache.spark.sql.types.StructType

@@ -203,6 +203,28 @@ object Encoders {
*/
def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))

/**
* (Scala-specific) Creates an encoder that serializes objects of type T using Kryo. This
* encoder maps T into a single byte array (binary) field.
*
* T must be publicly accessible.
*
* @since 4.0.0
*/
def kryo[T: ClassTag]: Encoder[T] = {
TransformingEncoder(implicitly[ClassTag[T]], BinaryEncoder, KryoSerializationCodec)
}

/**
* Creates an encoder that serializes objects of type T using Kryo. This encoder maps T into a
* single byte array (binary) field.
*
* T must be publicly accessible.
*
* @since 4.0.0
*/
def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))

private def tupleEncoder[T](encoders: Encoder[_]*): Encoder[T] = {
ProductEncoder.tuple(encoders.asInstanceOf[Seq[AgnosticEncoder[_]]]).asInstanceOf[Encoder[T]]
}
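The two new overloads mirror the existing `javaSerialization` pair: the Scala-specific variant resolves a `ClassTag` implicitly, while the `Class`-based variant exists for Java callers. A small sketch of calling each (types chosen purely for illustration):

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Scala: the compiler supplies the ClassTag.
val scalaEnc: Encoder[Map[String, Long]] = Encoders.kryo[Map[String, Long]]

// Java-style: pass the Class explicitly (shown here in Scala syntax).
val javaEnc: Encoder[java.util.ArrayList[String]] =
  Encoders.kryo(classOf[java.util.ArrayList[String]])
```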
ArrowEncoderSuite.scala
@@ -30,7 +30,7 @@ import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
import org.apache.arrow.vector.VarBinaryVector
import org.scalatest.BeforeAndAfterAll

-import org.apache.spark.{sql, SparkUnsupportedOperationException}
+import org.apache.spark.{sql, SparkRuntimeException, SparkUnsupportedOperationException}
import org.apache.spark.sql.{AnalysisException, Encoders, Row}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, JavaTypeInference, ScalaReflection}
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, Codec, OuterScopes}
@@ -776,6 +776,16 @@ class ArrowEncoderSuite extends ConnectFunSuite with BeforeAndAfterAll {
}
}

test("kryo serialization") {
val e = intercept[SparkRuntimeException] {
val encoder = sql.encoderFor(Encoders.kryo[(Int, String)])
roundTripAndCheckIdentical(encoder) { () =>
Iterator.tabulate(10)(i => (i, "itr_" + i))
}
}
assert(e.getErrorClass == "CANNOT_USE_KRYO")
}

test("transforming encoder") {
val schema = new StructType()
.add("key", IntegerType)
Codec.scala
@@ -16,7 +16,10 @@
*/
package org.apache.spark.sql.catalyst.encoders

-import org.apache.spark.util.SparkSerDeUtils
+import java.lang.invoke.{MethodHandle, MethodHandles, MethodType}
+
+import org.apache.spark.sql.errors.ExecutionErrors
+import org.apache.spark.util.{SparkClassUtils, SparkSerDeUtils}

/**
* Codec for doing conversions between two representations.
@@ -40,3 +43,29 @@ class JavaSerializationCodec[I] extends Codec[I, Array[Byte]] {
object JavaSerializationCodec extends (() => Codec[Any, Array[Byte]]) {
override def apply(): Codec[Any, Array[Byte]] = new JavaSerializationCodec[Any]
}

/**
* A codec that uses Kryo to (de)serialize arbitrary objects to and from a byte array.
*
* Please note that this is currently only supported for Classic Spark applications. The reason
* for this is that Connect applications can have a significantly different classpath than the
* driver or executor. This makes having the same Kryo configuration on both the client and
* server (driver & executors) very tricky. As a workaround, a user can define their own Codec
* which internalizes the Kryo configuration.
*/
object KryoSerializationCodec extends (() => Codec[Any, Array[Byte]]) {
private lazy val kryoCodecConstructor: MethodHandle = {
val cls = SparkClassUtils.classForName(
"org.apache.spark.sql.catalyst.encoders.KryoSerializationCodecImpl")
MethodHandles.lookup().findConstructor(cls, MethodType.methodType(classOf[Unit]))
}

override def apply(): Codec[Any, Array[Byte]] = {
try {
kryoCodecConstructor.invoke().asInstanceOf[Codec[Any, Array[Byte]]]
} catch {
case _: ClassNotFoundException =>
throw ExecutionErrors.cannotUseKryoSerialization()
}
}
}
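As a sketch of the workaround mentioned in the comment above, a user-defined codec can internalize its own Kryo configuration so the client and server never need to agree on classpath-driven setup. `MyKryoCodec` below is hypothetical and uses the `com.esotericsoftware.kryo` API directly:

```scala
import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.sql.catalyst.encoders.Codec

// Hypothetical codec that owns its Kryo configuration, so no external
// registration or classpath agreement is required.
class MyKryoCodec extends Codec[Any, Array[Byte]] {
  @transient private lazy val kryo = {
    val k = new Kryo()
    k.setRegistrationRequired(false)
    k
  }

  override def encode(in: Any): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val output = new Output(bytes)
    kryo.writeClassAndObject(output, in)
    output.close()
    bytes.toByteArray
  }

  override def decode(out: Array[Byte]): Any = {
    val input = new Input(out)
    try kryo.readClassAndObject(input) finally input.close()
  }
}
```

Such a codec could then be wired into an encoder the same way `Encoders.kryo` is built above, e.g. `TransformingEncoder(classTag[T], BinaryEncoder, () => new MyKryoCodec)`.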
ExecutionErrors.scala
@@ -216,6 +216,12 @@ private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
messageParameters = Map(
"innerCls" -> innerCls.getName))
}

def cannotUseKryoSerialization(): SparkRuntimeException = {
new SparkRuntimeException(
errorClass = "CANNOT_USE_KRYO",
messageParameters = Map.empty)
}
}

private[sql] object ExecutionErrors extends ExecutionErrors
KryoSerializationCodecImpl.scala (new file)
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.encoders

import java.nio.ByteBuffer

import org.apache.spark.sql.catalyst.expressions.objects.SerializerSupport

/**
* A codec that uses Kryo to (de)serialize arbitrary objects to and from a byte array.
*/
class KryoSerializationCodecImpl extends Codec[Any, Array[Byte]] {
private val serializer = SerializerSupport.newSerializer(useKryo = true)
override def encode(in: Any): Array[Byte] =
serializer.serialize(in).array()

override def decode(out: Array[Byte]): Any =
serializer.deserialize(ByteBuffer.wrap(out))
}
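For illustration, a round-trip through the implementation above might look like this (a hedged sketch; it assumes `SerializerSupport.newSerializer(useKryo = true)` can build a Kryo serializer with default settings in the calling environment):

```scala
import org.apache.spark.sql.catalyst.encoders.KryoSerializationCodecImpl

val codec = new KryoSerializationCodecImpl
val bytes: Array[Byte] = codec.encode(Map("a" -> 1, "b" -> 2))

// decode returns Any, so callers cast back to the expected type.
val restored = codec.decode(bytes).asInstanceOf[Map[String, Int]]
assert(restored == Map("a" -> 1, "b" -> 2))
```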
ExpressionEncoderSuite.scala
@@ -552,18 +552,24 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTest {
encodeDecodeTest(FooClassWithEnum(1, FooEnum.E1), "case class with Int and scala Enum")
encodeDecodeTest(FooEnum.E1, "scala Enum")

test("transforming encoder") {

private def testTransformingEncoder(
name: String,
provider: () => Codec[Any, Array[Byte]]): Unit = test(name) {
val encoder = ExpressionEncoder(TransformingEncoder(
classTag[(Long, Long)],
BinaryEncoder,
JavaSerializationCodec))
provider))
.resolveAndBind()
assert(encoder.schema == new StructType().add("value", BinaryType))
val toRow = encoder.createSerializer()
val fromRow = encoder.createDeserializer()
assert(fromRow(toRow((11, 14))) == (11, 14))
}

testTransformingEncoder("transforming java serialization encoder", JavaSerializationCodec)
testTransformingEncoder("transforming kryo encoder", KryoSerializationCodec)

// Scala / Java big decimals ----------------------------------------------------------

encodeDecodeTest(BigDecimal(("9" * 20) + "." + "9" * 18),
