[SPARK-51416][CONNECT] Remove SPARK_CONNECT_MODE when starting Spark Connect server

### What changes were proposed in this pull request?

This PR proposes to remove the `SPARK_CONNECT_MODE` environment variable when starting the Spark Connect server, so that SparkSubmit thinks no remote is set and starts the regular session.
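As a minimal sketch (assuming the surrounding `_start_connect_server` machinery is in place), the fix follows the same stash/remove/restore pattern the code already uses for `SPARK_REMOTE`:

```python
import os

# Minimal sketch of the pattern this PR applies in
# pyspark.sql.connect.session._start_connect_server: stash SPARK_CONNECT_MODE,
# drop it while the local Spark Connect server starts (so SparkSubmit builds a
# regular session), then restore it for the client process.
origin_connect_mode = os.environ.get("SPARK_CONNECT_MODE", None)
try:
    if origin_connect_mode is not None:
        del os.environ["SPARK_CONNECT_MODE"]
    # ... start the regular PySpark session backing the Connect server ...
finally:
    if origin_connect_mode is not None:
        os.environ["SPARK_CONNECT_MODE"] = origin_connect_mode
```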

### Why are the changes needed?

To make the Spark Connect version of the distribution work with `bin/pyspark`. Currently it fails as below:

```
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.ClassNotFoundException: org.apache.spark.sql.connect.SparkConnectPlugin
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:467)
	at org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:41)
	at org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:36)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:99)
	at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2828)
	at scala.collection.StrictOptimizedIterableOps.flatMap(StrictOptimizedIterableOps.scala:118)
	at scala.collection.StrictOptimizedIterableOps.flatMap$(StrictOptimizedIterableOps.scala:105)
	at scala.collection.immutable.ArraySeq.flatMap(ArraySeq.scala:35)
	at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2826)
	at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:210)
	at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:196)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:588)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
```

Because the launched JVM still inherits `SPARK_CONNECT_MODE=1`, SparkSubmit treats the session as Connect and tries to load `org.apache.spark.sql.connect.SparkConnectPlugin`, which is not on the regular session's classpath. `bin/spark-shell` fails for the same reason.
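To see why the variable matters, here is a hedged Python rendering of the API-mode check that the Scala diff below extends; `is_api_mode_connect` is an illustrative name, not an actual pyspark helper:

```python
def is_api_mode_connect(api_mode_property, env):
    """Illustrative mirror of the Scala check in the diff below: the session
    counts as Connect if the API-mode property is "connect" or the
    SPARK_CONNECT_MODE environment variable is "1"."""
    return (api_mode_property or "classic").lower() == "connect" \
        or env.get("SPARK_CONNECT_MODE") == "1"

# The launched server JVM inherits SPARK_CONNECT_MODE=1 from bin/pyspark, so
# the mode resolves to Connect even though a regular backing session is
# wanted; scrubbing the variable before launch avoids this.
assert is_api_mode_connect(None, {"SPARK_CONNECT_MODE": "1"}) is True
assert is_api_mode_connect(None, {}) is False
```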

### Does this PR introduce _any_ user-facing change?

No, the main change has not been released yet.

### How was this patch tested?

Manually with:

```
SPARK_CONNECT_MODE=1 ./bin/pyspark
SPARK_CONNECT_MODE=1 ./bin/spark-shell
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50180 from HyukjinKwon/SPARK-51416.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon committed Mar 6, 2025
1 parent 24c51a2 commit e4cc116
Showing 2 changed files with 9 additions and 3 deletions.
`python/pyspark/sql/connect/session.py` (7 additions & 2 deletions)

```diff
@@ -1074,11 +1074,14 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
         overwrite_conf["spark.connect.grpc.binding.port"] = "0"
 
         origin_remote = os.environ.get("SPARK_REMOTE", None)
+        origin_connect_mode = os.environ.get("SPARK_CONNECT_MODE", None)
         try:
+            # So SparkSubmit thinks no remote is set in order to
+            # start the regular PySpark session.
             if origin_remote is not None:
-                # So SparkSubmit thinks no remote is set in order to
-                # start the regular PySpark session.
                 del os.environ["SPARK_REMOTE"]
+            if origin_connect_mode is not None:
+                del os.environ["SPARK_CONNECT_MODE"]
 
             # The regular PySpark session is registered as an active session
             # so would not be garbage-collected.
@@ -1096,6 +1099,8 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
         finally:
             if origin_remote is not None:
                 os.environ["SPARK_REMOTE"] = origin_remote
+            if origin_connect_mode is not None:
+                os.environ["SPARK_CONNECT_MODE"] = origin_connect_mode
    else:
        raise PySparkRuntimeError(
            errorClass="SESSION_OR_CONTEXT_EXISTS",
```

`SparkSession.scala` in the Spark Connect Scala client (2 additions & 1 deletion)

```diff
@@ -744,7 +744,7 @@ object SparkSession extends SparkSessionCompanion with Logging {
     lazy val isAPIModeConnect =
       Option(System.getProperty(org.apache.spark.sql.SparkSessionBuilder.API_MODE_KEY))
         .getOrElse("classic")
-        .toLowerCase(Locale.ROOT) == "connect"
+        .toLowerCase(Locale.ROOT) == "connect" || System.getenv("SPARK_CONNECT_MODE") == "1"
     val remoteString = sparkOptions
       .get("spark.remote")
       .orElse(Option(System.getProperty("spark.remote"))) // Set from Spark Submit
@@ -778,6 +778,7 @@ object SparkSession extends SparkSessionCompanion with Logging {
       val pb = new ProcessBuilder(args: _*)
       // So don't exclude spark-sql jar in classpath
       pb.environment().remove(SparkConnectClient.SPARK_REMOTE)
+      pb.environment().remove("SPARK_CONNECT_MODE")
       pb.environment().put("SPARK_IDENT_STRING", serverId)
       pb.environment().put("HOSTNAME", "local")
       pb.environment().put("SPARK_CONNECT_AUTHENTICATE_TOKEN", token)
```
