[SPARK-49560][SQL] Add SQL pipe syntax for the TABLESAMPLE operator
### What changes were proposed in this pull request?

This PR adds SQL pipe syntax support for the TABLESAMPLE operator.

For example:

```
CREATE TABLE t(x INT, y STRING) USING CSV;
INSERT INTO t VALUES (0, 'abc'), (1, 'def');

TABLE t
|> TABLESAMPLE (100 PERCENT) REPEATABLE (0)
|> TABLESAMPLE (5 ROWS) REPEATABLE (0)
|> TABLESAMPLE (BUCKET 1 OUT OF 1) REPEATABLE (0);

0	abc
1	def
```
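Each step of this pipeline keeps every row by construction: `100 PERCENT` and `BUCKET 1 OUT OF 1` sample the full input, and `5 ROWS` is more rows than the table contains, so the result deterministically shows both rows.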

### Why are the changes needed?

The SQL pipe operator syntax will let users compose queries in a more flexible fashion.
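For instance, here is a hedged sketch of the chained example above written without pipe syntax, as nested FROM-clause subqueries (this assumes the usual TABLESAMPLE-on-subquery support and reuses table `t` from the example):

```
-- Hedged sketch: each nesting level corresponds to one |> TABLESAMPLE step above.
SELECT * FROM (
  SELECT * FROM (
    SELECT * FROM t TABLESAMPLE (100 PERCENT) REPEATABLE (0)
  ) TABLESAMPLE (5 ROWS) REPEATABLE (0)
) TABLESAMPLE (BUCKET 1 OUT OF 1) REPEATABLE (0);
```

The pipe form applies the operators in the order they are written, top to bottom, instead of inside out.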

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds a few unit test cases, but mostly relies on golden file test coverage. This approach both verifies that the query answers are correct as the feature is implemented and lets us inspect the analyzer output plans to make sure they look right as well.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#48168 from dtenedor/pipe-tablesample.

Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
dtenedor authored and attilapiros committed Oct 4, 2024
1 parent 9d3267b commit aaec833
Showing 6 changed files with 444 additions and 1 deletion.
SqlBaseParser.g4
@@ -1504,6 +1504,7 @@ operatorPipeRightSide
// messages in the event that both are present (this is not allowed).
| pivotClause unpivotClause?
| unpivotClause pivotClause?
| sample
;

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
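Note that the new alternative simply references the existing `sample` production — the same rule used for TABLESAMPLE after a FROM-clause relation — so each sampling form that rule accepts (PERCENT, ROWS, and BUCKET ... OUT OF ..., each with an optional REPEATABLE seed) should parse after `|>` as well. A minimal sketch, using a fraction beyond the ones exercised in the golden tests below:

```
-- Assumed example: any fraction accepted by the shared `sample` rule should work here.
TABLE t
|> TABLESAMPLE (50 PERCENT) REPEATABLE (0);
```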
AstBuilder.scala
@@ -5903,7 +5903,9 @@ class AstBuilder extends DataTypeAstBuilder
throw QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx)
}
withUnpivot(c, left)
}.get)))
}.getOrElse(Option(ctx.sample).map { c =>
withSample(c, left)
}.get))))
}

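In the parser change above, the pipe-operator visitor's chain of `Option` fallbacks gains one more link: when none of the earlier alternatives is present, it checks `ctx.sample` and applies the same `withSample` helper that handles TABLESAMPLE in the FROM clause — which is presumably why the golden results below show the familiar `Sample` and `GlobalLimit`/`LocalLimit` plans.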
pipe-operators.sql.out
@@ -921,6 +921,190 @@ org.apache.spark.sql.catalyst.parser.ParseException
}


-- !query
table t
|> tablesample (100 percent) repeatable (0)
-- !query analysis
Sample 0.0, 1.0, false, 0
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> tablesample (2 rows) repeatable (0)
-- !query analysis
GlobalLimit 2
+- LocalLimit 2
   +- SubqueryAlias spark_catalog.default.t
      +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> tablesample (bucket 1 out of 1) repeatable (0)
-- !query analysis
Sample 0.0, 1.0, false, 0
+- SubqueryAlias spark_catalog.default.t
   +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> tablesample (100 percent) repeatable (0)
|> tablesample (5 rows) repeatable (0)
|> tablesample (bucket 1 out of 1) repeatable (0)
-- !query analysis
Sample 0.0, 1.0, false, 0
+- GlobalLimit 5
   +- LocalLimit 5
      +- Sample 0.0, 1.0, false, 0
         +- SubqueryAlias spark_catalog.default.t
            +- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> tablesample ()
-- !query analysis
org.apache.spark.sql.catalyst.parser.ParseException
{
  "errorClass" : "_LEGACY_ERROR_TEMP_0014",
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 12,
    "stopIndex" : 25,
    "fragment" : "tablesample ()"
  } ]
}


-- !query
table t
|> tablesample (-100 percent)
-- !query analysis
org.apache.spark.sql.catalyst.parser.ParseException
{
  "errorClass" : "_LEGACY_ERROR_TEMP_0064",
  "messageParameters" : {
    "msg" : "Sampling fraction (-1.0) must be on interval [0, 1]"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 12,
    "stopIndex" : 37,
    "fragment" : "tablesample (-100 percent)"
  } ]
}


-- !query
table t
|> tablesample (-5 rows)
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
  "errorClass" : "INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE",
  "sqlState" : "42K0E",
  "messageParameters" : {
    "expr" : "\"-5\"",
    "name" : "limit",
    "v" : "-5"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 25,
    "stopIndex" : 26,
    "fragment" : "-5"
  } ]
}


-- !query
table t
|> tablesample (x rows)
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
  "errorClass" : "INVALID_LIMIT_LIKE_EXPRESSION.IS_UNFOLDABLE",
  "sqlState" : "42K0E",
  "messageParameters" : {
    "expr" : "\"x\"",
    "name" : "limit"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 25,
    "stopIndex" : 25,
    "fragment" : "x"
  } ]
}


-- !query
table t
|> tablesample (bucket 2 out of 1)
-- !query analysis
org.apache.spark.sql.catalyst.parser.ParseException
{
  "errorClass" : "_LEGACY_ERROR_TEMP_0064",
  "messageParameters" : {
    "msg" : "Sampling fraction (2.0) must be on interval [0, 1]"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 12,
    "stopIndex" : 42,
    "fragment" : "tablesample (bucket 2 out of 1)"
  } ]
}


-- !query
table t
|> tablesample (200b) repeatable (0)
-- !query analysis
org.apache.spark.sql.catalyst.parser.ParseException
{
  "errorClass" : "_LEGACY_ERROR_TEMP_0015",
  "messageParameters" : {
    "msg" : "byteLengthLiteral"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 12,
    "stopIndex" : 44,
    "fragment" : "tablesample (200b) repeatable (0)"
  } ]
}


-- !query
table t
|> tablesample (200) repeatable (0)
-- !query analysis
org.apache.spark.sql.catalyst.parser.ParseException
{
  "errorClass" : "_LEGACY_ERROR_TEMP_0016",
  "messageParameters" : {
    "bytesStr" : "200"
  },
  "queryContext" : [ {
    "objectType" : "",
    "objectName" : "",
    "startIndex" : 12,
    "stopIndex" : 43,
    "fragment" : "tablesample (200) repeatable (0)"
  } ]
}


-- !query
drop table t
-- !query analysis
49 changes: 49 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -326,6 +326,55 @@ table courseSales
for `year` in (2012, 2013)
);

-- Sampling operators: positive tests.
--------------------------------------

-- We will use the REPEATABLE clause and/or adjust the sampling options to either remove no rows or
-- all rows to help keep the tests deterministic.
table t
|> tablesample (100 percent) repeatable (0);

table t
|> tablesample (2 rows) repeatable (0);

table t
|> tablesample (bucket 1 out of 1) repeatable (0);

table t
|> tablesample (100 percent) repeatable (0)
|> tablesample (5 rows) repeatable (0)
|> tablesample (bucket 1 out of 1) repeatable (0);

-- Sampling operators: negative tests.
--------------------------------------

-- The sampling method is required.
table t
|> tablesample ();

-- Negative sampling options are not supported.
table t
|> tablesample (-100 percent);

table t
|> tablesample (-5 rows);

-- The sampling method may not refer to attribute names from the input relation.
table t
|> tablesample (x rows);

-- The bucket number is invalid.
table t
|> tablesample (bucket 2 out of 1);

-- Byte literals are not supported.
table t
|> tablesample (200b) repeatable (0);

-- Invalid byte literal syntax.
table t
|> tablesample (200) repeatable (0);

-- Cleanup.
-----------
drop table t;