[SPARK-51655][SQL] Fix metric collection in UnionLoopExec and add test #50449

Closed

@@ -91,7 +91,8 @@ case class UnionLoopExec(
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numIterations" -> SQLMetrics.createMetric(sparkContext, "number of recursive iterations"))
+    "numIterations" -> SQLMetrics.createMetric(sparkContext, "number of recursive iterations"),
+    "numAnchorOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of anchor output rows"))
 
   /**
    * This function executes the plan (optionally with appended limit node) and caches the result,
@@ -123,6 +124,7 @@ case class UnionLoopExec(
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     val numOutputRows = longMetric("numOutputRows")
     val numIterations = longMetric("numIterations")
+    val numAnchorOutputRows = longMetric("numAnchorOutputRows")
     val levelLimit = conf.getConf(SQLConf.CTE_RECURSION_LEVEL_LIMIT)
     val rowLimit = conf.getConf(SQLConf.CTE_RECURSION_ROW_LIMIT)
 
@@ -136,6 +138,8 @@
 
     var (prevDF, prevCount) = executeAndCacheAndCount(anchor, currentLimit)
 
+    numAnchorOutputRows += prevCount
+
     var currentLevel = 1
 
     var currentNumRows = 0
@@ -177,7 +181,6 @@
       // Update metrics
       numOutputRows += prevCount
       numIterations += 1
-      SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
 
       if (!limitReached) {
         // the current plan is created by substituting UnionLoopRef node with the project node of
@@ -200,6 +203,8 @@
       }
     }
 
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
+
     if (unionChildren.isEmpty) {
      new EmptyRDD[InternalRow](sparkContext)
    } else {
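
With postDriverMetricUpdates moved below the loop, the driver posts the metric values once, after all iterations have accumulated into them, rather than re-posting partial values on every pass. As a quick way to observe the result, here is a minimal sketch (not part of the patch) that runs a recursive CTE and reads the operator's metrics from the executed plan; it assumes a Spark build with recursive CTE support, a local SparkSession bound to spark, and that UnionLoopExec lives in org.apache.spark.sql.execution, as the test below suggests:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.UnionLoopExec

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.sql(
      """WITH RECURSIVE t(n) AS(
        | VALUES 1, 2
        | UNION ALL
        | SELECT n+1 FROM t WHERE n < 20
        | )
        | SELECT * FROM t""".stripMargin)
    df.collect() // run the query so the driver-side metrics get populated

    // Locate the UnionLoop node; with this patch its metrics map also
    // contains "number of anchor output rows".
    val ule = df.queryExecution.executedPlan.collect { case u: UnionLoopExec => u }.head
    ule.metrics.foreach { case (key, metric) => println(s"$key = ${metric.value}") }
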
@@ -107,6 +107,24 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("Recursive CTEs metrics") {
+    val df = sql("""WITH RECURSIVE t(n) AS(
+        | VALUES 1, 2
+        | UNION ALL
+        | SELECT n+1 FROM t WHERE n < 20
+        | )
+        | SELECT * FROM t""".stripMargin)
+    val unionLoopExec = df.queryExecution.executedPlan.collect {
+      case ule: UnionLoopExec => ule
+    }
+    sparkContext.listenerBus.waitUntilEmpty()
+    assert(unionLoopExec.size == 1)
+    val expected = Map("number of output rows" -> 39L, "number of recursive iterations" -> 20L,
+      "number of anchor output rows" -> 2L)
+    testSparkPlanMetrics(df, 22, Map(
+      2L -> (("UnionLoop", expected))))
+  }
+
   test("Filter metrics") {
     // Assume the execution plan is
     // PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)
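
As a sanity check on the expected values: the anchor VALUES 1, 2 contributes 2 rows (the new "number of anchor output rows" metric), recursive levels 1 through 18 each emit two rows, level 19 emits only the single row 20, and the next level is empty, which ends the recursion. The loop therefore runs 20 times (one pass for the anchor plus one per non-empty recursive level), and the total row count is 2 + 18 * 2 + 1 = 39, matching the numbers asserted above.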