diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
index cb85993e5e099..da44aa642dae5 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
@@ -34,6 +34,7 @@ class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
       .master("local[2]")
       .enableHiveSupport()
       .config("spark.hadoop.hive.metastore.uris", "thrift://127.0.0.1:11111")
+      .config("spark.hadoop.hive.thrift.client.max.message.size", "1gb")
       .getOrCreate()
     val originalLevel = LogManager.getRootLogger.asInstanceOf[Logger].getLevel
     val originalClassLoader = Thread.currentThread().getContextClassLoader
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ca8b5369a9cbb..72fea06936a10 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -22,6 +22,7 @@ import java.lang.reflect.InvocationTargetException
 import java.util
 import java.util.Locale
 
+import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
@@ -81,14 +82,18 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * Due to classloader isolation issues, pattern matching won't work here so we need
    * to compare the canonical names of the exceptions, which we assume to be stable.
    */
-  private def isClientException(e: Throwable): Boolean = {
-    var temp: Class[_] = e.getClass
-    var found = false
-    while (temp != null && !found) {
-      found = clientExceptions.contains(temp.getCanonicalName)
-      temp = temp.getSuperclass
-    }
-    found
+  @tailrec
+  private def isClientException(e: Throwable): Boolean = e match {
+    case re: RuntimeException if re.getCause != null =>
+      isClientException(re.getCause)
+    case e =>
+      var temp: Class[_] = e.getClass
+      var found = false
+      while (temp != null && !found) {
+        found = clientExceptions.contains(temp.getCanonicalName)
+        temp = temp.getSuperclass
+      }
+      found
   }
 
   /**
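Note on the isClientException change above: a client-side exception can arrive wrapped in a
RuntimeException, which the old class-name walk never matched; the rewrite unwraps such causes
recursively (kept tail-recursive via @tailrec) before comparing canonical class names. Below is
a hypothetical, self-contained sketch of the same pattern (`clientExceptions` here is a
stand-in set, and the `main` method is invented for the demo):

import scala.annotation.tailrec

object IsClientExceptionDemo {
  private val clientExceptions = Set("java.lang.IllegalArgumentException")

  @tailrec
  private def isClientException(e: Throwable): Boolean = e match {
    case re: RuntimeException if re.getCause != null =>
      // unwrap and retry on the cause, as the patch does
      isClientException(re.getCause)
    case e =>
      // walk up the class hierarchy, comparing canonical names against the list
      var temp: Class[_] = e.getClass
      var found = false
      while (temp != null && !found) {
        found = clientExceptions.contains(temp.getCanonicalName)
        temp = temp.getSuperclass
      }
      found
  }

  def main(args: Array[String]): Unit = {
    val direct = new IllegalArgumentException("boom")
    println(isClientException(direct))                       // true, as before
    println(isClientException(new RuntimeException(direct))) // true only with the unwrapping
  }
}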
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 3e7e81d25d943..90f8a3a85d70c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.hive.client
 
 import java.io.{OutputStream, PrintStream}
 import java.lang.{Iterable => JIterable}
-import java.lang.reflect.InvocationTargetException
+import java.lang.reflect.{InvocationTargetException, Proxy => JdkProxy}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{HashMap => JHashMap, Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
 
+import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
@@ -33,7 +34,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient, RetryingMetaStoreClient, TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable, _}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable}
@@ -43,7 +44,9 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.hive.thrift.TFilterTransport
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.thrift.transport.{TEndpointTransport, TTransport}
 
 import org.apache.spark.{SparkConf, SparkException, SparkThrowable}
 import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
@@ -1407,13 +1410,83 @@ private[hive] object HiveClientImpl extends Logging {
       case _ => new HiveConf(conf, classOf[HiveConf])
     }
-    try {
+    val hive = try {
       Hive.getWithoutRegisterFns(hiveConf)
     } catch {
       // SPARK-37069: not all Hive versions have the above method (e.g., Hive 2.3.9 has it but
-      // 2.3.8 don't), therefore here we fallback when encountering the exception.
+      // 2.3.8 doesn't), therefore here we fall back when encountering the exception.
       case _: NoSuchMethodError =>
         Hive.get(hiveConf)
     }
+
+    // Follow the behavior of HIVE-26633 (4.0.0): only apply the max message size when
+    // `hive.thrift.client.max.message.size` is set and the value is positive
+    Option(hiveConf.get("hive.thrift.client.max.message.size"))
+      .map(HiveConf.toSizeBytes(_).toInt).filter(_ > 0)
+      .foreach { maxMessageSize =>
+        logDebug(s"Trying to set metastore client thrift max message size to $maxMessageSize")
+        configureMaxThriftMessageSize(hiveConf, hive.getMSC, maxMessageSize)
+      }
+
+    hive
+  }
+
+  private def getFieldValue[T](obj: Any, fieldName: String): T = {
+    val field = obj.getClass.getDeclaredField(fieldName)
+    field.setAccessible(true)
+    field.get(obj).asInstanceOf[T]
+  }
+
+  private def getFieldValue[T](obj: Any, clazz: Class[_], fieldName: String): T = {
+    val field = clazz.getDeclaredField(fieldName)
+    field.setAccessible(true)
+    field.get(obj).asInstanceOf[T]
+  }
+
+  // SPARK-49489: a surgery for Hive 2.3.10 due to lack of HIVE-26633
+  private def configureMaxThriftMessageSize(
+      hiveConf: HiveConf, msClient: IMetaStoreClient, maxMessageSize: Int): Unit = try {
+    msClient match {
+      // Hive uses a JDK dynamic proxy to enhance the MetaStoreClient with synchronization
+      // and retrying, so we must first unwrap it to access the underlying MetaStoreClient
+      case proxy if JdkProxy.isProxyClass(proxy.getClass) =>
+        JdkProxy.getInvocationHandler(proxy) match {
+          case syncHandler if syncHandler.getClass.getName.endsWith("SynchronizedHandler") =>
+            val wrappedMsc = getFieldValue[IMetaStoreClient](syncHandler, "client")
+            configureMaxThriftMessageSize(hiveConf, wrappedMsc, maxMessageSize)
+          case retryHandler: RetryingMetaStoreClient =>
+            val wrappedMsc = getFieldValue[IMetaStoreClient](retryHandler, "base")
+            configureMaxThriftMessageSize(hiveConf, wrappedMsc, maxMessageSize)
+          case _ =>
+        }
+      case msc: HiveMetaStoreClient if !msc.isLocalMetaStore =>
+        @tailrec
+        def configure(t: TTransport): Unit = t match {
+          // Unwrap and access the underlying TTransport when security is enabled (Kerberos)
+          case tTransport: TFilterTransport =>
+            val wrappedTTransport = getFieldValue[TTransport](
+              tTransport, classOf[TFilterTransport], "wrapped")
+            configure(wrappedTTransport)
+          case tTransport: TEndpointTransport =>
+            val tConf = tTransport.getConfiguration
+            val currentMaxMessageSize = tConf.getMaxMessageSize
+            if (currentMaxMessageSize != maxMessageSize) {
+              logDebug("Change the current metastore client thrift max message size from " +
+                s"$currentMaxMessageSize to $maxMessageSize")
+              tConf.setMaxMessageSize(maxMessageSize)
+              // This internally calls TEndpointTransport#resetConsumedMessageSize(-1L) to
+              // apply the updated maxMessageSize
+              tTransport.updateKnownMessageSize(0L)
+            }
+          case _ =>
+        }
+        configure(msc.getTTransport)
+      case _ => // do nothing
+    }
+  } catch {
+    // TEndpointTransport was added in THRIFT-5237 (0.14.0); Hive versions that use an older
+    // Thrift library (e.g. Hive 2.3.9 uses Thrift 0.9.3) aren't affected by THRIFT-5237
+    // and don't need HIVE-26633 applied
+    case _: NoClassDefFoundError => // do nothing
+  }
 }
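Usage note: the new knob is read from the HiveConf that `getHive` builds, so in a Spark
application it is set through the `spark.hadoop.` prefix, exactly as the updated test at the
top of this patch does. Below is a minimal, hypothetical usage sketch (the object name and
query are invented; values are parsed by `HiveConf.toSizeBytes`, and the patch narrows the
result with `.toInt`, so it should stay within Int range):

import org.apache.spark.sql.SparkSession

object ThriftMaxMessageSizeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport()
      // forwarded to Hive as `hive.thrift.client.max.message.size`
      .config("spark.hadoop.hive.thrift.client.max.message.size", "1gb")
      .getOrCreate()

    // Metastore RPCs issued from here on run with the enlarged thrift message limit.
    spark.sql("SHOW TABLES").show()
    spark.stop()
  }
}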