Skip to content

Commit

Permalink
[SPARK-49525][SS][CONNECT] Minor log improvement to Server Side Strea…
Browse files Browse the repository at this point in the history
…ming Query ListenerBus Listener

### What changes were proposed in this pull request?

Change the log of onQueryStarted and onQueryTerminated from `logDebug` to `logInfo`. They would be useful for debugging as they indicate the events are indeed being fired from the server. It won't add more logging burden as there won't be so many queries start / end, at least much less than `onQueryProgress`.

### Why are the changes needed?

Debug improvement

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

No need

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48002 from WweiL/SPARK-49525-listener-bus-improvement.

Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
WweiL authored and HyukjinKwon committed Sep 6, 2024
1 parent e49e31d commit 9269a0b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult
import org.apache.spark.connect.proto.Parse.ParseFormat
import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance
import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, SESSION_ID}
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceProfile, TaskResourceRequest}
import org.apache.spark.sql.{Dataset, Encoders, ForeachWriter, Observation, RelationalGroupedDataset, Row, SparkSession}
Expand Down Expand Up @@ -3052,10 +3052,13 @@ class SparkConnectPlanner(
sessionHolder.streamingServersideListenerHolder.streamingQueryStartedEventCache.remove(
query.runId.toString))
queryStartedEvent.foreach {
logDebug(
s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}][query id: ${query.id}][query runId: ${query.runId}] " +
s"Adding QueryStartedEvent to response")
logInfo(
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}] " +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"[query id: ${MDC(LogKeys.QUERY_ID, query.id)}]" +
log"[query runId: ${MDC(LogKeys.QUERY_RUN_ID, query.runId)}] " +
log"Adding QueryStartedEvent to response")
e => resultBuilder.setQueryStartedEventJson(e.json)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,11 @@ private[sql] class SparkConnectListenerBusListener(
}

override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
logDebug(
s"[SessionId: ${sessionHolder.sessionId}][UserId: ${sessionHolder.userId}] " +
s"Sending QueryTerminatedEvent to client, id: ${event.id} runId: ${event.runId}.")
logInfo(
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " +
log"Sending QueryTerminatedEvent to client, id: ${MDC(LogKeys.QUERY_ID, event.id)} " +
log"runId: ${MDC(LogKeys.QUERY_RUN_ID, event.runId)}.")
send(event.json, StreamingQueryEventType.QUERY_TERMINATED_EVENT)
}

Expand Down

0 comments on commit 9269a0b

Please sign in to comment.