diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala index 1ab5f26f90b13..73a20e448be87 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala @@ -31,6 +31,15 @@ class SparkConnectExecutePlanHandler(responseObserver: StreamObserver[proto.Exec try { executeHolder.eventsManager.postStarted() executeHolder.start() + } catch { + // Errors raised before the execution holder has finished spawning a thread are considered + // plan execution failure, and the client should not try reattaching it afterwards. + case t: Throwable => + SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key) + throw t + } + + try { val responseSender = new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver) executeHolder.runGrpcResponseSender(responseSender) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala index 25e6cc48a1998..2606284c25bd5 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala @@ -429,4 +429,27 @@ class ReattachableExecuteSuite extends SparkConnectServerTest { val abandonedExecutions = manager.listAbandonedExecutions assert(abandonedExecutions.forall(_.operationId != dummyOpId)) } + + test("SPARK-49492: reattach must not succeed on an inactive execution holder") { + withRawBlockingStub { stub => + val operationId = UUID.randomUUID().toString + + // supply an invalid plan so that the execute plan handler raises an error + val iter = stub.executePlan( + buildExecutePlanRequest(proto.Plan.newBuilder().build(), operationId = operationId)) + + // expect that the execution fails before spawning an execute thread + val ee = intercept[StatusRuntimeException] { + iter.next() + } + assert(ee.getMessage.contains("INTERNAL")) + + // reattach must fail + val reattach = stub.reattachExecute(buildReattachExecuteRequest(operationId, None)) + val re = intercept[StatusRuntimeException] { + reattach.hasNext() + } + assert(re.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND")) + } + } }