[SPARK-46679][SQL]: Handling of generic parameter with bounds while creating encoders #48158
Conversation
…e/SPARK-46679 CDPD-58844. Upgrade janino to 3.1.10 Change-Id: I8744bb020e5fedcc0e9e4bc08c556c98a80406ba Sync workflow files | Triggered by Kitchen/RE-github-workflows
…ng exception in creating encoder
…anges. added tests. cleanup of tests
Force-pushed from 03eff84 to 2100681
typeVariables: Map[TypeVariable[_], Type] = Map.empty,
forGenericBound: Boolean = false): AgnosticEncoder[_] = t match {

case c: Class[_] if !forGenericBound && c == java.lang.Boolean.TYPE => PrimitiveBooleanEncoder
For readability purposes I would create a branch at the beginning where you handle `case tv: TypeVariable[_] if forGenericBound =>`.
This way the rest of the code is less impacted.
Thanks, I will do that. I was wondering how to approach it; this neat idea had not struck me.
I have now done it, more or less.
// should not consider the class as a bean, as otherwise it will be treated as an empty schema
// and lose the data on deserialization.
if (properties.isEmpty && seenTypeSet.nonEmpty) {
  if (classOf[KryoSerializable].isAssignableFrom(c)) {
This will be an issue for Connect. While the API supports Kryo, Connect can't support Kryo in its current form. Either we have to detect whether we are in Connect mode, or we have to just fall back to Java serialization.
Sure, I was not aware of that. I will write a test for it and check.
I will be checking in code with some refactoring.
@hvanhovell Added a check for client Connect while creating encoders, so that the Kryo-based encoder is not used there. Using a thread-local to detect this, instead of an API change.
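To make the thread-local approach concrete, here is a minimal sketch of detecting "Connect client" mode via a `ThreadLocal` instead of changing the encoder-creation API. All names here (`ConnectModeFlag`, `withConnectClient`) are illustrative assumptions, not the identifiers used in the actual patch:

```java
import java.util.function.Supplier;

// Hypothetical sketch: a thread-local flag that encoder creation can consult
// to avoid Kryo-based encoders when running as a Connect client.
public final class ConnectModeFlag {
    private static final ThreadLocal<Boolean> IN_CONNECT_CLIENT =
        ThreadLocal.withInitial(() -> Boolean.FALSE);

    public static boolean isConnectClient() {
        return IN_CONNECT_CLIENT.get();
    }

    // Run a block with the flag set, restoring the previous value afterwards.
    public static <T> T withConnectClient(Supplier<T> body) {
        Boolean previous = IN_CONNECT_CLIENT.get();
        IN_CONNECT_CLIENT.set(Boolean.TRUE);
        try {
            return body.get();
        } finally {
            IN_CONNECT_CLIENT.set(previous);
        }
    }
}
```

The try/finally restore matters because the same thread may later create server-side encoders where Kryo is allowed.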
}

private def getAllSuperClasses(typee: Type): Array[Class[_]] = Option(typee)
This sort of begs for a queue and a loop. I am reasonably sure that is more readable...
will flatten the recursion
@hvanhovell Done. Kindly check.
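The "queue and a loop" suggestion above can be sketched as follows; this is an illustrative plain-JDK version of collecting all supertypes iteratively, not the actual Spark implementation:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Iterative (queue + loop) traversal of a class's superclasses and interfaces,
// replacing a recursive walk. A sketch of the reviewer's suggestion.
public final class SuperTypes {
    public static List<Class<?>> allSuperTypes(Class<?> start) {
        List<Class<?>> result = new ArrayList<>();
        Deque<Class<?>> queue = new ArrayDeque<>();
        Set<Class<?>> seen = new HashSet<>();
        queue.add(start);
        while (!queue.isEmpty()) {
            Class<?> c = queue.poll();
            if (!seen.add(c)) continue;   // skip types already visited
            result.add(c);
            Class<?> sup = c.getSuperclass();
            if (sup != null) queue.add(sup);  // ArrayDeque rejects null
            queue.addAll(Arrays.asList(c.getInterfaces()));
        }
        return result;
    }
}
```

Flattening the recursion this way also makes the visitation order explicit and avoids stack depth issues on deep hierarchies.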
def getJavaBeanReadableProperties(beanClass: Class[_]): Array[PropertyDescriptor] = {
  val beanInfo = Introspector.getBeanInfo(beanClass)
  beanInfo.getPropertyDescriptors
    .filterNot(_.getName == "class")
    .filterNot(_.getName == "declaringClass")
    .filter(_.getReadMethod != null)
}
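For context, the snippet above builds on `java.beans.Introspector`. A minimal plain-Java illustration of the same filtering (dropping the synthetic `class` property and write-only properties) looks like this; the `Person` bean is a made-up example:

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

// Usage sketch of bean introspection with the same filters as the Scala utility.
public class BeanProps {
    public static class Person {
        private String name;
        public String getName() { return name; }
        public void setName(String n) { this.name = n; }
    }

    public static List<String> readableProperties(Class<?> beanClass) {
        List<String> names = new ArrayList<>();
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(beanClass).getPropertyDescriptors()) {
                // "class" comes from Object.getClass() and is not real bean state.
                if (pd.getReadMethod() != null
                        && !pd.getName().equals("class")
                        && !pd.getName().equals("declaringClass")) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new RuntimeException(e);
        }
        return names;
    }
}
```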
object UseSerializationEncoder {
  def unapply(th: Throwable): Option[Class[_] => AgnosticEncoder[_]] = th match {
Can we use the actual superclasses here instead of going through the error message? I'd also prefer if you unify this with the other java serialization code.
Unifying it with the other Java serialization code is something I intend to do in my refactoring.
I had thought about using the Serializable encoders as part of the match/case statement, but it appears tricky, as the idea is that the Serializable encoders should be used only as a last resort. The way the current logic works (if I am not wrong), it collects all the interfaces/classes via the CommonUtils method, and it is possible, I think, that the Serializable interface may show up early, as the introspection data is converted to a map in the bean class:

val parentClassesTypeMap =
  JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap

I am hesitant at this point to further complicate the code, unless you all think it is worthwhile to do in this PR.
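For readers unfamiliar with `TypeUtils.getTypeArguments` (the commons-lang method behind `JavaTypeUtils` above), here is a plain-JDK sketch of what such a type-argument map contains for one level of the hierarchy; the commons-lang version computes this transitively. The `Holder`/`StringHolder` classes are illustrative only:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.Map;

// Sketch: map each type variable of the direct superclass to its actual argument,
// e.g. for StringHolder extends Holder<String>, map {T -> String}.
public class TypeArgs {
    public static class Holder<T> {}
    public static class StringHolder extends Holder<String> {}

    public static Map<TypeVariable<?>, Type> directSuperclassArguments(Class<?> cls) {
        Map<TypeVariable<?>, Type> map = new HashMap<>();
        Type sup = cls.getGenericSuperclass();
        if (sup instanceof ParameterizedType) {
            ParameterizedType pt = (ParameterizedType) sup;
            TypeVariable<?>[] vars = ((Class<?>) pt.getRawType()).getTypeParameters();
            Type[] args = pt.getActualTypeArguments();
            for (int i = 0; i < vars.length; i++) {
                map.put(vars[i], args[i]);
            }
        }
        return map;
    }
}
```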
…ests for edge cases. Flattened the recursive call
…lient Connect, as per review feedback
What changes were proposed in this pull request?
If a bean has getters/setters typed as a generic parameter with bounds (e.g. T <: SomeClass), then depending upon the nature of the bound (java Serializable, KryoSerializable, or a UDT type), an appropriate encoder is created. If the bound is of the first two types, the data is represented as a BinaryType; if the bound is a UDT type, the schema/behaviour follows the UDT type.
The following boundary cases were considered while fixing the issue:
If the bean is of the form

```scala
Bean[T <: UDTClass] {
  @BeanProperty var udt: T = _
}
```

then the UDT encoder will be created for the field.

But if the bean is of type

```scala
Bean[T <: UDTDerivedClass] {
  @BeanProperty var udt: T = _
}
```

where `UDTDerivedClass <: UDTClass`, then a Java serialization encoder will be created, even though the class hierarchy of UDTDerivedClass contains UDTClass. The reason is that the concrete instance created by the UDT type would be of UDTClass, which is not assignable to UDTDerivedClass.

Similarly, a non-generic bean class having UDTDerivedClass as a bean property will also use the Java serialization encoder (a test was added for this), for the same reason as in the generic case.
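The encoder choice described above hinges on the upper bound of the bean's type variable, which can be recovered reflectively. A minimal Java sketch, with made-up `Base`/`Bean` classes standing in for the UDT hierarchy:

```java
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Sketch: recover the first upper bound of a class's first type parameter.
// Encoder selection would then test assignability (Serializable, KryoSerializable,
// or a UDT class) against this bound.
public class Bounds {
    public static class Base {}
    public static class Bean<T extends Base> {
        private T value;
        public T getValue() { return value; }
        public void setValue(T v) { this.value = v; }
    }

    public static Class<?> firstBound(Class<?> cls) {
        TypeVariable<?> tv = cls.getTypeParameters()[0];
        Type bound = tv.getBounds()[0];   // upper bound declared with "extends"
        return (Class<?>) bound;
    }
}
```

For `Bean<T extends Base>`, the recovered bound is `Base`, which is what the getter's runtime values must be assignable to.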
Why are the changes needed?
To fix a regression present since Spark 3.3, where a bean having a generic type as a return value throws EncoderNotFoundException.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added tests covering the bug.
Was this patch authored or co-authored using generative AI tooling?
No