From 9269a0bfed56429e999269dfdfd89aefcb1b7261 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Fri, 6 Sep 2024 15:10:41 +0900 Subject: [PATCH] [SPARK-49525][SS][CONNECT] Minor log improvement to Server Side Streaming Query ListenerBus Listener ### What changes were proposed in this pull request? Change the log of onQueryStarted and onQueryTerminated from `logDebug` to `logInfo`. They would be useful for debugging as they indicate the events are indeed being fired from the server. It won't add more logging burden as there won't be so many queries start / end, at least much less than `onQueryProgress`. ### Why are the changes needed? Debug improvement ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No need ### Was this patch authored or co-authored using generative AI tooling? No Closes #48002 from WweiL/SPARK-49525-listener-bus-improvement. Authored-by: Wei Liu Signed-off-by: Hyukjin Kwon --- .../sql/connect/planner/SparkConnectPlanner.scala | 13 ++++++++----- .../service/SparkConnectListenerBusListener.scala | 8 +++++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index b6abab6ef7661..bb6d52308c192 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -40,7 +40,7 @@ import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult import org.apache.spark.connect.proto.Parse.ParseFormat import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, SESSION_ID} import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceProfile, TaskResourceRequest} import org.apache.spark.sql.{Dataset, Encoders, ForeachWriter, Observation, RelationalGroupedDataset, Row, SparkSession} @@ -3052,10 +3052,13 @@ class SparkConnectPlanner( sessionHolder.streamingServersideListenerHolder.streamingQueryStartedEventCache.remove( query.runId.toString)) queryStartedEvent.foreach { - logDebug( - s"[SessionId: $sessionId][UserId: $userId][operationId: " + - s"${executeHolder.operationId}][query id: ${query.id}][query runId: ${query.runId}] " + - s"Adding QueryStartedEvent to response") + logInfo( + log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" + + log"[UserId: ${MDC(LogKeys.USER_ID, userId)}] " + + log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " + + log"[query id: ${MDC(LogKeys.QUERY_ID, query.id)}]" + + log"[query runId: ${MDC(LogKeys.QUERY_RUN_ID, query.runId)}] " + + log"Adding QueryStartedEvent to response") e => resultBuilder.setQueryStartedEventJson(e.json) } diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala index 5b2205757648f..7a0c067ab430b 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala @@ -160,9 +160,11 @@ private[sql] class SparkConnectListenerBusListener( } override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { - logDebug( - s"[SessionId: ${sessionHolder.sessionId}][UserId: ${sessionHolder.userId}] " + - s"Sending QueryTerminatedEvent to client, id: ${event.id} runId: ${event.runId}.") + logInfo( + log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" + + log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " + + log"Sending QueryTerminatedEvent to client, id: ${MDC(LogKeys.QUERY_ID, event.id)} " + + log"runId: ${MDC(LogKeys.QUERY_RUN_ID, event.runId)}.") send(event.json, StreamingQueryEventType.QUERY_TERMINATED_EVENT) }