diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index b6abab6ef7661..bb6d52308c192 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -40,7 +40,7 @@ import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult import org.apache.spark.connect.proto.Parse.ParseFormat import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, SESSION_ID} import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceProfile, TaskResourceRequest} import org.apache.spark.sql.{Dataset, Encoders, ForeachWriter, Observation, RelationalGroupedDataset, Row, SparkSession} @@ -3052,10 +3052,13 @@ class SparkConnectPlanner( sessionHolder.streamingServersideListenerHolder.streamingQueryStartedEventCache.remove( query.runId.toString)) queryStartedEvent.foreach { - logDebug( - s"[SessionId: $sessionId][UserId: $userId][operationId: " + - s"${executeHolder.operationId}][query id: ${query.id}][query runId: ${query.runId}] " + - s"Adding QueryStartedEvent to response") + logInfo( + log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" + + log"[UserId: ${MDC(LogKeys.USER_ID, userId)}] " + + log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " + + log"[query id: ${MDC(LogKeys.QUERY_ID, query.id)}]" + + log"[query runId: ${MDC(LogKeys.QUERY_RUN_ID, query.runId)}] " + + log"Adding QueryStartedEvent to response") e => resultBuilder.setQueryStartedEventJson(e.json) } diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala index 5b2205757648f..7a0c067ab430b 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala @@ -160,9 +160,11 @@ private[sql] class SparkConnectListenerBusListener( } override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { - logDebug( - s"[SessionId: ${sessionHolder.sessionId}][UserId: ${sessionHolder.userId}] " + - s"Sending QueryTerminatedEvent to client, id: ${event.id} runId: ${event.runId}.") + logInfo( + log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" + + log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " + + log"Sending QueryTerminatedEvent to client, id: ${MDC(LogKeys.QUERY_ID, event.id)} " + + log"runId: ${MDC(LogKeys.QUERY_RUN_ID, event.runId)}.") send(event.json, StreamingQueryEventType.QUERY_TERMINATED_EVENT) }