[SPARK-49557][SQL] Add SQL pipe syntax for the WHERE operator #48091

Closed · wants to merge 16 commits
@@ -1488,6 +1488,7 @@ version

 operatorPipeRightSide
     : selectClause
+    | whereClause
     ;

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
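
For reference, a minimal example of the syntax this grammar change enables (the table t(x, y) comes from the golden tests later in this diff):

table t
|> where x + length(y) < 4
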
@@ -5854,7 +5854,20 @@ class AstBuilder extends DataTypeAstBuilder
         windowClause = null,
         relation = left,
         isPipeOperatorSelect = true)
-    }.get
+    }.getOrElse(Option(ctx.whereClause).map { c =>
+      // Add a table subquery boundary between the new filter and the input plan if one does not
+      // already exist. This helps the analyzer behave as if we had added the WHERE clause after a
+      // table subquery containing the input plan.
Contributor:
This is great! This skips the tricky aggregate function pushdown stuff from Filter/Sort which complicates the analyzer quite a bit. We also don't need this with pipe syntax, as it's quite easy for users to filter on the aggregated query.
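
For example (adapted from the golden tests in this PR, where sum_len is the alias defined in the test query), the aggregate is computed in one pipe step and then filtered through its output alias in the next, so no aggregate function ever needs to appear inside the WHERE condition:

(select x, sum(length(y)) as sum_len from t group by x)
|> where sum_len = 3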

Contributor:

That being said, it seems like we don't need to add a subquery alias if the child plan is UnresolvedRelation. We don't need to isolate the table scan node here.

Contributor Author (@dtenedor, Sep 19, 2024):

We talked offline and found that updating the UnresolvedRelation pattern match to this fixes the problem:

        case u: UnresolvedRelation =>
          u

In this way we don't add another redundant SubqueryAlias when ResolveRelations will already add one. Looking at the commit that performs this update, we see the analyzer plans improve accordingly.

[screenshot: analyzer plans before and after this change]
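
Concretely (this analysis appears in the golden file below), the filter now resolves against the relation's own alias, with no redundant subquery alias on top:

table t
|> where t.x = 1
-- !query analysis
Filter (x#x = 1)
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv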

Contributor:

Ah, it also fixes a regression. We can add a test for table t |> where spark_catalog.default.t.x = 1, which didn't work before this fix.

Contributor Author:

Sounds good, this is done.

+      val withSubqueryAlias = left match {
+        case s: SubqueryAlias =>
+          s
+        case u: UnresolvedRelation =>
+          u
+        case _ =>
+          SubqueryAlias(SubqueryAlias.generateSubqueryName(), left)
+      }
+      withWhereClause(c, withSubqueryAlias)
+    }.get)
   }

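The effect of the subquery boundary is visible when pipe operators are chained (from the golden tests below): each |> where after the first treats the previous step as a table subquery, so its condition resolves only against that step's output:

table t
|> where x + length(y) < 4
|> where x + length(y) < 3
-- !query analysis
Filter ((x#x + length(y#x)) < 3)
+- SubqueryAlias __auto_generated_subquery_name
   +- Filter ((x#x + length(y#x)) < 4)
      +- SubqueryAlias spark_catalog.default.t
         +- Relation spark_catalog.default.t[x#x,y#x] csv
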
@@ -255,6 +255,55 @@ Distinct
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select *
-- !query analysis
Project [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select * except (y)
-- !query analysis
Project [x#x]
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select /*+ repartition(3) */ *
-- !query analysis
Repartition 3, true
+- Project [x#x, y#x]
   +- SubqueryAlias spark_catalog.default.t
      +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select /*+ repartition(3) */ distinct x
-- !query analysis
Repartition 3, true
+- Distinct
   +- Project [x#x]
      +- SubqueryAlias spark_catalog.default.t
         +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select /*+ repartition(3) */ all x
-- !query analysis
Repartition 3, true
+- Project [x#x]
   +- SubqueryAlias spark_catalog.default.t
      +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select sum(x) as result
@@ -297,6 +346,229 @@ org.apache.spark.sql.AnalysisException
}


-- !query
table t
|> where true
-- !query analysis
Filter true
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where x + length(y) < 4
-- !query analysis
Filter ((x#x + length(y#x)) < 4)
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where x + length(y) < 4
|> where x + length(y) < 3
-- !query analysis
Filter ((x#x + length(y#x)) < 3)
+- SubqueryAlias __auto_generated_subquery_name
   +- Filter ((x#x + length(y#x)) < 4)
      +- SubqueryAlias spark_catalog.default.t
         +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
(select x, sum(length(y)) as sum_len from t group by x)
|> where x = 1
-- !query analysis
Filter (x#x = 1)
+- SubqueryAlias __auto_generated_subquery_name
   +- Aggregate [x#x], [x#x, sum(length(y#x)) AS sum_len#xL]
      +- SubqueryAlias spark_catalog.default.t
         +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where t.x = 1
-- !query analysis
Filter (x#x = 1)
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where spark_catalog.default.t.x = 1
-- !query analysis
Filter (x#x = 1)
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
(select col from st)
|> where col.i1 = 1
-- !query analysis
Filter (col#x.i1 = 1)
+- SubqueryAlias __auto_generated_subquery_name
   +- Project [col#x]
      +- SubqueryAlias spark_catalog.default.st
         +- Relation spark_catalog.default.st[x#x,col#x] parquet


-- !query
table st
|> where st.col.i1 = 2
-- !query analysis
Filter (col#x.i1 = 2)
+- SubqueryAlias spark_catalog.default.st
   +- Relation spark_catalog.default.st[x#x,col#x] parquet


-- !query
table t
|> where exists (select a from other where x = a limit 1)
-- !query analysis
Filter exists#x [x#x]
:  +- GlobalLimit 1
:     +- LocalLimit 1
:        +- Project [a#x]
:           +- Filter (outer(x#x) = a#x)
:              +- SubqueryAlias spark_catalog.default.other
:                 +- Relation spark_catalog.default.other[a#x,b#x] json
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where (select any_value(a) from other where x = a limit 1) = 1
-- !query analysis
Filter (scalar-subquery#x [x#x] = 1)
:  +- GlobalLimit 1
:     +- LocalLimit 1
:        +- Aggregate [any_value(a#x, false) AS any_value(a)#x]
:           +- Filter (outer(x#x) = a#x)
:              +- SubqueryAlias spark_catalog.default.other
:                 +- Relation spark_catalog.default.other[a#x,b#x] json
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> where sum(x) = 1
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
  "errorClass" : "INVALID_WHERE_CONDITION",
  "sqlState" : "42903",
  "messageParameters" : {
    "condition" : "\"(sum(x) = 1)\"",
    "expressionList" : "sum(spark_catalog.default.t.x)"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 1,
    "stopIndex" : 27,
    "fragment" : "table t\n|> where sum(x) = 1"
  } ]
}


-- !query
table t
|> where y = 'abc' or length(y) + sum(x) = 1
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
  "errorClass" : "INVALID_WHERE_CONDITION",
  "sqlState" : "42903",
  "messageParameters" : {
    "condition" : "\"((y = abc) OR ((length(y) + sum(x)) = 1))\"",
    "expressionList" : "sum(spark_catalog.default.t.x)"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 1,
    "stopIndex" : 52,
    "fragment" : "table t\n|> where y = 'abc' or length(y) + sum(x) = 1"
  } ]
}


-- !query
table t
|> where first_value(x) over (partition by y) = 1
-- !query analysis
org.apache.spark.sql.AnalysisException
{
  "errorClass" : "_LEGACY_ERROR_TEMP_1034",
  "messageParameters" : {
    "clauseName" : "WHERE"
  }
}


-- !query
select * from t where first_value(x) over (partition by y) = 1
-- !query analysis
org.apache.spark.sql.AnalysisException
{
  "errorClass" : "_LEGACY_ERROR_TEMP_1034",
  "messageParameters" : {
    "clauseName" : "WHERE"
  }
}


-- !query
table t
|> select x, length(y) as z
|> where x + length(y) < 4
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
  "sqlState" : "42703",
  "messageParameters" : {
    "objectName" : "`y`",
    "proposal" : "`x`, `z`"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 57,
    "stopIndex" : 57,
    "fragment" : "y"
  } ]
}


-- !query
(select x, sum(length(y)) as sum_len from t group by x)
|> where sum(length(y)) = 3
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
  "sqlState" : "42703",
  "messageParameters" : {
    "objectName" : "`y`",
    "proposal" : "`x`, `sum_len`"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 77,
    "stopIndex" : 77,
    "fragment" : "y"
  } ]
}


-- !query
drop table t
-- !query analysis