Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51438][SQL] Make CatalystDataToProtobuf and ProtobufDataToCatalyst properly comparable and hashable #50212

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,35 @@ private[sql] case class CatalystDataToProtobuf(

override protected def withNewChildInternal(newChild: Expression): CatalystDataToProtobuf =
copy(child = newChild)

override def equals(that: Any): Boolean = {
that match {
case that: CatalystDataToProtobuf =>
this.child == that.child &&
this.messageName == that.messageName &&
(
(this.binaryFileDescriptorSet.isEmpty && that.binaryFileDescriptorSet.isEmpty) ||
(
this.binaryFileDescriptorSet.nonEmpty && that.binaryFileDescriptorSet.nonEmpty &&
this.binaryFileDescriptorSet.get.sameElements(that.binaryFileDescriptorSet.get)
)
) &&
this.options == that.options
case _ => false
}
}

override def hashCode(): Int = {
val prime = 31
var result = 1
var i = 0
while (i < binaryFileDescriptorSet.map(_.length).getOrElse(0)) {
result = prime * result + binaryFileDescriptorSet.get.apply(i).hashCode
i += 1
}
result = prime * result + child.hashCode
result = prime * result + messageName.hashCode
result = prime * result + options.hashCode
result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,35 @@ private[sql] case class ProtobufDataToCatalyst(

override protected def withNewChildInternal(newChild: Expression): ProtobufDataToCatalyst =
copy(child = newChild)

override def equals(that: Any): Boolean = {
that match {
case that: ProtobufDataToCatalyst =>
this.child == that.child &&
this.messageName == that.messageName &&
(
(this.binaryFileDescriptorSet.isEmpty && that.binaryFileDescriptorSet.isEmpty) ||
(
this.binaryFileDescriptorSet.nonEmpty && that.binaryFileDescriptorSet.nonEmpty &&
this.binaryFileDescriptorSet.get.sameElements(that.binaryFileDescriptorSet.get)
)
) &&
this.options == that.options
case _ => false
}
}

override def hashCode(): Int = {
val prime = 31
var result = 1
var i = 0
while (i < binaryFileDescriptorSet.map(_.length).getOrElse(0)) {
result = prime * result + binaryFileDescriptorSet.get.apply(i).hashCode
i += 1
}
result = prime * result + child.hashCode
result = prime * result + messageName.hashCode
result = prime * result + options.hashCode
result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,177 @@ class ProtobufCatalystDataConversionSuite
testFileDesc, "org.apache.spark.sql.protobuf.protos.BytesMsg")
assert(withFullName.findFieldByName("bytes_type") != null)
}

test("CatalystDataToProtobuf equals") {
val catalystDataToProtobuf = generateCatalystDataToProtobuf()

assert(
catalystDataToProtobuf
== catalystDataToProtobuf.copy()
)
assert(
catalystDataToProtobuf
!= catalystDataToProtobuf.copy(options = Map("mode" -> "FAILFAST"))
)
assert(
catalystDataToProtobuf
!= catalystDataToProtobuf.copy(messageName = "otherMessage")
)
assert(
catalystDataToProtobuf
!= catalystDataToProtobuf.copy(child = Literal.create(0, IntegerType))
)
assert(
catalystDataToProtobuf
!= catalystDataToProtobuf.copy(binaryFileDescriptorSet = None)
)

val testFileDescCopy = new Array[Byte](testFileDesc.length)
testFileDesc.copyToArray(testFileDescCopy)
assert(
catalystDataToProtobuf
== catalystDataToProtobuf.copy(binaryFileDescriptorSet = Some(testFileDescCopy))
)

testFileDescCopy(0) = '0'
assert(
catalystDataToProtobuf
!= catalystDataToProtobuf.copy(binaryFileDescriptorSet = Some(testFileDescCopy))
)
}

test("CatalystDataToProtobuf hashCode") {
val catalystDataToProtobuf = generateCatalystDataToProtobuf()

assert(
catalystDataToProtobuf.hashCode == 18619165
)
assert(
catalystDataToProtobuf.copy(options = Map("mode" -> "FAILFAST")).hashCode == 1546556846
)
assert(
catalystDataToProtobuf.copy(messageName = "otherMessage").hashCode == -794759603
)
assert(
catalystDataToProtobuf.copy(child = Literal.create(0, IntegerType)).hashCode == 1936538518
)
assert(
catalystDataToProtobuf.copy(binaryFileDescriptorSet = None).hashCode == 1823277823
)

val testFileDescCopy = new Array[Byte](testFileDesc.length)
testFileDesc.copyToArray(testFileDescCopy)
assert(
catalystDataToProtobuf.copy(
binaryFileDescriptorSet = Some(testFileDescCopy)
).hashCode == 18619165
)

testFileDescCopy(0) = '0'
assert(
catalystDataToProtobuf.copy(
binaryFileDescriptorSet = Some(testFileDescCopy)
).hashCode == -536586429
)
}

test("ProtobufDataToCatalyst equals") {
val catalystDataToProtobuf = generateCatalystDataToProtobuf()
val protobufDataToCatalyst = ProtobufDataToCatalyst(
catalystDataToProtobuf,
"message",
Some(testFileDesc),
Map("mode" -> "PERMISSIVE")
)

assert(
protobufDataToCatalyst
== protobufDataToCatalyst.copy()
)
assert(
protobufDataToCatalyst
!= protobufDataToCatalyst.copy(options = Map("mode" -> "FAILFAST"))
)
assert(
protobufDataToCatalyst
!= protobufDataToCatalyst.copy(messageName = "otherMessage")
)
assert(
protobufDataToCatalyst
!= protobufDataToCatalyst.copy(child = Literal.create(0, IntegerType))
)
assert(
protobufDataToCatalyst
!= protobufDataToCatalyst.copy(binaryFileDescriptorSet = None)
)

val testFileDescCopy = new Array[Byte](testFileDesc.length)
testFileDesc.copyToArray(testFileDescCopy)
assert(
protobufDataToCatalyst
== protobufDataToCatalyst.copy(binaryFileDescriptorSet = Some(testFileDescCopy))
)

testFileDescCopy(0) = '0'
assert(
protobufDataToCatalyst
!= protobufDataToCatalyst.copy(binaryFileDescriptorSet = Some(testFileDescCopy))
)
}

test("ProtobufDataToCatalyst hashCode") {
val catalystDataToProtobuf = generateCatalystDataToProtobuf()
val protobufDataToCatalyst = ProtobufDataToCatalyst(
catalystDataToProtobuf,
"message",
Some(testFileDesc),
Map("mode" -> "PERMISSIVE")
)

assert(
protobufDataToCatalyst.hashCode == -937893175
)
assert(
protobufDataToCatalyst.copy(options = Map("mode" -> "FAILFAST")).hashCode == -1634963844
)
assert(
protobufDataToCatalyst.copy(messageName = "otherMessage").hashCode == -1751271943
)
assert(
protobufDataToCatalyst.copy(child = Literal.create(0, IntegerType)).hashCode == -133420428
)
assert(
protobufDataToCatalyst.copy(binaryFileDescriptorSet = None).hashCode == 866765483
)

val testFileDescCopy = new Array[Byte](testFileDesc.length)
testFileDesc.copyToArray(testFileDescCopy)
assert(
protobufDataToCatalyst.copy(
binaryFileDescriptorSet = Some(testFileDescCopy)
).hashCode == -937893175
)

testFileDescCopy(0) = '0'
assert(
protobufDataToCatalyst.copy(
binaryFileDescriptorSet = Some(testFileDescCopy)
).hashCode == -1493098769
)
}

private def generateCatalystDataToProtobuf() = {
val schema = StructType(
Seq(
StructField("a", StringType),
StructField("b", IntegerType)
)
)
val messageName = "message"
val data = RandomDataGenerator.randomRow(new scala.util.Random(3), schema)
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
val dataLiteral = Literal.create(converter(data), schema)

CatalystDataToProtobuf(dataLiteral, messageName, Some(testFileDesc))
}
}