Skip to content

Commit 19fe6bd

Browse files
committed
handle streams
1 parent 71637d7 commit 19fe6bd

File tree

1 file changed

+32
-1
lines changed

1 file changed

+32
-1
lines changed

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,23 @@ class SparkConnectAddArtifactsHandler(val responseObserver: StreamObserver[AddAr
142142
}
143143

144144
protected def cleanUpStagedArtifacts(): Unit = {
145+
// Close all staged artifacts to release file handles and stream resources
146+
stagedArtifacts.foreach { artifact =>
147+
try {
148+
artifact.close()
149+
} catch {
150+
case NonFatal(_) => // Ignore errors during cleanup
151+
}
152+
}
153+
// Close the chunked artifact if it's still active
154+
if (chunkedArtifact != null) {
155+
try {
156+
chunkedArtifact.close()
157+
} catch {
158+
case NonFatal(_) => // Ignore errors during cleanup
159+
}
160+
chunkedArtifact = null
161+
}
145162
Utils.deleteRecursively(stagingDir.toFile)
146163
stagedArtifacts.clear()
147164
}
@@ -262,11 +279,25 @@ class SparkConnectAddArtifactsHandler(val responseObserver: StreamObserver[AddAr
262279

263280
def close(): Unit = {
264281
if (artifactSummary == null) {
265-
checksumOut.close()
282+
// Close streams defensively: even if outer stream close fails, try inner streams.
283+
// Normally checksumOut.close() cascades to inner streams, but if it throws an
284+
// exception, we still attempt to close the inner streams to prevent resource leaks.
285+
var closeException: Throwable = null
286+
try {
287+
checksumOut.close()
288+
} catch {
289+
case NonFatal(e) =>
290+
closeException = e
291+
// Try to close inner streams directly as fallback
292+
try countingOut.close() catch { case NonFatal(_) => }
293+
try fileOut.close() catch { case NonFatal(_) => }
294+
}
266295
artifactSummary = builder
267296
.setName(name)
268297
.setIsCrcSuccessful(getCrcStatus.getOrElse(false))
269298
.build()
299+
// Re-throw the close exception after building summary
300+
if (closeException != null) throw closeException
270301
}
271302
}
272303

0 commit comments

Comments
 (0)