[SPARK-52024][SQL] Support cancel ShuffleQueryStage when propagate empty relations #50814

Open
wants to merge 3 commits into master

Conversation

@summaryzb (Contributor) commented May 7, 2025

What changes were proposed in this pull request?

1. This PR introduces a mechanism for cancelling query stages.
2. It applies the mechanism to AQEPropagateEmptyRelation, cancelling running query stages that become unnecessary once empty relations are propagated.
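The idea of cancelling a stage whose output can no longer affect the result can be modelled in plain Scala without any Spark dependency. The sketch below is purely illustrative: `StageHandle`, `markCancelled`, and the work-unit loop are hypothetical names, not Spark APIs; a real ShuffleQueryStage would have its running tasks aborted by the scheduler rather than polling a flag.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a running query stage: work proceeds in
// small units and a cancellation flag is checked between units,
// mirroring how an unnecessary stage's remaining work can be skipped.
final class StageHandle(val name: String) {
  private val cancelled = new AtomicBoolean(false)
  private var unitsDone = 0

  def markCancelled(): Unit = cancelled.set(true)
  def isCancelled: Boolean = cancelled.get()

  // Run up to `units` work units, stopping early if cancelled.
  // Returns the total number of units completed so far.
  def run(units: Int): Int = {
    var i = 0
    while (i < units && !isCancelled) { unitsDone += 1; i += 1 }
    unitsDone
  }
}

object CancelStageDemo {
  def main(args: Array[String]): Unit = {
    val stage = new StageHandle("slow-join-side")
    // The optimizer proves this stage's output is unnecessary
    // (e.g. the other join side is empty), so it is cancelled
    // before its work completes.
    stage.markCancelled()
    val done = stage.run(1000)
    println(s"${stage.name} completed $done/1000 units (cancelled=${stage.isCancelled})")
  }
}
```

The point of the model: cancellation is a cooperative signal delivered while the stage is still materializing, so compute that would otherwise be wasted is released early.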

Why are the changes needed?

  1. A query stage cancellation mechanism makes AQE more flexible; further cost-based optimizations can be built on it after this PR is merged.
  2. Tasks belonging to unnecessary running query stages occupy executor cores, wasting compute resources.

Does this PR introduce any user-facing change?

Yes. Users may see stage failures caused by the cancellation of optimized-away stages, but these failures have no effect on the query result.

How was this patch tested?

Manual test. Since the completion order of query stages cannot be guaranteed, covering this reliably in a unit test is not feasible.

./bin/spark-shell --master local[4]

scala> case class TestData(key: Int, value: String)
defined class TestData

scala> case class TestData2(a: Int, b: Int)
defined class TestData2

scala> spark.sparkContext.parallelize(Seq.empty[Int].map(i => TestData(i, i.toString))).toDF().createOrReplaceTempView("emptyTestData")

scala> spark.sparkContext.parallelize((1 to 100).map(i => TestData(i, i.toString))).toDF().createOrReplaceTempView("testData")

scala> spark.sparkContext.parallelize(TestData2(1, 1) :: TestData2(1, 2) :: TestData2(2, 1) :: TestData2(2, 2) :: TestData2(3, 1) :: TestData2(3, 2) :: Nil, 2).toDF().createOrReplaceTempView("testData2")

scala> spark.udf.register("fake_udf", (input: Int) => {
     |   Thread.sleep(100)
     |   input
     | })

scala> spark.sql("SELECT t.key1 FROM emptyTestData join (SELECT testData.key as key1 FROM testData join testData2 ON fake_udf(testData.key)=fake_udf(testData2.a) ) t on t.key1 = emptyTestData.key union SELECT testData.key FROM testData join testData2 ON testData.key=testData2.a ").collect
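The left branch of the union above joins against `emptyTestData`, and an inner join with an empty side is itself empty, so every stage feeding only that branch (including the slow `fake_udf` join) can be cancelled once the empty relation is propagated. A minimal plain-Scala model of that reasoning (no Spark; `innerJoin` and the sample data are illustrative, not the optimizer's actual rewrite):

```scala
object EmptyJoinDemo {
  // Inner join on an Int key: if either side is empty, the result is
  // empty regardless of the other side, so computing the other side
  // is wasted work.
  def innerJoin[A](left: Seq[(Int, A)], right: Seq[(Int, A)]): Seq[(Int, (A, A))] =
    for {
      (kl, vl) <- left
      (kr, vr) <- right
      if kl == kr
    } yield (kl, (vl, vr))

  def main(args: Array[String]): Unit = {
    val empty = Seq.empty[(Int, String)]
    val expensive = (1 to 100).map(i => (i, i.toString))
    // The result is provably empty before `expensive` is consumed at all.
    println(innerJoin(empty, expensive).size) // prints 0
  }
}
```

In AQE terms: once one join side materializes as an empty relation, the stages computing the other side no longer contribute to the result and are candidates for cancellation.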

before this pr: [screenshot of running query stages]
after this pr: [screenshot of running query stages]

Was this patch authored or co-authored using generative AI tooling?

No.

@summaryzb
Contributor Author

@cloud-fan @LuciferYang @panbingkun PTAL
