Skip to content

[SPARK-49489][SQL][HIVE] HMS client respects hive.thrift.client.maxmessage.size #50022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
.master("local[2]")
.enableHiveSupport()
.config("spark.hadoop.hive.metastore.uris", "thrift://127.0.0.1:11111")
.config("spark.hadoop.hive.thrift.client.max.message.size", "1gb")
.getOrCreate()
val originalLevel = LogManager.getRootLogger.asInstanceOf[Logger].getLevel
val originalClassLoader = Thread.currentThread().getContextClassLoader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.lang.reflect.InvocationTargetException
import java.util
import java.util.Locale

import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.control.NonFatal

Expand Down Expand Up @@ -81,14 +82,18 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* Due to classloader isolation issues, pattern matching won't work here so we need
* to compare the canonical names of the exceptions, which we assume to be stable.
*/
private def isClientException(e: Throwable): Boolean = {
var temp: Class[_] = e.getClass
var found = false
while (temp != null && !found) {
found = clientExceptions.contains(temp.getCanonicalName)
temp = temp.getSuperclass
}
found
@tailrec
private def isClientException(e: Throwable): Boolean = e match {
case re: RuntimeException if re.getCause != null =>
isClientException(re.getCause)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To cover RuntimeException throw by Hive.getMSC

Cause: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
   at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1742)
   at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:83)
   at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133)
   at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
   at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3607)
   at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3659)
   at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3639)
   at org.apache.spark.sql.hive.client.HiveClientImpl$.getHive(HiveClientImpl.scala:1420)
   at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:269)
   at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
   at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:236)
   at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:235)
   at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:285)
   at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:420)
   at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:192)
   at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.scala:17)
   at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:100)
   at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:192)
   ...

case e =>
var temp: Class[_] = e.getClass
var found = false
while (temp != null && !found) {
found = clientExceptions.contains(temp.getCanonicalName)
temp = temp.getSuperclass
}
found
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package org.apache.spark.sql.hive.client

import java.io.{OutputStream, PrintStream}
import java.lang.{Iterable => JIterable}
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.{InvocationTargetException, Proxy => JdkProxy}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{HashMap => JHashMap, Locale, Map => JMap}
import java.util.concurrent.TimeUnit._

import scala.annotation.tailrec
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
Expand All @@ -33,7 +34,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient, RetryingMetaStoreClient, TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable, _}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable}
Expand All @@ -43,7 +44,9 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.thrift.TFilterTransport
import org.apache.hadoop.security.UserGroupInformation
import org.apache.thrift.transport.{TEndpointTransport, TTransport}

import org.apache.spark.{SparkConf, SparkException, SparkThrowable}
import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
Expand Down Expand Up @@ -1407,13 +1410,83 @@ private[hive] object HiveClientImpl extends Logging {
case _ =>
new HiveConf(conf, classOf[HiveConf])
}
try {
val hive = try {
Hive.getWithoutRegisterFns(hiveConf)
} catch {
// SPARK-37069: not all Hive versions have the above method (e.g., Hive 2.3.9 has it but
// 2.3.8 don't), therefore here we fallback when encountering the exception.
// 2.3.8 doesn't), therefore here we fallback when encountering the exception.
case _: NoSuchMethodError =>
Hive.get(hiveConf)
}

// Follow behavior of HIVE-26633 (4.0.0), only apply the max message size when
// `hive.thrift.client.max.message.size` is set and the value is positive
Option(hiveConf.get("hive.thrift.client.max.message.size"))
.map(HiveConf.toSizeBytes(_).toInt).filter(_ > 0)
.foreach { maxMessageSize =>
logDebug(s"Trying to set metastore client thrift max message to $maxMessageSize")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Madhukar525722 also this one

Copy link

@Madhukar525722 Madhukar525722 Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pan3793 , I have built using the latest patch.
When I enabled debug logging and ran it, I can see
25/02/24 12:11:30 DEBUG HiveClientImpl: Trying to set metastore client thrift max message to 1073741824
However, the subsequent log line "Change the current metastore client thrift max message size from ..." does not appear.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Madhukar525722 you may need to add some additional logs to debug what happened.

configureMaxThriftMessageSize(hiveConf, hive.getMSC, maxMessageSize)
}

hive
}

