
[SPARK-50892][SQL]Add UnionLoopExec, physical operator for recursion, to perform execution of recursive queries #49955

Closed

Conversation


@Pajaraja Pajaraja commented Feb 14, 2025

What changes were proposed in this pull request?

This PR introduces UnionLoopExec, a physical operator for recursion: UnionLoop is converted to UnionLoopExec during execution.
For now, only the UNION ALL case is supported.
The execution is performed by iteratively substituting UnionLoopRef with the plan obtained in the previous step, for as long as we are still generating new rows.

In addition, small changes to Optimizer.scala push down the Limit to UnionLoopExec when one is present in the query.
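The iteration described above can be sketched outside of Spark with plain collections. This is an illustrative model only; `anchor` and `recursive_step` are hypothetical names, not the actual UnionLoopExec API:

```python
# Illustrative sketch of the union-loop evaluation strategy: each step
# evaluates the recursion against the previous step's rows (modeling the
# substitution of UnionLoopRef) and appends, until a step produces nothing
# or the pushed-down limit is reached. NOT the Spark implementation.

def union_loop(anchor, recursive_step, limit=None):
    """Evaluate a recursive UNION ALL query over plain lists.

    anchor: list of initial rows.
    recursive_step: function mapping the previous step's rows to new rows
                    (the recursion body with UnionLoopRef substituted).
    limit: optional cap on total rows, mirroring the pushed-down Limit.
    """
    result = list(anchor)
    prev = anchor
    while prev and (limit is None or len(result) < limit):
        prev = recursive_step(prev)  # substitute previous step into the recursion
        result.extend(prev)
    return result if limit is None else result[:limit]

# Counting 1..5 recursively: the anchor is [1]; each step adds n+1 while n < 5.
rows = union_loop([1], lambda prev: [n + 1 for n in prev if n < 5])
# rows == [1, 2, 3, 4, 5]
```

Note how the limit also bounds an otherwise infinite recursion, which is the point of pushing it down into the loop.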

Why are the changes needed?

Support for recursive CTE.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added golden-file tests for various use cases of recursive CTEs: cte-recursion.sql and with.sql (run with SQLQueryTestSuite). The test outputs are checked against the outputs of the same (or syntactically slightly adapted) queries in the Snowflake and PostgreSQL engines.
Added two tests with parameterized identifiers with recursive CTEs to ParametersSuite.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Feb 14, 2025
@Pajaraja Pajaraja changed the title Apply Milan's already existing changes [WIP][SPARK-50892][SQL]Add UnionLoopExec, physical operator for recursion, to perform execution of recursive queries Feb 14, 2025
override val output: Seq[Attribute],
limit: Option[Int] = None) extends LeafExecNode {

override def innerChildren: Seq[QueryPlan[_]] = Seq(anchor, recursion)
Contributor:

why do they have to be inner children?

Contributor (author):

Could you elaborate on what you mean by this, please? To me it makes sense that these are inner children, as they represent subqueries, but I don't completely understand the difference between children and innerChildren.

Contributor (author):

They can't be "regular" children because they're logical, and the exec node is physical.

var currentLevel = 1

// Main loop for obtaining the result of the recursive query.
while (prevCount > 0 && (limit.isEmpty || currentLimit > 0)) {
Contributor:

one idea: the key here is to get the row count of the current iteration, so that we can decide whether to keep iterating. The shuffle is only there to avoid recomputing the query. But for very simple queries (e.g. a local scan with a simple filter/project), a shuffle is probably more expensive than recomputing. We should detect such cases and avoid the shuffle.

Contributor (author):

Added skipping shuffle when the recursion is simple in some cases. Would appreciate it if we could have more discussion on detecting these cases!

Contributor:

Maybe we could use InjectRuntimeFilter.isSimpleExpression() to check filter and project expressions.

Contributor:

Thinking about it more, I think a better and simpler idea is to skip shuffle if the optimized plan is LocalRelation, where the data is already materialized in memory.
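The trade-off could be sketched roughly as follows. This is a hypothetical model using made-up plan dictionaries, not Spark's plan classes: counting an already-materialized local relation is free, while any other plan is materialized once so the count does not force recomputing the whole query.

```python
# Hypothetical sketch of the "skip the shuffle for already-materialized data"
# decision discussed above. `plan` is a stand-in dict, not a Spark plan node.

def rows_and_count(plan):
    if plan["kind"] == "LocalRelation":
        # Data already lives in memory: counting it is free, no shuffle needed.
        rows = plan["rows"]
        return rows, len(rows)
    # Otherwise materialize once (modeling a shuffle/cache), so getting the
    # row count does not trigger a full recomputation of the query.
    rows = list(plan["compute"]())
    return rows, len(rows)

local = {"kind": "LocalRelation", "rows": [(1,), (2,)]}
rows, n = rows_and_count(local)  # n == 2, no materialization cost
```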

@@ -1032,6 +1044,42 @@ object ColumnPruning extends Rule[LogicalPlan] {
p
}

case p @ Project(_, ul: UnionLoop) =>
if (!ul.outputSet.subsetOf(p.references)) {
val newAnchorChildProj = prunedChild(ul.anchor, p.references)
Contributor:
Suggested change:
-  val newAnchorChildProj = prunedChild(ul.anchor, p.references)
+  val newAnchor = prunedChild(ul.anchor, p.references)

case p @ Project(_, ul: UnionLoop) =>
if (!ul.outputSet.subsetOf(p.references)) {
val newAnchorChildProj = prunedChild(ul.anchor, p.references)
val neededIndicesListRef = {
Contributor:

can you briefly describe the logic here? I thought we could follow how Union is handled

val anchor = ul.anchor
val newAnchor = prunedChild(anchor, p.references)
val newOutput = newAnchor.output
val selected = ul.recursion.output.zipWithIndex.filter { case (a, i) =>
  newOutput.contains(anchor.output(i))
}.map(_._1)
val newRecursion = Project(selected, ul.recursion)
p.copy(child = ul.withNewChildren(Seq(newAnchor, newRecursion)))

Contributor (author):

This doesn't work because at every step we need to produce output that the next iteration can read through UnionLoopRef (unless we truly do not need it). An example is Fibonacci generation:

WITH RECURSIVE fibonacci AS (
  VALUES (0, 1) AS t(a, b)
  UNION ALL
  SELECT b, a + b FROM fibonacci WHERE a < 10
)
SELECT a FROM fibonacci ORDER BY a;

This will fail with the above approach, as it would prune the recursive child to return only the next Fibonacci number in the first iteration of the recursion. However, when trying to calculate the next Fibonacci number, since we only kept the last one, we won't be able to perform the calculation.

Now as for my approach, it's pretty similar, except that we merge two sets of projection references to obtain everything we need to keep: the project right above UnionLoop (which your approach also considers), OR a project right above UnionLoopRef (which may not exist, in which case we don't prune at all).

For this I create two sets of indices (one for each project) and merge them. Then I build the pruned children in a way similar to what you suggested (I also started from how pruning of Union is handled). The reason I opted for sets of indices over sets of objects is that the columns below UnionLoopRef might get renamed or given a different id in the process; but since the outputs of UnionLoopRef and UnionLoop should have the same arity (and the same types, as they logically correspond to each other!), the indices line up between the two.

I do notice that I made a small mistake in creating indicesForRef which should be fixed now.
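A minimal sketch of that index-merging idea, using simplified column-name lists in place of Spark attribute sets (hypothetical names, not the Optimizer.scala code):

```python
# Sketch of merging the two sets of needed column indices described above:
# a column is kept if EITHER the project above UnionLoop OR the project above
# UnionLoopRef needs it. Indices are used because the two outputs have the
# same arity, so positions line up even when names or ids differ.

def pruned_indices(loop_output, needed_above_loop, ref_output, needed_above_ref):
    # Indices needed by the project directly above UnionLoop.
    loop_idx = {i for i, c in enumerate(loop_output) if c in needed_above_loop}
    # Indices needed by the project directly above UnionLoopRef.
    ref_idx = {i for i, c in enumerate(ref_output) if c in needed_above_ref}
    return sorted(loop_idx | ref_idx)

# Fibonacci example: the outer query only reads `a`, but the recursion reads
# both columns, so nothing can be pruned.
idx = pruned_indices(["a", "b"], {"a"}, ["a#2", "b#2"], {"a#2", "b#2"})
# idx == [0, 1]
```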

// condition of while loop down (limit.isEmpty will be true).
var globalLimitNum = globalLimit.getOrElse(0)
var localLimitNum = localLimit.getOrElse(0)
var currentLimit = Math.max(globalLimitNum, localLimitNum * numPartitions)
Contributor:

I don't think localLimitNum * numPartitions can be treated as a global limit. Think about this case:

  1. The query result has 2 partitions. In each iteration, the first partition produces 1 million rows, and the second partition produces 1 row.
  2. Let's say the local limit is 100. We need to iterate 100 times so that the second partition produces enough data, but the current code stops at the first iteration because 1 million already exceeds 100 * 2.
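That counterexample can be played out in a toy simulation (illustrative numbers from the comment above, not Spark code):

```python
# Toy simulation of the reviewer's counterexample: per iteration, partition 0
# produces 1,000,000 rows and partition 1 produces 1 row; the local limit is 100.

local_limit, num_partitions = 100, 2
partition_rows = [0, 0]
iterations = 0
# Stop condition from the code under review: total rows vs localLimit * numPartitions.
while sum(partition_rows) < local_limit * num_partitions:
    partition_rows[0] += 1_000_000
    partition_rows[1] += 1
    iterations += 1

# A single iteration already exceeds 200 total rows, so the loop stops with
# iterations == 1, while partition 1 holds only 1 row; satisfying its local
# limit of 100 would actually require 100 iterations.
```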

Contributor (author):

Is this true even though I apply a repartition at the end? I'm not really sure how to proceed here, because it seems that if I don't apply the repartition at the end, we get one partition for each iteration, and then the local limit can never apply.

I don't really understand how the partitions interact with the iterations. Would love to discuss more about this local limit, and see whether we can come to a way to patch it.

"unlimited.")
.version("4.0.0")
.intConf
.createWithDefault(1000)
Contributor:

do we really need a row limit?

Contributor (author):

Peter suggested it for the case where the number of rows grows exponentially, which I think makes sense. The default value should probably be bigger, though.

Contributor:

yea 1000 is too small

Contributor (author):

I replaced it with 1,000,000 now, and changed the golden-file test so that it indeed grows exponentially.
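As a quick illustration of why such a cap helps (illustrative arithmetic only, not the actual config machinery): if each iteration doubles the output, a 1,000,000-row limit is reached after about 20 iterations, so the guard catches runaway recursion quickly.

```python
# Illustrative: doubling the rows per iteration hits a 1,000,000-row cap fast.
row_limit = 1_000_000
rows, iterations = 1, 0
while rows <= row_limit:
    rows *= 2        # each recursive step doubles the output
    iterations += 1
# 2**20 = 1,048,576 > 1,000,000, so the loop stops after 20 iterations.
# iterations == 20
```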

@Pajaraja Pajaraja requested a review from cloud-fan March 18, 2025 12:23
Contributor @peter-toth left a comment:

Overall looks good to me, thanks for the changes and new test cases.

The only adjustment I would suggest is to raise the default cteRecursionRowLimit=1000, as you mentioned.

Dataset.ofRows(session, Union(unionChildren.toSeq))
}
}
val coalescedDF = df.coalesce(numPartitions)
Contributor:

nit: we don't need to do the coalesce in the if (unionChildren.length == 1) branch.

Contributor @cloud-fan:

thanks, merging to master! Note: it's not merged to 4.0 as we need to solve the perf issues by supporting column pruning and optimizing small queries, which we are unlikely to make before the 4.0 release.

@cloud-fan cloud-fan closed this in 94f3f38 Mar 19, 2025
kazemaksOG pushed a commit to kazemaksOG/spark-custom-scheduler that referenced this pull request Mar 27, 2025
…rsive CTE Subqueries

### What changes were proposed in this pull request?

Change the place where we check whether there is a recursive CTE within a subquery. Also, change the implementation to do an in-place traversal instead of collecting all subqueries into one array.

### Why are the changes needed?

It's more efficient to do an in-place traversal instead of collecting the subqueries into an array and traversing that, so this change is a small optimization.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Will be tested in [49955](apache#49955).

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50208 from Pajaraja/pavle-martinovic_data/SmallOptimizeToCTERefIllegal.

Authored-by: pavle-martinovic_data <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
kazemaksOG pushed a commit to kazemaksOG/spark-custom-scheduler that referenced this pull request Mar 27, 2025
…, to perform execution of recursive queries

(PR description identical to the one at the top of this page.)

Closes apache#49955 from Pajaraja/pavle-martinovic_data/UnionLoopExecCont.

Lead-authored-by: pavle-martinovic_data <[email protected]>
Co-authored-by: Pavle Martinovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>