Skip to content

Commit

Permalink
[SPARK-46575][SQL][HIVE] Make HiveThriftServer2.startWithContext Deve…
Browse files Browse the repository at this point in the history
…lopApi retriable and fix flakiness of ThriftServerWithSparkContextInHttpSuite

### What changes were proposed in this pull request?

This PR adds an new param to HiveThriftServer2.startWithContext` to tell the `ThriftCLIService`s whether to call `System exit` or not when encountering errors. When developers call `HiveThriftServer2.startWithContext` and if an error occurs, `System exit` will be performed, stop the existing `SqlContext/SparkContext`, and crash the user app.

There is also such a use case in our tests. We intended to retry starting a thrift server three times in total but it might stop the underlying SparkContext early and fail the rest.

For example
https://github.com/apache/spark/actions/runs/7271496487/job/19812142981

```java
06:21:12.854 ERROR org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite: Error start hive server with Context
org.scalatest.exceptions.TestFailedException: SharedThriftServer.this.tempScratchDir.exists() was true
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
	at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
	at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.startThriftServer(SharedThriftServer.scala:151)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.$anonfun$beforeAll$1(SharedThriftServer.scala:59)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
06:21:12.854 ERROR org.apache.hive.service.cli.thrift.ThriftCLIService: Error starting HiveServer2: could not start ThriftBinaryCLIService
java.lang.NullPointerException: Cannot invoke "org.apache.thrift.server.TServer.serve()" because "this.server" is null
	at org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:135)
	at java.base/java.lang.Thread.run(Thread.java:840)
06:21:12.941 ERROR org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite: Error start hive server with Context
java.lang.IllegalStateException: LiveListenerBus is stopped.
	at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:92)
	at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:75)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.createListenerAndUI(HiveThriftServer2.scala:74)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.startWithContext(HiveThriftServer2.scala:66)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.startThriftServer(SharedThriftServer.scala:141)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.$anonfun$beforeAll$4(SharedThriftServer.scala:60)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
06:21:12.958 WARN org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite:

[info] org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite *** ABORTED *** (151 milliseconds)
[info]   java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
[info] This stopped SparkContext was created at:
[info]
[info] org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite.beforeAll(ThriftServerWithSparkContextSuite.scala:279)
[info] org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
[info] org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
[info] org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] java.base/java.lang.Thread.run(Thread.java:840)
[info]
[info] The currently active SparkContext was created at:
[info]
[info] (No active SparkContext.)
[info]   at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
[info]   at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:115)
[info]   at org.apache.spark.sql.SparkSession.newSession(SparkSession.scala:274)
[info]   at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.startThriftServer(SharedThriftServer.scala:130)
```

### Why are the changes needed?

- Improve the programmability of `HiveThriftServer2.startWithContext`
- Fix flakiness in tests

### Does this PR introduce _any_ user-facing change?

no, developer API change and the default behavior is AS-IS.
### How was this patch tested?

Verified ThriftServerWithSparkContextInHttpSuite locally
```
18:20:02.840 ERROR org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite: A previous Hive's SessionState is leaked, aborting this retry
18:20:02.840 ERROR org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite: Error start hive server with Context
java.lang.IllegalStateException: HiveThriftServer2 started in binary mode while the test case is expecting HTTP mode
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.$anonfun$startThriftServer$2(SharedThriftServer.scala:149)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.$anonfun$startThriftServer$2$adapted(SharedThriftServer.scala:144)
	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
	at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.startThriftServer(SharedThriftServer.scala:144)
	at org.apache.spark.sql.hive.thriftserver.SharedThriftServer.$anonfun$beforeAll$1(SharedThriftServer.scala:60)