/**
 * Reflectively reads the (possibly non-public) field named `fieldName` that is
 * declared directly on `obj`'s runtime class, casting the result to `T`.
 * Note: only fields declared on the runtime class itself are found — inherited
 * fields require the overload that takes an explicit `Class[_]`.
 */
private def getFieldValue[T](obj: Any, fieldName: String): T = {
  val declared = obj.getClass.getDeclaredField(fieldName)
  // Bypass Java access checks so private/package-private fields can be read.
  declared.setAccessible(true)
  val raw = declared.get(obj)
  raw.asInstanceOf[T]
}

/**
 * Reflectively reads the (possibly non-public) field named `fieldName` declared on
 * the given `clazz` (rather than `obj`'s runtime class), casting the result to `T`.
 * Useful when the field lives on a known superclass of `obj`.
 */
private def getFieldValue[T](obj: Any, clazz: Class[_], fieldName: String): T = {
  val member = clazz.getDeclaredField(fieldName)
  // Bypass Java access checks so private/package-private fields can be read.
  member.setAccessible(true)
  member.get(obj).asInstanceOf[T]
}

// SPARK-49489: a surgery for Hive 2.3.10 due to lack of HIVE-26633
private def configureMaxThriftMessageSize(
hiveConf: HiveConf, msClient: IMetaStoreClient, maxMessageSize: Int): Unit = try {
msClient match {
// Hive uses Java Dynamic Proxy to enhance the MetaStoreClient to support synchronization
// and retrying, we should unwrap and access the underlying MetaStoreClient instance firstly
case proxy if JdkProxy.isProxyClass(proxy.getClass) =>
JdkProxy.getInvocationHandler(proxy) match {
case syncHandler if syncHandler.getClass.getName.endsWith("SynchronizedHandler") =>
val wrappedMsc = getFieldValue[IMetaStoreClient](syncHandler, "client")
configureMaxThriftMessageSize(hiveConf, wrappedMsc, maxMessageSize)
case retryHandler: RetryingMetaStoreClient =>
val wrappedMsc = getFieldValue[IMetaStoreClient](retryHandler, "base")
configureMaxThriftMessageSize(hiveConf, wrappedMsc, maxMessageSize)
case _ =>
}
case msc: HiveMetaStoreClient if !msc.isLocalMetaStore =>
@tailrec
def configure(t: TTransport): Unit = t match {
// Unwrap and access the underlying TTransport when security enabled (Kerberos)
case tTransport: TFilterTransport =>
Copy link
Member Author

@pan3793 pan3793 Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Madhukar525722 kerberos cases are addressed here

val wrappedTTransport = getFieldValue[TTransport](
tTransport, classOf[TFilterTransport], "wrapped")
configure(wrappedTTransport)
case tTransport: TEndpointTransport =>
val tConf = tTransport.getConfiguration
val currentMaxMessageSize = tConf.getMaxMessageSize
if (currentMaxMessageSize != maxMessageSize) {
logDebug("Change the current metastore client thrift max message size from " +
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Madhukar525722 could you please try the latest patch, and monitor if this log is printed?

s"$currentMaxMessageSize to $maxMessageSize")
tConf.setMaxMessageSize(maxMessageSize)
// This internally call TEndpointTransport#resetConsumedMessageSize(-1L) to
// apply the updated maxMessageSize
tTransport.updateKnownMessageSize(0L)
}
case _ =>
}
configure(msc.getTTransport)
case _ => // do nothing
}
} catch {
// TEndpointTransport is added in THRIFT-5237 (0.14.0), for Hive versions that use older
// Thrift library (e.g. Hive 2.3.9 uses Thrift 0.9.3), which aren't affected by THRIFT-5237
// and don't need to apply HIVE-26633
case _: NoClassDefFoundError => // do nothing
}
}