Skip to content

Commit

Permalink
[SPARK-49492][CONNECT] Reattach attempted on inactive ExecutionHolder
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Check the status of the ExecutionHolder before reattaching it to the client.

### Why are the changes needed?

An ExecutionHolder may fail to spawn an ExecuteThreadRunner if an exception is thrown before doing so. In that case, reattaching the ExecutionHolder will succeed but no progress will be made.
-> *As a result, the job is hanging.*

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add a test case to ReattachableExecuteSuite.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47955 from changgyoopark-db/SPARK-49492.

Authored-by: Changgyoo Park <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
changgyoopark-db authored and HyukjinKwon committed Sep 6, 2024
1 parent 4cc34bf commit 26e59f2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ class SparkConnectExecutePlanHandler(responseObserver: StreamObserver[proto.Exec
try {
executeHolder.eventsManager.postStarted()
executeHolder.start()
} catch {
// Errors raised before the execution holder has finished spawning a thread are considered
// plan execution failure, and the client should not try reattaching it afterwards.
case t: Throwable =>
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
throw t
}

try {
val responseSender =
new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver)
executeHolder.runGrpcResponseSender(responseSender)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,4 +429,27 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
val abandonedExecutions = manager.listAbandonedExecutions
assert(abandonedExecutions.forall(_.operationId != dummyOpId))
}

test("SPARK-49492: reattach must not succeed on an inactive execution holder") {
withRawBlockingStub { stub =>
val operationId = UUID.randomUUID().toString

// supply an invalid plan so that the execute plan handler raises an error
val iter = stub.executePlan(
buildExecutePlanRequest(proto.Plan.newBuilder().build(), operationId = operationId))

// expect that the execution fails before spawning an execute thread
val ee = intercept[StatusRuntimeException] {
iter.next()
}
assert(ee.getMessage.contains("INTERNAL"))

// reattach must fail
val reattach = stub.reattachExecute(buildReattachExecuteRequest(operationId, None))
val re = intercept[StatusRuntimeException] {
reattach.hasNext()
}
assert(re.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND"))
}
}
}

0 comments on commit 26e59f2

Please sign in to comment.