18:20:04.114 WARN org.apache.hadoop.hive.metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
18:20:04.114 WARN org.apache.hadoop.hive.metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore hzyaoqin127.0.0.1
18:20:04.119 WARN org.apache.hadoop.hive.metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
[info] - the scratch dir will not be exist (1 millisecond)
[info] - SPARK-29911: Uncache cached tables when session closed (376 milliseconds)
```
### Was this patch authored or co-authored using generative AI tooling?

no

Closes #44575 from yaooqinn/SPARK-46575.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
  • Loading branch information
yaooqinn committed Jan 11, 2024
1 parent 514ecc6 commit 5c3b36a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportFactory;

import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$;

public class ThriftBinaryCLIService extends ThriftCLIService {

Expand Down Expand Up @@ -137,7 +138,11 @@ public void run() {
LOG.error(
"Error starting HiveServer2: could not start "
+ ThriftBinaryCLIService.class.getSimpleName(), t);
System.exit(-1);
if (HiveThriftServer2$.MODULE$.systemExitOnError().get()) {
System.exit(-1);
} else {
throw new ServiceException(t);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TCLIService.Iface;
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
Expand All @@ -47,7 +48,6 @@
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;


public class ThriftHttpCLIService extends ThriftCLIService {

protected org.eclipse.jetty.server.Server httpServer;
Expand Down Expand Up @@ -182,7 +182,11 @@ public void run() {
} else {
LOG.error("Error starting HiveServer2: could not start "
+ ThriftHttpCLIService.class.getSimpleName(), t);
System.exit(-1);
if (HiveThriftServer2$.MODULE$.systemExitOnError().get()) {
System.exit(-1);
} else {
throw new ServiceException(t);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,21 @@ object HiveThriftServer2 extends Logging {
var uiTab: Option[ThriftServerTab] = None
var listener: HiveThriftServer2Listener = _
var eventManager: HiveThriftServer2EventManager = _
val systemExitOnError = new AtomicBoolean(true)

/**
* :: DeveloperApi ::
* Starts a new thrift server with the given context.
*
* @param sqlContext SQLContext to use for the server
* @param exitOnError Whether to exit the JVM if HiveThriftServer2 fails to initialize. When true,
* the call logs the error and exits the JVM with exit code -1. When false, the
* call throws an exception instead.
*/
@DeveloperApi
def startWithContext(sqlContext: SQLContext): HiveThriftServer2 = {
def startWithContext(sqlContext: SQLContext, exitOnError: Boolean = true): HiveThriftServer2 = {
systemExitOnError.set(exitOnError)

val executionHive = HiveUtils.newClientForExecution(
sqlContext.sparkContext.conf,
sqlContext.sessionState.newHadoopConf())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.jdbc.HttpBasicAuthInterceptor
import org.apache.hive.service.auth.PlainSaslHelper
import org.apache.hive.service.cli.thrift.{ThriftCLIService, ThriftCLIServiceClient}
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftCLIService, ThriftCLIServiceClient}
import org.apache.hive.service.rpc.thrift.TCLIService.Client
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{THttpClient, TSocket}

import org.apache.spark.SparkException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -138,10 +139,18 @@ trait SharedThriftServer extends SharedSparkSession {
sqlContext.setConf(ConfVars.HIVE_START_CLEANUP_SCRATCHDIR.varname, "true")

try {
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
// Set exitOnError to false to avoid exiting the JVM process and tearing down the SparkContext
// instance in case of any exceptions here. Otherwise, the following retries are doomed to
// fail on a stopped context.
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext, exitOnError = false)
hiveServer2.getServices.asScala.foreach {
case t: ThriftCLIService =>
serverPort = t.getPortNumber
if (t.isInstanceOf[ThriftBinaryCLIService] && mode == ServerMode.http) {
logError("A previous Hive's SessionState is leaked, aborting this retry")
throw SparkException.internalError("HiveThriftServer2 started in binary mode " +
"while the test case is expecting HTTP mode")
}
logInfo(s"Started HiveThriftServer2: mode=$mode, port=$serverPort, attempt=$attempt")
case _ =>
}
Expand Down

0 comments on commit 5c3b36a

Please sign in to comment.