From 2522fb2eeec2a6193dd5ce0b697ce5c3b3d65b0d Mon Sep 17 00:00:00 2001
From: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Date: Mon, 9 Sep 2024 14:41:37 -0700
Subject: [PATCH 01/10] commit commit commit

---
 .../resources/error/error-conditions.json    |  6 ++
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4 |  1 +
 .../sql/catalyst/parser/SqlBaseParser.g4      |  5 +
 .../sql/catalyst/analysis/Analyzer.scala      | 19 +++-
 .../sql/catalyst/parser/AstBuilder.scala      | 25 +++++
 .../plans/logical/PipeOperatorSelect.scala    | 32 ++++++
 .../sql/catalyst/rules/RuleIdCollection.scala |  1 +
 .../sql/catalyst/trees/TreePatterns.scala     |  1 +
 .../sql/errors/QueryCompilationErrors.scala   |  8 ++
 .../apache/spark/sql/internal/SQLConf.scala   |  9 ++
 .../analyzer-results/pipe-operators.sql.out   | 93 +++++++++++++++++
 .../sql-tests/inputs/pipe-operators.sql       | 24 +++++
 .../sql-tests/results/pipe-operators.sql.out  | 99 +++++++++++++++++++
 .../apache/spark/sql/SQLQueryTestSuite.scala  |  3 +
 .../sql/execution/SparkSqlParserSuite.scala   | 11 ++-
 15 files changed, 335 insertions(+), 2 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PipeOperatorSelect.scala
 create mode 100644 sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
 create mode 100644 sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
 create mode 100644 sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 6476c7fc9c5e2..aed16e065ef4d 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3707,6 +3707,12 @@
     ],
     "sqlState" : "42K03"
   },
+  "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION" : {
+    "message" : [
+      "Aggregate function <expr> is not allowed when using the pipe operator |> SELECT clause; please use the pipe operator |> AGGREGATE clause instead"
+    ],
+    "sqlState" : "0A000"
+  },
   "PIVOT_VALUE_DATA_TYPE_MISMATCH" : {
     "message" : [
       "Invalid pivot value '<value>': value data type <valueType> does not match pivot column data type <pivotType>."
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 6793cb46852be..d00d946d1c19d 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -503,6 +503,7 @@ TILDE: '~';
 AMPERSAND: '&';
 PIPE: '|';
 CONCAT_PIPE: '||';
+OPERATOR_PIPE: '|>';
 HAT: '^';
 COLON: ':';
 DOUBLE_COLON: '::';
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 6a23bd394c8ca..a8a201e59b94f 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -599,6 +599,7 @@ queryTerm
         operator=INTERSECT setQuantifier? right=queryTerm                  #setOperation
     | left=queryTerm {!legacy_setops_precedence_enabled}?
         operator=(UNION | EXCEPT | SETMINUS) setQuantifier?
right=queryTerm #setOperation + | left=queryTerm OPERATOR_PIPE operatorPipeRightSide #operatorPipeStatement ; queryPrimary @@ -1457,6 +1458,10 @@ version | stringLit ; +operatorPipeRightSide + : selectClause + ; + // When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL. // - Reserved keywords: // Keywords that are reserved and can't be used as identifiers for table, view, column, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0164af945ca28..e58a79fb4a39b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -344,7 +344,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor extendedResolutionRules : _*), Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn), Batch("Post-Hoc Resolution", Once, - Seq(ResolveCommandsWithIfExists) ++ + Seq(ResolveCommandsWithIfExists, RemovePipeOperators) ++ postHocResolutionRules: _*), Batch("Remove Unresolved Hints", Once, new ResolveHints.RemoveAllHints), @@ -2727,6 +2727,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor t => t.containsAnyPattern(AGGREGATE_EXPRESSION, PYTHON_UDF) && t.containsPattern(PROJECT), ruleId) { case Project(projectList, child) if containsAggregates(projectList) => + if (child.isInstanceOf[PipeOperatorSelect]) { + // If we used the pipe operator |> SELECT clause to specify an aggregate function, this is + // invalid; return an error message instructing the user to use the pipe operator + // |> AGGREGATE clause for this purpose instead. + throw QueryCompilationErrors.pipeOperatorSelectContainsAggregateFunction(projectList.head) + } Aggregate(Nil, projectList, child) } @@ -2747,6 +2753,17 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor } } + /** + * Removes placeholder PipeOperator* logical plan nodes and checks invariants. + */ + object RemovePipeOperators extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( + _.containsPattern(PIPE_OPERATOR_SELECT), ruleId) { + case PipeOperatorSelect(child) => + child + } + } + /** * This rule finds aggregate expressions that are not in an aggregate operator. For example, * those in a HAVING clause or ORDER BY clause. 
These expressions are pushed down to the diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f4638920af3c4..0ca3a0afffe51 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -5677,4 +5677,29 @@ class AstBuilder extends DataTypeAstBuilder withOrigin(ctx) { visitSetVariableImpl(ctx.query(), ctx.multipartIdentifierList(), ctx.assignmentList()) } + + override def visitOperatorPipeStatement(ctx: OperatorPipeStatementContext): LogicalPlan = { + visitOperatorPipeRightSide(ctx.operatorPipeRightSide(), plan(ctx.left)) + } + + private def visitOperatorPipeRightSide( + ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = { + if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) { + operationNotAllowed("Operator pipe SQL syntax using |>", ctx) + } + Option(ctx.selectClause).map { c => + withSelectQuerySpecification( + ctx = ctx, + selectClause = c, + lateralView = new java.util.ArrayList[LateralViewContext](), + whereClause = null, + aggregationClause = null, + havingClause = null, + windowClause = null, + left) match { + case p: Project => + p.copy(child = PipeOperatorSelect(p.child)) + } + }.get + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PipeOperatorSelect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PipeOperatorSelect.scala new file mode 100644 index 0000000000000..cbd2ffe9c920e --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PipeOperatorSelect.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR_SELECT, TreePattern} + +/** + * Represents a SELECT clause when used with the |> SQL pipe operator. + * We use this operator to make sure that no aggregate functions exist in the SELECT expressions. 
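+ * For example, "TABLE t |> SELECT x, y" is allowed here, while
+ * "TABLE t |> SELECT SUM(x)" is rejected during analysis because SUM is an
+ * aggregate function.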
+ */ +case class PipeOperatorSelect(child: LogicalPlan) extends UnaryNode { + final override val nodePatterns: Seq[TreePattern] = Seq(PIPE_OPERATOR_SELECT) + override def output: Seq[Attribute] = child.output + override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + PipeOperatorSelect(newChild) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index c70b43f0db173..4db7fa23c5df7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -47,6 +47,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates" :: + "org.apache.spark.sql.catalyst.analysis.Analyzer$RemovePipeOperators" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggAliasInGroupBy" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases" :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index cbbfccfcab5e8..826ac52c2b817 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -72,6 +72,7 @@ object TreePattern extends Enumeration { val NOT: Value = Value val NULL_CHECK: Value = Value val NULL_LITERAL: Value = Value + val PIPE_OPERATOR_SELECT: Value = Value val SERIALIZE_FROM_OBJECT: Value = Value val OR: Value = Value val OUTER_REFERENCE: Value = Value diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index fa8ea2f5289fa..5eb38e9f3b7d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -4096,4 +4096,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("functionName" -> functionName) ) } + + def pipeOperatorSelectContainsAggregateFunction(expr: Expression): Throwable = { + new AnalysisException( + errorClass = "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + messageParameters = Map( + "expr" -> expr.toString), + origin = expr.origin) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a87b0613292c9..2f4cb8235363e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4989,6 +4989,15 @@ object SQLConf { .stringConf .createWithDefault("versionAsOf") + val OPERATOR_PIPE_SYNTAX_ENABLED = + buildConf("spark.sql.operatorPipeSyntaxEnabled") + .doc("If true, enable operator pipe syntax for Apache Spark SQL. 
This uses the operator " + + "pipe marker |> to indicate separation between clauses of SQL in a manner that describes " + + "the sequence of steps that the query performs in a composable fashion.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation") .internal() .doc("If true, the old bogus percentile_disc calculation is used. The old calculation " + diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out new file mode 100644 index 0000000000000..f34379c614926 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -0,0 +1,93 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +create table t(x int, y string) using csv +-- !query analysis +CreateDataSourceTableCommand `spark_catalog`.`default`.`t`, false + + +-- !query +insert into t values (0, 'abc'), (1, 'def') +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/t, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/t], Append, `spark_catalog`.`default`.`t`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/t), [x, y] ++- Project [cast(col1#x as int) AS x#x, cast(col2#x as string) AS y#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +table t +|> select 1 as x +-- !query analysis +Project [1 AS x#x] ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select x, y +-- !query analysis +Project [x#x, y#x] ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select x, y +|> select x + length(y) as z +-- !query analysis +Project [(x#x + length(y#x)) AS z#x] ++- Project [x#x, y#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +values (0), (1) tab(col) +|> select col * 2 as result +-- !query analysis +Project [(col#x * 2) AS result#x] ++- SubqueryAlias tab + +- LocalRelation [col#x] + + +-- !query +(select * from t union all select * from t) +|> select x + length(y) as result +-- !query analysis +Project [(x#x + length(y#x)) AS result#x] ++- Union false, false + :- Project [x#x, y#x] + : +- SubqueryAlias spark_catalog.default.t + : +- Relation spark_catalog.default.t[x#x,y#x] csv + +- Project [x#x, y#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select sum(x) as result +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "expr" : "sum(x#x) AS result#xL" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 19, + "stopIndex" : 34, + "fragment" : "sum(x) as result" + } ] +} + + +-- !query +drop table t +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql new file mode 100644 index 0000000000000..cff7be0b894fe --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -0,0 +1,24 @@ 
+create table t(x int, y string) using csv; +insert into t values (0, 'abc'), (1, 'def'); + +-- Selection operators. +table t +|> select 1 as x; + +table t +|> select x, y; + +table t +|> select x, y +|> select x + length(y) as z; + +values (0), (1) tab(col) +|> select col * 2 as result; + +(select * from t union all select * from t) +|> select x + length(y) as result; + +table t +|> select sum(x) as result; + +drop table t; \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out new file mode 100644 index 0000000000000..fa006d1d3c12f --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -0,0 +1,99 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +create table t(x int, y string) using csv +-- !query schema +struct<> +-- !query output + + + +-- !query +insert into t values (0, 'abc'), (1, 'def') +-- !query schema +struct<> +-- !query output + + + +-- !query +table t +|> select 1 as x +-- !query schema +struct +-- !query output +1 +1 + + +-- !query +table t +|> select x, y +-- !query schema +struct +-- !query output +0 abc +1 def + + +-- !query +table t +|> select x, y +|> select x + length(y) as z +-- !query schema +struct +-- !query output +3 +4 + + +-- !query +values (0), (1) tab(col) +|> select col * 2 as result +-- !query schema +struct +-- !query output +0 +2 + + +-- !query +(select * from t union all select * from t) +|> select x + length(y) as result +-- !query schema +struct +-- !query output +3 +3 +4 +4 + + +-- !query +table t +|> select sum(x) as result +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "expr" : "sum(x#x) AS result#xL" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 19, + "stopIndex" : 34, + "fragment" : "sum(x) as result" + } ] +} + + +-- !query +drop table t +-- !query schema +struct<> +-- !query output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index b031f45ddbf34..a14a0e20f05cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -156,6 +156,9 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper // SPARK-39564: don't print out serde to avoid introducing complicated and error-prone // regex magic. .set("spark.test.noSerdeInExplain", "true") + // Enable operator pipe syntax in SQL golden file tests to get test coverage even if the feature + // is not otherwise enabled by default. + .set(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED, true) // SPARK-32106 Since we add SQL test 'transform.sql' will use `cat` command, // here we need to ignore it. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index f60df77b7e9bd..70c444b6b7697 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, RefreshResource} -import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StringType import org.apache.spark.util.ArrayImplicits._ @@ -880,4 +880,13 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession { parser.parsePlan("SELECT\u30001") // Unicode ideographic space } // scalastyle:on + + test("Operator pipe SQL syntax") { + withSQLConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED.key -> "true") { + // Basic selection. + parser.parsePlan("TABLE t |> SELECT 1 AS X") + parser.parsePlan("TABLE t |> SELECT 1 AS X, 2 AS Y |> SELECT X + Y AS Z") + parser.parsePlan("VALUES (0), (1) tab(col) |> SELECT col * 2 AS result") + } + } } From 0cd4f2ab01b86fc3a2141463fc75c1c64a895e6f Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Tue, 10 Sep 2024 11:56:35 -0700 Subject: [PATCH 02/10] add more testing --- .../analyzer-results/pipe-operators.sql.out | 173 ++++++++++++++++++ .../sql-tests/inputs/pipe-operators.sql | 64 ++++++- .../sql-tests/results/pipe-operators.sql.out | 167 +++++++++++++++++ 3 files changed, 402 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index f34379c614926..ec64b2c1a7a5d 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -1,4 +1,11 @@ -- Automatically generated by SQLQueryTestSuite +-- !query +drop table if exists t +-- !query analysis +DropTable true, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t + + -- !query create table t(x int, y string) using csv -- !query analysis @@ -13,6 +20,48 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d +- LocalRelation [col1#x, col2#x] +-- !query +drop table if exists other +-- !query analysis +DropTable true, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.other + + +-- !query +create table other(a int, b int) using json +-- !query analysis +CreateDataSourceTableCommand `spark_catalog`.`default`.`other`, false + + +-- !query +insert into other values (1, 1), (1, 2), (2, 4) +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/other, false, JSON, [path=file:[not included in comparison]/{warehouse_dir}/other], Append, `spark_catalog`.`default`.`other`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/other), [a, b] ++- Project [cast(col1#x as int) AS a#x, cast(col2#x as int) AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +drop table if exists st +-- !query analysis +DropTable true, 
false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.st
+
+
+-- !query
+create table st(x int, col struct<i1 int, i2 int>) using parquet
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`st`, false
+
+
+-- !query
+insert into st values (1, (2, 3))
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/st, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/st], Append, `spark_catalog`.`default`.`st`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/st), [x, col]
++- Project [cast(col1#x as int) AS x#x, named_struct(i1, cast(col2#x.col1 as int), i2, cast(col2#x.col2 as int)) AS col#x]
+   +- LocalRelation [col1#x, col2#x]
+
+
 -- !query
 table t
 |> select 1 as x
@@ -65,6 +114,95 @@ Project [(x#x + length(y#x)) AS result#x]
 +- Union false, false
    :- Project [x#x, y#x]
    :  +- SubqueryAlias spark_catalog.default.t
    :     +- Relation spark_catalog.default.t[x#x,y#x] csv
    +- Project [x#x, y#x]
       +- SubqueryAlias spark_catalog.default.t
          +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+(table t
+  |> select x, y
+  |> select x)
+union all
+select x from t where x < 1
+-- !query analysis
+Union false, false
+:- Project [x#x]
+:  +- Project [x#x, y#x]
+:     +- SubqueryAlias spark_catalog.default.t
+:        +- Relation spark_catalog.default.t[x#x,y#x] csv
++- Project [x#x]
+   +- Filter (x#x < 1)
+      +- SubqueryAlias spark_catalog.default.t
+         +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+(select col from st)
+|> select col.i1
+-- !query analysis
+Project [col#x.i1 AS i1#x]
++- Project [col#x]
+   +- SubqueryAlias spark_catalog.default.st
+      +- Relation spark_catalog.default.st[x#x,col#x] parquet
+
+
+-- !query
+table st
+|> select st.col.i1
+-- !query analysis
+Project [col#x.i1 AS i1#x]
++- SubqueryAlias spark_catalog.default.st
+   +- Relation spark_catalog.default.st[x#x,col#x] parquet
+
+
+-- !query
+table t
+|> select (select a from other where x = a limit 1)
+-- !query analysis
+Project [scalar-subquery#x [x#x] AS scalarsubquery(x)#x]
+:  +- GlobalLimit 1
+:     +- LocalLimit 1
+:        +- Project [a#x]
+:           +- Filter (outer(x#x) = a#x)
+:              +- SubqueryAlias spark_catalog.default.other
+:                 +- Relation spark_catalog.default.other[a#x,b#x] json
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> select (select any_value(a) from other where x = a limit 1)
+-- !query analysis
+Project [scalar-subquery#x [x#x] AS scalarsubquery(x)#x]
+:  +- GlobalLimit 1
+:     +- LocalLimit 1
+:        +- Aggregate [any_value(a#x, false) AS any_value(a)#x]
+:           +- Filter (outer(x#x) = a#x)
+:              +- SubqueryAlias spark_catalog.default.other
+:                 +- Relation spark_catalog.default.other[a#x,b#x] json
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> select x + length(x) as z, z + 1 as plus_one
+-- !query analysis
+Project [z#x, (z#x + 1) AS plus_one#x]
++- Project [x#x, y#x, (x#x + length(cast(x#x as string))) AS z#x]
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> select first_value(x) over (partition by y)
+-- !query analysis
+Project [first_value(x) OVER (PARTITION BY y ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x]
++- Project [x#x, y#x, first_value(x) OVER (PARTITION BY y ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, first_value(x) OVER (PARTITION BY y ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x]
+   +- Window [first_value(x#x, false) windowspecdefinition(y#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_value(x) OVER (PARTITION BY y ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x], [y#x]
+      +- Project [x#x, y#x]
+         +- SubqueryAlias spark_catalog.default.t
+            +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
 -- !query
 table t
 |> select sum(x) as result
@@ -86,8 +224,43 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+table t
+|> select y, length(y) + sum(x) as result
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "expr" : "y#x"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 19,
+    "stopIndex" : 19,
+    "fragment" : "y"
+  } ]
+}
+
+
 -- !query
 drop table t
 -- !query analysis
 DropTable false, false
 +- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t
+
+
+-- !query
+drop table other
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.other
+
+
+-- !query
+drop table st
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.st
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
index cff7be0b894fe..364968e83c02f 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -1,24 +1,84 @@
+-- Prepare some test data.
+--------------------------
+drop table if exists t;
 create table t(x int, y string) using csv;
 insert into t values (0, 'abc'), (1, 'def');
 
--- Selection operators.
+drop table if exists other;
+create table other(a int, b int) using json;
+insert into other values (1, 1), (1, 2), (2, 4);
+
+drop table if exists st;
+create table st(x int, col struct<i1 int, i2 int>) using parquet;
+insert into st values (1, (2, 3));
+
+-- Selection operators: positive tests.
+---------------------------------------
+
+-- Selecting a constant.
 table t
 |> select 1 as x;
 
+-- Selecting attributes.
 table t
 |> select x, y;
 
+-- Chained pipe SELECT operators.
 table t
 |> select x, y
 |> select x + length(y) as z;
 
+-- Using the VALUES list as the source relation.
 values (0), (1) tab(col)
 |> select col * 2 as result;
 
+-- Using a table subquery as the source relation.
 (select * from t union all select * from t)
 |> select x + length(y) as result;
 
+-- Enclosing the result of a pipe SELECT operation in a table subquery.
+(table t
+  |> select x, y
+  |> select x)
+union all
+select x from t where x < 1;
+
+-- Selecting struct fields.
+(select col from st)
+|> select col.i1;
+
+table st
+|> select st.col.i1;
+
+-- Expression subqueries in the pipe operator SELECT list.
+table t
+|> select (select a from other where x = a limit 1);
+
+-- Aggregations are allowed within expression subqueries in the pipe operator SELECT list as long as
+-- no aggregate functions exist in the top-level select list.
+table t
+|> select (select any_value(a) from other where x = a limit 1);
+
+-- Lateral column aliases in the pipe operator SELECT list.
+table t
+|> select x + length(x) as z, z + 1 as plus_one;
+
+-- Window functions are allowed in the pipe operator SELECT list.
+table t
+|> select first_value(x) over (partition by y);
+
+-- Selection operators: negative tests.
+---------------------------------------
+
+-- Aggregate functions are not allowed in the pipe operator SELECT list.
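+-- (The error message directs users toward the pipe operator |> AGGREGATE clause instead.)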
table t |> select sum(x) as result; -drop table t; \ No newline at end of file +table t +|> select y, length(y) + sum(x) as result; + +-- Cleanup. +----------- +drop table t; +drop table other; +drop table st; \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index fa006d1d3c12f..a14ceee256ad4 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -1,4 +1,12 @@ -- Automatically generated by SQLQueryTestSuite +-- !query +drop table if exists t +-- !query schema +struct<> +-- !query output + + + -- !query create table t(x int, y string) using csv -- !query schema @@ -15,6 +23,54 @@ struct<> +-- !query +drop table if exists other +-- !query schema +struct<> +-- !query output + + + +-- !query +create table other(a int, b int) using json +-- !query schema +struct<> +-- !query output + + + +-- !query +insert into other values (1, 1), (1, 2), (2, 4) +-- !query schema +struct<> +-- !query output + + + +-- !query +drop table if exists st +-- !query schema +struct<> +-- !query output + + + +-- !query +create table st(x int, col struct) using parquet +-- !query schema +struct<> +-- !query output + + + +-- !query +insert into st values (1, (2, 3)) +-- !query schema +struct<> +-- !query output + + + -- !query table t |> select 1 as x @@ -68,6 +124,78 @@ struct 4 +-- !query +(table t + |> select x, y + |> select x) +union all +select x from t where x < 1 +-- !query schema +struct +-- !query output +0 +0 +1 + + +-- !query +(select col from st) +|> select col.i1 +-- !query schema +struct +-- !query output +2 + + +-- !query +table st +|> select st.col.i1 +-- !query schema +struct +-- !query output +2 + + +-- !query +table t +|> select (select a from other where x = a limit 1) +-- !query schema +struct +-- !query output +1 +NULL + + +-- !query +table t +|> select (select any_value(a) from other where x = a limit 1) +-- !query schema +struct +-- !query output +1 +NULL + + +-- !query +table t +|> select x + length(x) as z, z + 1 as plus_one +-- !query schema +struct +-- !query output +1 2 +2 3 + + +-- !query +table t +|> select first_value(x) over (partition by y) +-- !query schema +struct +-- !query output +0 +1 + + -- !query table t |> select sum(x) as result @@ -91,9 +219,48 @@ org.apache.spark.sql.AnalysisException } +-- !query +table t +|> select y, length(y) + sum(x) as result +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "expr" : "y#x" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 19, + "stopIndex" : 19, + "fragment" : "y" + } ] +} + + -- !query drop table t -- !query schema struct<> -- !query output + + +-- !query +drop table other +-- !query schema +struct<> +-- !query output + + + +-- !query +drop table st +-- !query schema +struct<> +-- !query output + From 599b2943b38ea8485ac01623287f5bc2ae57a5a5 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Tue, 10 Sep 2024 11:59:46 -0700 Subject: [PATCH 03/10] add more testing --- .../analyzer-results/pipe-operators.sql.out | 16 ++++++++-------- .../sql-tests/inputs/pipe-operators.sql | 8 ++++---- .../sql-tests/results/pipe-operators.sql.out | 12 ++++++------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index ec64b2c1a7a5d..359d02308f0b2 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -153,9 +153,9 @@ Project [col#x.i1 AS i1#x] -- !query table t -|> select (select a from other where x = a limit 1) +|> select (select a from other where x = a limit 1) as result -- !query analysis -Project [scalar-subquery#x [x#x] AS scalarsubquery(x)#x] +Project [scalar-subquery#x [x#x] AS result#x] : +- GlobalLimit 1 : +- LocalLimit 1 : +- Project [a#x] @@ -168,9 +168,9 @@ Project [scalar-subquery#x [x#x] AS scalarsubquery(x)#x] -- !query table t -|> select (select any_value(a) from other where x = a limit 1) +|> select (select any_value(a) from other where x = a limit 1) as result -- !query analysis -Project [scalar-subquery#x [x#x] AS scalarsubquery(x)#x] +Project [scalar-subquery#x [x#x] AS result#x] : +- GlobalLimit 1 : +- LocalLimit 1 : +- Aggregate [any_value(a#x, false) AS any_value(a)#x] @@ -193,11 +193,11 @@ Project [z#x, (z#x + 1) AS plus_one#x] -- !query table t -|> select first_value(x) over (partition by y) +|> select first_value(x) over (partition by y) as result -- !query analysis -Project [first_value(x) OVER (PARTITION BY y ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x] -+- Project [x#x, y#x, first_value(x) OVER (PARTITION BY y ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, first_value(x) OVER (PARTITION BY y ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x] - +- Window [first_value(x#x, false) windowspecdefinition(y#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_value(x) OVER (PARTITION BY y ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x], [y#x] +Project [result#x] ++- Project [x#x, y#x, result#x, result#x] + +- Window [first_value(x#x, false) windowspecdefinition(y#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS result#x], [y#x] +- Project [x#x, y#x] +- SubqueryAlias spark_catalog.default.t +- Relation spark_catalog.default.t[x#x,y#x] csv diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql index 364968e83c02f..087a3d322d080 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -52,12 +52,12 @@ table st -- Expression subqueries in the pipe operator SELECT list. table t -|> select (select a from other where x = a limit 1); +|> select (select a from other where x = a limit 1) as result; -- Aggregations are allowed within expression subqueries in the pipe operator SELECT list as long as -- no aggregate functions exist in the top-level select list. table t -|> select (select any_value(a) from other where x = a limit 1); +|> select (select any_value(a) from other where x = a limit 1) as result; -- Lateral column aliases in the pipe operator SELECT list. table t @@ -65,7 +65,7 @@ table t -- Window functions are allowed in the pipe operator SELECT list. table t -|> select first_value(x) over (partition by y); +|> select first_value(x) over (partition by y) as result; -- Selection operators: negative tests. 
--------------------------------------- @@ -81,4 +81,4 @@ table t ----------- drop table t; drop table other; -drop table st; \ No newline at end of file +drop table st; diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index a14ceee256ad4..2b22e49dec72b 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -158,9 +158,9 @@ struct -- !query table t -|> select (select a from other where x = a limit 1) +|> select (select a from other where x = a limit 1) as result -- !query schema -struct +struct -- !query output 1 NULL @@ -168,9 +168,9 @@ NULL -- !query table t -|> select (select any_value(a) from other where x = a limit 1) +|> select (select any_value(a) from other where x = a limit 1) as result -- !query schema -struct +struct -- !query output 1 NULL @@ -188,9 +188,9 @@ struct -- !query table t -|> select first_value(x) over (partition by y) +|> select first_value(x) over (partition by y) as result -- !query schema -struct +struct -- !query output 0 1 From fac88af886b46eb72f33b0294a7f3ba107ca9d1b Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Tue, 10 Sep 2024 16:30:18 -0700 Subject: [PATCH 04/10] exclude flaky ThriftServerQueryTestSuite for new golden file exclude flaky ThriftServerQueryTestSuite for new golden file --- .../sql/hive/thriftserver/ThriftServerQueryTestSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 026b2388c593c..331572e62f566 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -103,7 +103,8 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServ // SPARK-42921 "timestampNTZ/datetime-special-ansi.sql", // SPARK-47264 - "collations.sql" + "collations.sql", + "pipe-operators.sql" ) override def runQueries( From 51a01d122400e601c2a085520d559aeefb61ba72 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Tue, 10 Sep 2024 16:32:20 -0700 Subject: [PATCH 05/10] respond to code review comments --- .../scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 0ca3a0afffe51..b902cb4e063b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -5697,6 +5697,8 @@ class AstBuilder extends DataTypeAstBuilder havingClause = null, windowClause = null, left) match { + // The input should always be a projection since we only pass a context for the SELECT + // clause here and pass "null" for all other clauses. 
case p: Project => p.copy(child = PipeOperatorSelect(p.child)) } From 0ee5fc4bdb645e41043d16421ed936c9513fa45f Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Wed, 11 Sep 2024 11:41:01 -0700 Subject: [PATCH 06/10] respond to code review comments --- .../scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index b902cb4e063b1..576dc9da539a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -5701,6 +5701,8 @@ class AstBuilder extends DataTypeAstBuilder // clause here and pass "null" for all other clauses. case p: Project => p.copy(child = PipeOperatorSelect(p.child)) + case other => + throw SparkException.internalError(s"Unrecognized matched logical plan: $other") } }.get } From 557bd0c016cda1c1992295013a7fbfbaf6f90deb Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Wed, 11 Sep 2024 20:15:24 -0700 Subject: [PATCH 07/10] switch to expression switch to expression switch to expression switch to expression moving error checking to checkanalysis --- .../sql/catalyst/analysis/Analyzer.scala | 19 +---- .../sql/catalyst/expressions/PipeSelect.scala | 47 ++++++++++++ .../sql/catalyst/parser/AstBuilder.scala | 16 +++- .../plans/logical/PipeOperatorSelect.scala | 32 -------- .../sql/catalyst/rules/RuleIdCollection.scala | 1 - .../analyzer-results/pipe-operators.sql.out | 76 ++++++++++++++----- .../sql-tests/inputs/pipe-operators.sql | 15 ++++ .../sql-tests/results/pipe-operators.sql.out | 48 ++++++++++-- 8 files changed, 177 insertions(+), 77 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PipeOperatorSelect.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e58a79fb4a39b..0164af945ca28 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -344,7 +344,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor extendedResolutionRules : _*), Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn), Batch("Post-Hoc Resolution", Once, - Seq(ResolveCommandsWithIfExists, RemovePipeOperators) ++ + Seq(ResolveCommandsWithIfExists) ++ postHocResolutionRules: _*), Batch("Remove Unresolved Hints", Once, new ResolveHints.RemoveAllHints), @@ -2727,12 +2727,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor t => t.containsAnyPattern(AGGREGATE_EXPRESSION, PYTHON_UDF) && t.containsPattern(PROJECT), ruleId) { case Project(projectList, child) if containsAggregates(projectList) => - if (child.isInstanceOf[PipeOperatorSelect]) { - // If we used the pipe operator |> SELECT clause to specify an aggregate function, this is - // invalid; return an error message instructing the user to use the pipe operator - // |> AGGREGATE clause for this purpose instead. 
- throw QueryCompilationErrors.pipeOperatorSelectContainsAggregateFunction(projectList.head) - } Aggregate(Nil, projectList, child) } @@ -2753,17 +2747,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor } } - /** - * Removes placeholder PipeOperator* logical plan nodes and checks invariants. - */ - object RemovePipeOperators extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( - _.containsPattern(PIPE_OPERATOR_SELECT), ruleId) { - case PipeOperatorSelect(child) => - child - } - } - /** * This rule finds aggregate expressions that are not in an aggregate operator. For example, * those in a HAVING clause or ORDER BY clause. These expressions are pushed down to the diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala new file mode 100644 index 0000000000000..af29704ae955f --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction +import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE, TreePattern} +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * Represents a SELECT clause when used with the |> SQL pipe operator. + * We use this to make sure that no aggregate functions exist in the SELECT expressions. + */ +case class PipeSelect(child: Expression) + extends UnaryExpression with RuntimeReplaceable { + final override val nodePatterns: Seq[TreePattern] = Seq(PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE) + override def withNewChildInternal(newChild: Expression): Expression = PipeSelect(newChild) + override def replacement: Expression = { + def visit(e: Expression): Unit = e match { + case a: AggregateFunction => + // If we used the pipe operator |> SELECT clause to specify an aggregate function, this is + // invalid; return an error message instructing the user to use the pipe operator + // |> AGGREGATE clause for this purpose instead. + throw QueryCompilationErrors.pipeOperatorSelectContainsAggregateFunction(a) + case _: WindowExpression => + // Window functions are allowed in pipe SELECT operators, so do not traverse into children. 
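+      // For any other expression, recurse into its children to search for
+      // aggregate functions nested at any depth.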
+ case _ => + e.children.foreach(visit) + } + visit(child) + child + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 95e039688ee0b..7d77b7be8ab76 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -5721,6 +5721,16 @@ class AstBuilder extends DataTypeAstBuilder operationNotAllowed("Operator pipe SQL syntax using |>", ctx) } Option(ctx.selectClause).map { c => + def updateProject(p: Project): Project = { + val newProjectList: Seq[NamedExpression] = p.projectList.map { + case a: Alias => + a.withNewChildren(Seq(PipeSelect(a.child))) + .asInstanceOf[NamedExpression] + case other => + other + } + p.copy(projectList = newProjectList) + } withSelectQuerySpecification( ctx = ctx, selectClause = c, @@ -5730,10 +5740,12 @@ class AstBuilder extends DataTypeAstBuilder havingClause = null, windowClause = null, left) match { - // The input should always be a projection since we only pass a context for the SELECT + // The input should generally be a projection since we only pass a context for the SELECT // clause here and pass "null" for all other clauses. case p: Project => - p.copy(child = PipeOperatorSelect(p.child)) + updateProject(p) + case d @ Distinct(p: Project) => + d.copy(child = updateProject(p)) case other => throw SparkException.internalError(s"Unrecognized matched logical plan: $other") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PipeOperatorSelect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PipeOperatorSelect.scala deleted file mode 100644 index cbd2ffe9c920e..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PipeOperatorSelect.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.plans.logical - -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR_SELECT, TreePattern} - -/** - * Represents a SELECT clause when used with the |> SQL pipe operator. - * We use this operator to make sure that no aggregate functions exist in the SELECT expressions. 
- */ -case class PipeOperatorSelect(child: LogicalPlan) extends UnaryNode { - final override val nodePatterns: Seq[TreePattern] = Seq(PIPE_OPERATOR_SELECT) - override def output: Seq[Attribute] = child.output - override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = - PipeOperatorSelect(newChild) -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index 4db7fa23c5df7..c70b43f0db173 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -47,7 +47,6 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates" :: - "org.apache.spark.sql.catalyst.analysis.Analyzer$RemovePipeOperators" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggAliasInGroupBy" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases" :: diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index 359d02308f0b2..384fc6aa84712 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -66,7 +66,7 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d table t |> select 1 as x -- !query analysis -Project [1 AS x#x] +Project [pipeselect(1) AS x#x] +- SubqueryAlias spark_catalog.default.t +- Relation spark_catalog.default.t[x#x,y#x] csv @@ -85,7 +85,7 @@ table t |> select x, y |> select x + length(y) as z -- !query analysis -Project [(x#x + length(y#x)) AS z#x] +Project [pipeselect((x#x + length(y#x))) AS z#x] +- Project [x#x, y#x] +- SubqueryAlias spark_catalog.default.t +- Relation spark_catalog.default.t[x#x,y#x] csv @@ -95,7 +95,7 @@ Project [(x#x + length(y#x)) AS z#x] values (0), (1) tab(col) |> select col * 2 as result -- !query analysis -Project [(col#x * 2) AS result#x] +Project [pipeselect((col#x * 2)) AS result#x] +- SubqueryAlias tab +- LocalRelation [col#x] @@ -104,7 +104,7 @@ Project [(col#x * 2) AS result#x] (select * from t union all select * from t) |> select x + length(y) as result -- !query analysis -Project [(x#x + length(y#x)) AS result#x] +Project [pipeselect((x#x + length(y#x))) AS result#x] +- Union false, false :- Project [x#x, y#x] : +- SubqueryAlias spark_catalog.default.t @@ -155,7 +155,7 @@ Project [col#x.i1 AS i1#x] table t |> select (select a from other where x = a limit 1) as result -- !query analysis -Project [scalar-subquery#x [x#x] AS result#x] +Project [pipeselect(scalar-subquery#x [x#x]) AS result#x] : +- GlobalLimit 1 : +- LocalLimit 1 : +- Project [a#x] @@ -170,7 +170,7 @@ Project [scalar-subquery#x [x#x] AS result#x] table t |> select (select any_value(a) from other where x = a limit 1) as result -- !query analysis -Project [scalar-subquery#x [x#x] AS result#x] +Project [pipeselect(scalar-subquery#x [x#x]) AS result#x] : +- GlobalLimit 1 : +- LocalLimit 1 : +- Aggregate [any_value(a#x, false) AS any_value(a)#x] @@ -185,8 +185,8 @@ Project [scalar-subquery#x 
[x#x] AS result#x] table t |> select x + length(x) as z, z + 1 as plus_one -- !query analysis -Project [z#x, (z#x + 1) AS plus_one#x] -+- Project [x#x, y#x, (x#x + length(cast(x#x as string))) AS z#x] +Project [z#x, pipeselect((z#x + 1)) AS plus_one#x] ++- Project [x#x, y#x, pipeselect((x#x + length(cast(x#x as string)))) AS z#x] +- SubqueryAlias spark_catalog.default.t +- Relation spark_catalog.default.t[x#x,y#x] csv @@ -196,13 +196,55 @@ table t |> select first_value(x) over (partition by y) as result -- !query analysis Project [result#x] -+- Project [x#x, y#x, result#x, result#x] - +- Window [first_value(x#x, false) windowspecdefinition(y#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS result#x], [y#x] ++- Project [x#x, y#x, _we0#x, pipeselect(_we0#x) AS result#x] + +- Window [first_value(x#x, false) windowspecdefinition(y#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#x], [y#x] +- Project [x#x, y#x] +- SubqueryAlias spark_catalog.default.t +- Relation spark_catalog.default.t[x#x,y#x] csv +-- !query +select 1 x, 2 y, 3 z +|> select 1 + sum(x) over (), + avg(y) over (), + x, + avg(x+1) over (partition by y order by z) AS a2 +|> select a2 +-- !query analysis +Project [a2#x] ++- Project [(1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, x#x, a2#x] + +- Project [x#x, y#x, _w1#x, z#x, _we0#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, _we2#x, (cast(1 as bigint) + _we0#xL) AS (1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, pipeselect(_we2#x) AS a2#x] + +- Window [avg(_w1#x) windowspecdefinition(y#x, z#x ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we2#x], [y#x], [z#x ASC NULLS FIRST] + +- Window [sum(x#x) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#xL, avg(y#x) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x] + +- Project [x#x, y#x, (x#x + 1) AS _w1#x, z#x] + +- Project [1 AS x#x, 2 AS y#x, 3 AS z#x] + +- OneRowRelation + + +-- !query +table t +|> select x, count(*) over () +|> select x +-- !query analysis +Project [x#x] ++- Project [x#x, count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL] + +- Project [x#x, count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL, count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL] + +- Window [count(1) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL] + +- Project [x#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select distinct x, y +-- !query analysis +Distinct ++- Project [x#x, y#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + -- !query table t |> select sum(x) as result @@ -212,14 +254,14 @@ org.apache.spark.sql.AnalysisException "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", "sqlState" : "0A000", "messageParameters" : { - "expr" : "sum(x#x) AS result#xL" + 
"expr" : "sum(x#x)" }, "queryContext" : [ { "objectType" : "", "objectName" : "", "startIndex" : 19, - "stopIndex" : 34, - "fragment" : "sum(x) as result" + "stopIndex" : 24, + "fragment" : "sum(x)" } ] } @@ -233,14 +275,14 @@ org.apache.spark.sql.AnalysisException "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", "sqlState" : "0A000", "messageParameters" : { - "expr" : "y#x" + "expr" : "sum(x#x)" }, "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 19, - "stopIndex" : 19, - "fragment" : "y" + "startIndex" : 34, + "stopIndex" : 39, + "fragment" : "sum(x)" } ] } diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql index 087a3d322d080..1eb2595c46c3d 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -67,6 +67,21 @@ table t table t |> select first_value(x) over (partition by y) as result; +select 1 x, 2 y, 3 z +|> select 1 + sum(x) over (), + avg(y) over (), + x, + avg(x+1) over (partition by y order by z) AS a2 +|> select a2; + +table t +|> select x, count(*) over () +|> select x; + +-- DISTINCT is supported. +table t +|> select distinct x, y; + -- Selection operators: negative tests. --------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index 2b22e49dec72b..1900653e189b9 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -196,6 +196,40 @@ struct 1 +-- !query +select 1 x, 2 y, 3 z +|> select 1 + sum(x) over (), + avg(y) over (), + x, + avg(x+1) over (partition by y order by z) AS a2 +|> select a2 +-- !query schema +struct +-- !query output +2.0 + + +-- !query +table t +|> select x, count(*) over () +|> select x +-- !query schema +struct +-- !query output +0 +1 + + +-- !query +table t +|> select distinct x, y +-- !query schema +struct +-- !query output +0 abc +1 def + + -- !query table t |> select sum(x) as result @@ -207,14 +241,14 @@ org.apache.spark.sql.AnalysisException "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", "sqlState" : "0A000", "messageParameters" : { - "expr" : "sum(x#x) AS result#xL" + "expr" : "sum(x#x)" }, "queryContext" : [ { "objectType" : "", "objectName" : "", "startIndex" : 19, - "stopIndex" : 34, - "fragment" : "sum(x) as result" + "stopIndex" : 24, + "fragment" : "sum(x)" } ] } @@ -230,14 +264,14 @@ org.apache.spark.sql.AnalysisException "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", "sqlState" : "0A000", "messageParameters" : { - "expr" : "y#x" + "expr" : "sum(x#x)" }, "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 19, - "stopIndex" : 19, - "fragment" : "y" + "startIndex" : 34, + "stopIndex" : 39, + "fragment" : "sum(x)" } ] } From d0c375d8adb4b1a19eb5f9799c974bec0393ec51 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 12 Sep 2024 21:14:40 -0700 Subject: [PATCH 08/10] respond to code review comments respond to code review comments respond to code review comments --- .../sql/catalyst/expressions/PipeSelect.scala | 2 +- .../sql/catalyst/parser/AstBuilder.scala | 57 ++++++++++--------- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../analyzer-results/pipe-operators.sql.out | 10 ++++ 
.../sql-tests/inputs/pipe-operators.sql | 3 + .../sql-tests/results/pipe-operators.sql.out | 8 +++ .../apache/spark/sql/SQLQueryTestSuite.scala | 3 - .../sql/execution/SparkSqlParserSuite.scala | 14 ++++- 8 files changed, 64 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala index af29704ae955f..0b5479cc8f0ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PipeSelect.scala @@ -29,7 +29,7 @@ case class PipeSelect(child: Expression) extends UnaryExpression with RuntimeReplaceable { final override val nodePatterns: Seq[TreePattern] = Seq(PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE) override def withNewChildInternal(newChild: Expression): Expression = PipeSelect(newChild) - override def replacement: Expression = { + override lazy val replacement: Expression = { def visit(e: Expression): Unit = e match { case a: AggregateFunction => // If we used the pipe operator |> SELECT clause to specify an aggregate function, this is diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 7d77b7be8ab76..dc6530592443e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -425,7 +425,8 @@ class AstBuilder extends DataTypeAstBuilder ctx.aggregationClause, ctx.havingClause, ctx.windowClause, - plan + plan, + isPipeOperatorSelect = false ) } } @@ -1013,7 +1014,8 @@ class AstBuilder extends DataTypeAstBuilder ctx.aggregationClause, ctx.havingClause, ctx.windowClause, - from + from, + isPipeOperatorSelect = false ) } @@ -1100,7 +1102,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause, havingClause, windowClause, - isDistinct = false) + isDistinct = false, + isPipeOperatorSelect = false) ScriptTransformation( string(visitStringLit(transformClause.script)), @@ -1121,6 +1124,8 @@ class AstBuilder extends DataTypeAstBuilder * Add a regular (SELECT) query specification to a logical plan. The query specification * is the core of the logical plan, this is where sourcing (FROM clause), projection (SELECT), * aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place. + * If 'isPipeOperatorSelect' is true, wraps each projected expression with a [[PipeSelect]] + * expression for future validation of the expressions during analysis. * * Note that query hints are ignored (both by the parser and the builder). 
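+   *
+   * For example (a sketch, assuming the pipe operator syntax flag is enabled), the query
+   *   TABLE t |> SELECT x + 1 AS result
+   * parses into a Project whose alias child is wrapped as PipeSelect(x + 1), which lets the
+   * analyzer later reject aggregate functions that appear in a |> SELECT list.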
*/ @@ -1132,7 +1137,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause: AggregationClauseContext, havingClause: HavingClauseContext, windowClause: WindowClauseContext, - relation: LogicalPlan): LogicalPlan = withOrigin(ctx) { + relation: LogicalPlan, + isPipeOperatorSelect: Boolean): LogicalPlan = withOrigin(ctx) { val isDistinct = selectClause.setQuantifier() != null && selectClause.setQuantifier().DISTINCT() != null @@ -1144,7 +1150,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause, havingClause, windowClause, - isDistinct) + isDistinct, + isPipeOperatorSelect) // Hint selectClause.hints.asScala.foldRight(plan)(withHints) @@ -1158,7 +1165,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause: AggregationClauseContext, havingClause: HavingClauseContext, windowClause: WindowClauseContext, - isDistinct: Boolean): LogicalPlan = { + isDistinct: Boolean, + isPipeOperatorSelect: Boolean): LogicalPlan = { // Add lateral views. val withLateralView = lateralView.asScala.foldLeft(relation)(withGenerate) @@ -1172,7 +1180,20 @@ class AstBuilder extends DataTypeAstBuilder } def createProject() = if (namedExpressions.nonEmpty) { - Project(namedExpressions, withFilter) + val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) { + // If this is a pipe operator |> SELECT clause, add a [[PipeSelect]] expression wrapping + // each alias in the project list, so the analyzer can check invariants later. + namedExpressions.map { + case a: Alias => + a.withNewChildren(Seq(PipeSelect(a.child))) + .asInstanceOf[NamedExpression] + case other => + other + } + } else { + namedExpressions + } + Project(newProjectList, withFilter) } else { withFilter } @@ -5721,16 +5742,6 @@ class AstBuilder extends DataTypeAstBuilder operationNotAllowed("Operator pipe SQL syntax using |>", ctx) } Option(ctx.selectClause).map { c => - def updateProject(p: Project): Project = { - val newProjectList: Seq[NamedExpression] = p.projectList.map { - case a: Alias => - a.withNewChildren(Seq(PipeSelect(a.child))) - .asInstanceOf[NamedExpression] - case other => - other - } - p.copy(projectList = newProjectList) - } withSelectQuerySpecification( ctx = ctx, selectClause = c, @@ -5739,16 +5750,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause = null, havingClause = null, windowClause = null, - left) match { - // The input should generally be a projection since we only pass a context for the SELECT - // clause here and pass "null" for all other clauses. 
- case p: Project => - updateProject(p) - case d @ Distinct(p: Project) => - d.copy(child = updateProject(p)) - case other => - throw SparkException.internalError(s"Unrecognized matched logical plan: $other") - } + relation = left, + isPipeOperatorSelect = true) }.get } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2f4cb8235363e..094fb8f050bc8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4996,7 +4996,7 @@ object SQLConf { "the sequence of steps that the query performs in a composable fashion.") .version("4.0.0") .booleanConf - .createWithDefault(false) + .createWithDefault(Utils.isTesting) val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation") .internal() diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index 384fc6aa84712..ab0635fef048b 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -166,6 +166,16 @@ Project [pipeselect(scalar-subquery#x [x#x]) AS result#x] +- Relation spark_catalog.default.t[x#x,y#x] csv +-- !query +select (values (0) tab(col) |> select col) as result +-- !query analysis +Project [scalar-subquery#x [] AS result#x] +: +- Project [col#x] +: +- SubqueryAlias tab +: +- LocalRelation [col#x] ++- OneRowRelation + + -- !query table t |> select (select any_value(a) from other where x = a limit 1) as result diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql index 1eb2595c46c3d..7d0966e7f2095 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -54,6 +54,9 @@ table st table t |> select (select a from other where x = a limit 1) as result; +-- Pipe operator SELECT inside expression subqueries. +select (values (0) tab(col) |> select col) as result; + -- Aggregations are allowed within expression subqueries in the pipe operator SELECT list as long as -- no aggregate functions exist in the top-level select list. 
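+-- For example, "table t |> select (select sum(a) from other) as result" would keep the
+-- aggregate inside the scalar subquery (a hypothetical sketch of the pattern tested just
+-- below), while a top-level "|> select sum(x)" fails with
+-- PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION, as the negative tests show.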
table t diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index 1900653e189b9..7e0b7912105c2 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -166,6 +166,14 @@ struct NULL +-- !query +select (values (0) tab(col) |> select col) as result +-- !query schema +struct +-- !query output +0 + + -- !query table t |> select (select any_value(a) from other where x = a limit 1) as result diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index a14a0e20f05cd..b031f45ddbf34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -156,9 +156,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper // SPARK-39564: don't print out serde to avoid introducing complicated and error-prone // regex magic. .set("spark.test.noSerdeInExplain", "true") - // Enable operator pipe syntax in SQL golden file tests to get test coverage even if the feature - // is not otherwise enabled by default. - .set(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED, true) // SPARK-32106 Since we add SQL test 'transform.sql' will use `cat` command, // here we need to ignore it. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 211c5023686b0..a80444feb68ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, Un import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Concat, GreaterThan, Literal, NullsFirst, SortOrder, UnresolvedWindowExpression, UnspecifiedFrame, WindowSpecDefinition, WindowSpecReference} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, PROJECT, UNRESOLVED_RELATION} import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, RefreshResource} @@ -884,9 +885,16 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession { test("Operator pipe SQL syntax") { withSQLConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED.key -> "true") { // Basic selection. - parser.parsePlan("TABLE t |> SELECT 1 AS X") - parser.parsePlan("TABLE t |> SELECT 1 AS X, 2 AS Y |> SELECT X + Y AS Z") - parser.parsePlan("VALUES (0), (1) tab(col) |> SELECT col * 2 AS result") + // Here we check that every parsed plan contains a projection and a source relation or + // inline table. 
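+      // (A sketch of the rationale: TreePattern checks like containsPattern(PROJECT) assert
+      // the plan's shape using precomputed pattern bits, avoiding brittle string comparisons
+      // against the parsed plan's string representation.)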
+ def checkPipeSelect(query: String): Unit = { + val plan: LogicalPlan = parser.parsePlan(query) + assert(plan.containsPattern(PROJECT)) + assert(plan.containsAnyPattern(UNRESOLVED_RELATION, LOCAL_RELATION)) + } + checkPipeSelect("TABLE t |> SELECT 1 AS X") + checkPipeSelect("TABLE t |> SELECT 1 AS X, 2 AS Y |> SELECT X + Y AS Z") + checkPipeSelect("VALUES (0), (1) tab(col) |> SELECT col * 2 AS result") } } } From 9340a373dbbff2d3c43d433646da99f0a8a4f1a6 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Fri, 13 Sep 2024 11:54:51 -0700 Subject: [PATCH 09/10] respond to code review comments --- .../sql/hive/thriftserver/ThriftServerQueryTestSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 331572e62f566..026b2388c593c 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -103,8 +103,7 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServ // SPARK-42921 "timestampNTZ/datetime-special-ansi.sql", // SPARK-47264 - "collations.sql", - "pipe-operators.sql" + "collations.sql" ) override def runQueries( From bb8c706758de2967ffcdaa5b41d78dc12f78c4f1 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Fri, 13 Sep 2024 17:25:34 -0700 Subject: [PATCH 10/10] re-enable ThriftServerQueryTestSuite blocklist --- .../sql/hive/thriftserver/ThriftServerQueryTestSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 026b2388c593c..331572e62f566 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -103,7 +103,8 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServ // SPARK-42921 "timestampNTZ/datetime-special-ansi.sql", // SPARK-47264 - "collations.sql" + "collations.sql", + "pipe-operators.sql" ) override def runQueries(