Skip to content

[SPARK-51884][SQL]Part 1.a Add outer scope attributes for SubqueryExpression #50822

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2326,7 +2326,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
private def resolveSubQuery(
e: SubqueryExpression,
outer: LogicalPlan)(
f: (LogicalPlan, Seq[Expression]) => SubqueryExpression): SubqueryExpression = {
f: (LogicalPlan, Seq[(Expression, Boolean)]) => SubqueryExpression): SubqueryExpression = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment here to explain what this boolean means?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a separate class to store this would be even better

val newSubqueryPlan = AnalysisContext.withOuterPlan(outer) {
executeSameContext(e.plan)
}
Expand All @@ -2335,7 +2335,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
// them as children of SubqueryExpression.
if (newSubqueryPlan.resolved) {
// Record the outer references as children of subquery expression.
f(newSubqueryPlan, SubExprUtils.getOuterReferences(newSubqueryPlan))
f(newSubqueryPlan, SubExprUtils.getOuterReferences(newSubqueryPlan).map((_, true)))
} else {
e.withNewPlan(newSubqueryPlan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ object ValidateSubqueryExpression
checkOuterReference(plan, expr)

expr match {
case ScalarSubquery(query, outerAttrs, _, _, _, _, _) =>
case ScalarSubquery(query, rawOuterAttrs, _, _, _, _, _) =>
val outerAttrs = rawOuterAttrs.map(_._1)
// Scalar subquery must return one column as output.
if (query.output.size != 1) {
throw QueryCompilationErrors.subqueryReturnMoreThanOneColumn(query.output.size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class ExpressionResolutionValidator(resolutionValidator: ResolutionValidator) {
resolutionValidator.validate(scalarSubquery.plan)
}

for (outerAttribute <- scalarSubquery.outerAttrs) {
for (outerAttribute <- scalarSubquery.getOuterAttrs) {
validate(outerAttribute)
}

Expand All @@ -163,7 +163,7 @@ class ExpressionResolutionValidator(resolutionValidator: ResolutionValidator) {
resolutionValidator.validate(listQuery.plan)
}

for (outerAttribute <- listQuery.outerAttrs) {
for (outerAttribute <- listQuery.getOuterAttrs) {
validate(outerAttribute)
}
}
Expand All @@ -173,7 +173,7 @@ class ExpressionResolutionValidator(resolutionValidator: ResolutionValidator) {
resolutionValidator.validate(exists.plan)
}

for (outerAttribute <- exists.outerAttrs) {
for (outerAttribute <- exists.getOuterAttrs) {
validate(outerAttribute)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SubqueryExpressionResolver(expressionResolver: ExpressionResolver, resolve

val resolvedScalarSubquery = unresolvedScalarSubquery.copy(
plan = resolvedSubqueryExpressionPlan.plan,
outerAttrs = resolvedSubqueryExpressionPlan.outerExpressions
outerAttrs = resolvedSubqueryExpressionPlan.outerExpressions.map((_, true))
)

val coercedScalarSubquery =
Expand Down Expand Up @@ -108,7 +108,7 @@ class SubqueryExpressionResolver(expressionResolver: ExpressionResolver, resolve

unresolvedListQuery.copy(
plan = resolvedSubqueryExpressionPlan.plan,
outerAttrs = resolvedSubqueryExpressionPlan.outerExpressions,
outerAttrs = resolvedSubqueryExpressionPlan.outerExpressions.map((_, true)),
numCols = resolvedSubqueryExpressionPlan.output.size
)
}
Expand All @@ -125,7 +125,7 @@ class SubqueryExpressionResolver(expressionResolver: ExpressionResolver, resolve

val resolvedExists = unresolvedExists.copy(
plan = resolvedSubqueryExpressionPlan.plan,
outerAttrs = resolvedSubqueryExpressionPlan.outerExpressions
outerAttrs = resolvedSubqueryExpressionPlan.outerExpressions.map((_, true))
)

val coercedExists = typeCoercionResolver.resolve(resolvedExists).asInstanceOf[Exists]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ case class DynamicPruningSubquery(
onlyInBroadcast: Boolean,
exprId: ExprId = NamedExpression.newExprId,
hint: Option[HintInfo] = None)
extends SubqueryExpression(buildQuery, Seq(pruningKey), exprId, Seq.empty, hint)
extends SubqueryExpression(buildQuery, Seq(pruningKey).map((_, true)), exprId, Seq.empty, hint)
with DynamicPruning
with Unevaluable
with UnaryLike[Expression] {
Expand All @@ -60,10 +60,12 @@ case class DynamicPruningSubquery(

override def withNewPlan(plan: LogicalPlan): DynamicPruningSubquery = copy(buildQuery = plan)

override def withNewOuterAttrs(outerAttrs: Seq[Expression]): DynamicPruningSubquery = {
override def withNewOuterAttrs(outerAttrs: Seq[(Expression, Boolean)]): DynamicPruningSubquery = {
// Updating outer attrs of DynamicPruningSubquery is unsupported; assert that they match
// pruningKey and return a copy without any changes.
assert(outerAttrs.size == 1 && outerAttrs.head.semanticEquals(pruningKey))
assert(outerAttrs.size == 1)
val (expr, notOuterScope) = outerAttrs.head
assert(expr.semanticEquals(pruningKey) && notOuterScope)
copy()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ import org.apache.spark.sql.types.DataType
*
* @param plan the logical plan provided as input for the table argument as either a logical
* relation or as a more complex logical plan in the event of a table subquery.
* @param outerAttrs outer references of this subquery plan, generally empty since these table
* arguments do not allow correlated references currently
* @param outerAttrs the outer references in the subquery plan and a boolean flag marking whether
* the outer reference can be resolved in its immediate parent plan or not,
* generally empty since these table arguments do not allow correlated
* references currently.
* @param exprId expression ID of this subquery expression, generally generated afresh each time
* @param partitionByExpressions if non-empty, the TABLE argument included the PARTITION BY clause
* to indicate that the input relation should be repartitioned by the
Expand All @@ -66,7 +68,7 @@ import org.apache.spark.sql.types.DataType
*/
case class FunctionTableSubqueryArgumentExpression(
plan: LogicalPlan,
outerAttrs: Seq[Expression] = Seq.empty,
outerAttrs: Seq[(Expression, Boolean)] = Seq.empty,
exprId: ExprId = NamedExpression.newExprId,
partitionByExpressions: Seq[Expression] = Seq.empty,
withSinglePartition: Boolean = false,
Expand All @@ -78,28 +80,37 @@ case class FunctionTableSubqueryArgumentExpression(
"WITH SINGLE PARTITION is mutually exclusive with PARTITION BY")

override def dataType: DataType = plan.schema

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we revert these new-line diffs?

override def nullable: Boolean = false

override def withNewPlan(plan: LogicalPlan): FunctionTableSubqueryArgumentExpression =
copy(plan = plan)
override def withNewOuterAttrs(outerAttrs: Seq[Expression])

override def withNewOuterAttrs(outerAttrs: Seq[(Expression, Boolean)])
: FunctionTableSubqueryArgumentExpression = copy(outerAttrs = outerAttrs)

override def hint: Option[HintInfo] = None

override def withNewHint(hint: Option[HintInfo]): FunctionTableSubqueryArgumentExpression =
copy()

override def toString: String = s"table-argument#${exprId.id} $conditionString"

override lazy val canonicalized: Expression = {
FunctionTableSubqueryArgumentExpression(
plan.canonicalized,
outerAttrs.map(_.canonicalized),
outerAttrs.map {case (expr, b) => (expr.canonicalized, b)},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not use b, but give it a meaningful name

ExprId(0),
partitionByExpressions,
withSinglePartition,
orderByExpressions)
}

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): FunctionTableSubqueryArgumentExpression =
copy(outerAttrs = newChildren)
newChildren: IndexedSeq[Expression]): FunctionTableSubqueryArgumentExpression = {
val newOuterAttrs = newChildren.take(outerAttrs.size).zip(outerAttrs.map(_._2))
copy(outerAttrs = newOuterAttrs)
}

final override def nodePatternsInternal(): Seq[TreePattern] =
Seq(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION)
Expand Down
Loading