Skip to content

Commit

Permalink
[KYUUBI #6681] Log the delete batch request in batch operation log
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

As the title says, log the delete-batch request in the batch operation log.

## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6681 from turboFei/audit_kill.

Closes #6681

8550868 [Wang, Fei] withOperationLog

Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
  • Loading branch information
turboFei committed Sep 9, 2024
1 parent db5ce0c commit edbe3f3
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY}
import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.log.OperationLog

/**
* A [[SparkListener]] based on spark's DeveloperApi [[StatsReportListener]], used to appending
Expand Down Expand Up @@ -78,15 +77,6 @@ class SQLOperationListener(
properties != null && properties.getProperty(KYUUBI_STATEMENT_ID_KEY) == operationId
}

/**
 * Runs `f` with this operation's `OperationLog` (when one exists) installed as the
 * thread-local "current" operation log, so that log lines emitted inside `f` are
 * appended to the operation's own log. The thread-local entry is always removed
 * afterwards, even when `f` throws.
 *
 * @param f the side-effecting action to run while the operation log is attached
 */
private def withOperationLog(f: => Unit): Unit = {
  try {
    // Attach the operation's log (if any) to the current thread before running `f`.
    operation.getOperationLog.foreach(OperationLog.setCurrentOperationLog)
    f
  } finally {
    // Always detach the thread-local log so it cannot leak to unrelated work on this thread.
    OperationLog.removeCurrentOperationLog()
  }
}

override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
if (sameGroupId(jobStart.properties)) {
val jobId = jobStart.jobId
Expand All @@ -105,7 +95,7 @@ class SQLOperationListener(
activeJobs.put(
jobId,
new SparkJobInfo(stageSize, stageIds))
withOperationLog {
operation.withOperationLog {
info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
s" ${activeJobs.size()} active jobs running")
}
Expand All @@ -119,7 +109,7 @@ class SQLOperationListener(
case JobSucceeded => "succeeded"
case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
}
withOperationLog {
operation.withOperationLog {
info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running")
}
}
Expand All @@ -135,7 +125,7 @@ class SQLOperationListener(
activeStages.put(
stageAttempt,
new SparkStageInfo(stageId, stageInfo.numTasks))
withOperationLog {
operation.withOperationLog {
info(s"Query [$operationId]: Stage $stageId.$attemptNumber started " +
s"with ${stageInfo.numTasks} tasks, ${activeStages.size()} active stages running")
}
Expand Down Expand Up @@ -166,7 +156,7 @@ class SQLOperationListener(
operationRunTime.getAndAdd(taskMetrics.executorRunTime)
operationCpuTime.getAndAdd(taskMetrics.executorCpuTime)
}
withOperationLog(super.onStageCompleted(stageCompleted))
operation.withOperationLog(super.onStageCompleted(stageCompleted))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin

override def getOperationLog: Option[OperationLog] = None

/**
 * Runs `f` with this operation's `OperationLog` (when one exists) registered as the
 * thread-local current operation log, routing log lines emitted inside `f` into the
 * operation's log. The thread-local registration is cleared in all cases, including
 * when `f` throws.
 *
 * @param f the side-effecting action to run while the operation log is attached
 */
override def withOperationLog(f: => Unit): Unit = {
  try {
    // getOperationLog returns None by default; subclasses that keep a log override it.
    getOperationLog.foreach(OperationLog.setCurrentOperationLog)
    f
  } finally {
    // Always remove the thread-local entry to avoid leaking it to other operations.
    OperationLog.removeCurrentOperationLog()
  }
}

OperationAuditLogger.audit(this, OperationState.INITIALIZED)
@volatile protected var state: OperationState = INITIALIZED
@volatile protected var startTime: Long = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ trait Operation {
def getHandle: OperationHandle
def getStatus: OperationStatus
def getOperationLog: Option[OperationLog]
def withOperationLog(f: => Unit): Unit

def getBackgroundHandle: Future[_]
def shouldRunAsync: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {

val sessionHandle = formatSessionHandle(batchId)
sessionManager.getBatchSession(sessionHandle).map { batchSession =>
fe.getSessionUser(batchSession.user)
val userName = fe.getSessionUser(batchSession.user)
val ipAddress = fe.getIpAddress
sessionManager.closeSession(batchSession.handle)
val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage
batchSession.batchJobSubmissionOp.withOperationLog {
warn(s"Received kill batch request from $userName/$ipAddress")
warn(s"Kill batch response: killed: $killed, msg: $msg.")
}
new CloseBatchResponse(killed, msg)
}.getOrElse {
sessionManager.getBatchMetadata(batchId).map { metadata =>
Expand Down

0 comments on commit edbe3f3

Please sign in to comment.