[SPARK-49489][SQL][HIVE] HMS client respects hive.thrift.client.maxmessage.size
#50022
Conversation
@shrprasa @wangyum @Madhukar525722 I just made it work; it would be great if you could test it with your internal cases. I will polish the code according to the GHA results and your feedback.
Thanks @pan3793. We will test and update.
case proxy if JdkProxy.isProxyClass(proxy.getClass) =>
  JdkProxy.getInvocationHandler(proxy) match {
    case syncHandler if syncHandler.getClass.getName.endsWith("SynchronizedHandler") =>
      val realMscField = SparkClassUtils.classForName(
Perhaps we can directly use syncHandler.getClass().getDeclaredField("client")
here, and there should be no practical difference in effect.
Then, perhaps we can try to refactor a bit here, maybe like:

JdkProxy.getInvocationHandler(proxy) match {
  case syncHandler if syncHandler.getClass.getName.endsWith("SynchronizedHandler") =>
    val realMsc = getFieldValue(syncHandler, "client").asInstanceOf[IMetaStoreClient]
    configureMaxThriftMessageSize(hiveConf, realMsc)
  case retryHandler: RetryingMetaStoreClient =>
    val realMsc = getFieldValue(retryHandler, "base").asInstanceOf[IMetaStoreClient]
    configureMaxThriftMessageSize(hiveConf, realMsc)
  case _ => // do nothing
}

private def getFieldValue(obj: AnyRef, fieldName: String): AnyRef = {
  val field = obj.getClass.getDeclaredField(fieldName)
  field.setAccessible(true)
  field.get(obj)
}
hmm... Perhaps the exception type caught in the catch block needs to be changed.
@LuciferYang thanks for the review, I simplified the reflection call by following your suggestions.
To address concerns about the reflection code itself, I tuned the code so that the added logic only takes effect when the user configures hive.thrift.client.max.message.size
explicitly, in case users compile Spark with their own modified Hive version.
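For illustration, a minimal sketch of that explicit-config guard, assuming HiveConf semantics where `get` returns null unless the key was set explicitly (the helper name below is made up for this example; the diff hunk later in the thread follows the same shape):

```scala
import org.apache.hadoop.hive.conf.HiveConf

// Illustrative helper: returns Some(size) only when the user explicitly set
// hive.thrift.client.max.message.size to a positive value; hiveConf.get
// returns null for keys that were never configured.
def explicitMaxThriftMessageSize(hiveConf: HiveConf): Option[Int] =
  Option(hiveConf.get("hive.thrift.client.max.message.size"))
    .map(HiveConf.toSizeBytes(_).toInt)
    .filter(_ > 0)
```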
@tailrec
private def isClientException(e: Throwable): Boolean = e match {
  case re: RuntimeException if re.getCause != null =>
    isClientException(re.getCause)
To cover the RuntimeException thrown by Hive.getMSC (a sketch of the full check follows the stack trace below):
Cause: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1742)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:83)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3607)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3659)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3639)
at org.apache.spark.sql.hive.client.HiveClientImpl$.getHive(HiveClientImpl.scala:1420)
at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:269)
at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:236)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:235)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:285)
at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:420)
at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:192)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.scala:17)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:100)
at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:192)
...
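A self-contained sketch of that unwrapping, with a stand-in set of client exception class names (the real list checked by HiveClientImpl differs; this only illustrates the @tailrec cause-unwrapping):

```scala
import scala.annotation.tailrec

object ClientExceptionCheck {
  // Stand-in list; the actual client-exception classes checked by Spark differ.
  private val clientExceptionClassNames = Set(
    "org.apache.hadoop.hive.ql.metadata.HiveException",
    "org.apache.thrift.TApplicationException")

  // Peel off RuntimeException wrappers (e.g. the one thrown by Hive.getMSC)
  // and check whether the underlying cause is a known client-side exception.
  @tailrec
  def isClientException(e: Throwable): Boolean = e match {
    case re: RuntimeException if re.getCause != null =>
      isClientException(re.getCause)
    case other =>
      var cls: Class[_] = other.getClass
      var found = false
      while (cls != null && !found) {
        found = clientExceptionClassNames.contains(cls.getCanonicalName)
        cls = cls.getSuperclass
      }
      found
  }
}
```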
@shrprasa Does it work after applying the patch?
Hi @pan3793, while testing we are facing a warning.
We have already defined the conf in hive-site.xml, so the error message for the table persists.
@Madhukar525722 so it works? It's a warning message, not an error, and it seems reasonable to me.
BTW, use three backticks to quote code blocks.
It didn't work @pan3793. I suspect the config setup didn't happen, which is why it resulted in the same old behaviour. Apart from that, the other error logs are still the same.
case t: TEndpointTransport =>
  val currentMaxMessageSize = t.getConfiguration.getMaxMessageSize
  if (currentMaxMessageSize != maxMessageSize) {
    logDebug("Change the current metastore client thrift max message size from " +
@Madhukar525722 could you please try the latest patch, and monitor if this log is printed?
Option(hiveConf.get("hive.thrift.client.max.message.size"))
  .map(HiveConf.toSizeBytes(_).toInt).filter(_ > 0)
  .foreach { maxMessageSize =>
    logDebug(s"Trying to set metastore client thrift max message to $maxMessageSize")
@Madhukar525722 also this one
@pan3793, I have built using the latest patch only.
When I enabled the debug log and tried to run, I can see:
25/02/24 12:11:30 DEBUG HiveClientImpl: Trying to set metastore client thrift max message to 1073741824
But the log related to "Change the current metastore client thrift max message size from" is not there.
@Madhukar525722 you may need to add some additional logs to debug what happened.
Hi @pan3793. The flow was able to reach the case msc: branch and found the corresponding value.
Does it require more unwrapping before getting to the exact thrift TEndpointTransport? When I tried it without unwrapping, it gives an error.
I also wanted to understand why org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.getConfiguration() might be returning null.
@tailrec
def configure(t: TTransport): Unit = t match {
  // Unwrap and access the underlying TTransport when security is enabled (Kerberos)
  case tTransport: TFilterTransport =>
@Madhukar525722 kerberos cases are addressed here
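For context, a rough sketch of what the Kerberos-aware unwrapping can look like; it assumes Hive's TFilterTransport (which TUGIAssumingTransport extends) keeps the delegate in a protected field named `wrapped`, and it is not the exact patch code:

```scala
import org.apache.hadoop.hive.thrift.TFilterTransport
import org.apache.thrift.transport.{TEndpointTransport, TTransport}
import scala.annotation.tailrec

// Walk through wrapping transports (e.g. TUGIAssumingTransport when Kerberos is
// enabled) until the underlying TEndpointTransport is reached, then adjust its
// max message size. The field name "wrapped" is assumed from the Hive 2.3.x shims.
def setMaxThriftMessageSize(transport: TTransport, maxMessageSize: Int): Unit = {
  @tailrec
  def configure(t: TTransport): Unit = t match {
    case filter: TFilterTransport =>
      val wrappedField = classOf[TFilterTransport].getDeclaredField("wrapped")
      wrappedField.setAccessible(true)
      configure(wrappedField.get(filter).asInstanceOf[TTransport])
    case endpoint: TEndpointTransport =>
      if (endpoint.getConfiguration.getMaxMessageSize != maxMessageSize) {
        endpoint.getConfiguration.setMaxMessageSize(maxMessageSize)
        // Nudge the transport to re-read the configured limit immediately.
        endpoint.updateKnownMessageSize(0L)
      }
    case _ => // unknown transport type: leave it untouched
  }
  configure(transport)
}
```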
@Madhukar525722 thanks for testing. I updated the code and verified it on both kerberized and simple Hadoop clusters, and I confirm those two DEBUG logs are printed.
Thanks @pan3793 for the implementation. I validated my queries and they went through.
@Madhukar525722 Thank you for testing and confirming! cc @LuciferYang @wangyum it's ready for the next round of review.
+1, LGTM
@wangyum Does it work after applying the patch? This may block the release of version 4.0.
It seems it does not work. The Hive metastore version is 3.1.
@wangyum thank you for testing. The change should only affect the client side; it does not matter which version of HMS you use. You can enable the debug log to see if these logs are printed.
It works after adding one config:
[SPARK-49489][SQL][HIVE] HMS client respects `hive.thrift.client.max.message.size` — Closes #50022 from pan3793/SPARK-49489. Authored-by: Cheng Pan. Signed-off-by: yangjie01. (cherry picked from commit 2ea5621)

Merged into master and branch-4.0. Thanks @pan3793 @wangyum and @Madhukar525722

What changes were proposed in this pull request?

Partly port HIVE-26633 for the Spark HMS client: respect `hive.thrift.client.max.message.size` if present and the value is positive.

> Thrift client configuration for max message size. 0 or -1 will use the default defined in the Thrift library. The upper limit is 2147483648 bytes (or 2gb).

Note: it's a Hive configuration, so I follow the convention of not documenting it on the Spark side.

Why are the changes needed?

1. THRIFT-5237 (0.14.0) changes the max thrift message size from 2GiB to 100MiB
2. HIVE-25098 (4.0.0) upgrades Thrift from 0.13.0 to 0.14.1
3. HIVE-25996 (2.3.10) backports HIVE-25098 to branch-2.3
4. HIVE-26633 (4.0.0) introduces `hive.thrift.client.max.message.size`
5. SPARK-47018 (4.0.0) upgrades Hive from 2.3.9 to 2.3.10

Thus, Spark's HMS client does not respect `hive.thrift.client.max.message.size` and has a fixed max thrift message size of 100MiB; users may hit the "MaxMessageSize reached" exception when accessing Hive tables with a large number of partitions.

See discussion in #46468 (comment)

Does this PR introduce any user-facing change?

No, it tackles the regression introduced by an unreleased change, namely SPARK-47018. The added code only takes effect when the user configures `hive.thrift.client.max.message.size` explicitly.

How was this patch tested?

This must be tested manually, as the current Spark UT does not cover remote HMS cases. I constructed a test case in a testing Hadoop cluster with a remote HMS.

Firstly, create a table with a large number of partitions.

```
$ spark-sql --num-executors=6 --executor-cores=4 --executor-memory=1g \
    --conf spark.hive.exec.dynamic.partition.mode=nonstrict \
    --conf spark.hive.exec.max.dynamic.partitions=1000000

spark-sql (default)> CREATE TABLE p PARTITIONED BY (year, month, day) STORED AS PARQUET AS
SELECT /*+ REPARTITION(200) */ * FROM (
  (SELECT CAST(id AS STRING) AS year FROM range(2000, 2100))
  JOIN (SELECT CAST(id AS STRING) AS month FROM range(1, 13))
  JOIN (SELECT CAST(id AS STRING) AS day FROM range(1, 31))
  JOIN (SELECT 'this is some data' AS data)
);
```

Then try to tune `hive.thrift.client.max.message.size` and run a query that triggers a `getPartitions` thrift call. For example, when set to `1kb`, it throws `TTransportException: MaxMessageSize reached`, and the exception disappears after boosting the value.

```
$ spark-sql --conf spark.hive.thrift.client.max.message.size=1kb
spark-sql (default)> SHOW PARTITIONS p;
...
2025-02-20 15:18:49 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (1 of 1) after 1s. listPartitionNames
org.apache.thrift.transport.TTransportException: MaxMessageSize reached
	at org.apache.thrift.transport.TEndpointTransport.checkReadBytesAvailable(TEndpointTransport.java:81) ~[libthrift-0.16.0.jar:0.16.0]
	at org.apache.thrift.protocol.TProtocol.checkReadBytesAvailable(TProtocol.java:67) ~[libthrift-0.16.0.jar:0.16.0]
	at org.apache.thrift.protocol.TBinaryProtocol.readListBegin(TBinaryProtocol.java:297) ~[libthrift-0.16.0.jar:0.16.0]
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition_names(ThriftHiveMetastore.java:2458) ~[hive-metastore-2.3.10.jar:2.3.10]
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition_names(ThriftHiveMetastore.java:2443) ~[hive-metastore-2.3.10.jar:2.3.10]
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionNames(HiveMetaStoreClient.java:1487) ~[hive-metastore-2.3.10.jar:2.3.10]
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173) ~[hive-metastore-2.3.10.jar:2.3.10]
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2349) ~[hive-metastore-2.3.10.jar:2.3.10]
	at org.apache.hadoop.hive.ql.metadata.Hive.getPartitionNames(Hive.java:2461) ~[hive-exec-2.3.10-core.jar:2.3.10]
	at org.apache.spark.sql.hive.client.Shim_v2_0.getPartitionNames(HiveShim.scala:976) ~[spark-hive_2.13-4.1.0-SNAPSHOT.jar:4.1.0-SNAPSHOT]
	...
```

Was this patch authored or co-authored using generative AI tooling?

No.
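For users who hit "MaxMessageSize reached", a usage example of applying the setting from application code; the `1gb` value and app name are placeholders, and it relies on Spark forwarding `spark.hive.*` entries to the Hive client configuration:

```scala
import org.apache.spark.sql.SparkSession

// Example only: raise the HMS client's max thrift message size.
// The "spark.hive." prefix is forwarded to the Hive client as "hive.*".
val spark = SparkSession.builder()
  .appName("hms-max-message-size-example")
  .enableHiveSupport()
  .config("spark.hive.thrift.client.max.message.size", "1gb")
  .getOrCreate()

// A call that lists many partitions, which previously tripped the 100MiB limit.
spark.sql("SHOW PARTITIONS p").show(false)
```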