Skip to content

[SPARK-51552] [SQL] Disallow temporary variables in persisted views when under identifier #50325

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, Expression}
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, Expression, SubqueryExpression, VariableReference}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{CreateView, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_IDENTIFIER
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StringType

/**
Expand All @@ -34,15 +38,70 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch]
override def batches: Seq[Batch] = earlyBatches.asInstanceOf[Seq[Batch]]
}

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(UNRESOLVED_IDENTIFIER)) {
case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved && p.childrenResolved =>
executor.execute(p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr), p.children))
case other =>
other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) {
case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved =>
e.exprBuilder.apply(evalIdentifierExpr(e.identifierExpr), e.otherExprs)
}
override def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
case createView: CreateView =>
if (conf.getConf(SQLConf.VARIABLES_UNDER_IDENTIFIER_IN_VIEW)) {
apply0(createView)
} else {
val referredTempVars = new mutable.ArrayBuffer[Seq[String]]
val analyzedChild = apply0(createView.child)
val analyzedQuery = apply0(createView.query, Some(referredTempVars))
if (referredTempVars.nonEmpty) {
throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempVarError(
Seq("unknown"),
referredTempVars.head
)
}
createView.copy(child = analyzedChild, query = analyzedQuery)
}
case _ => apply0(plan)
}
}

private def apply0(
plan: LogicalPlan,
referredTempVars: Option[mutable.ArrayBuffer[Seq[String]]] = None): LogicalPlan =
plan.resolveOperatorsUpWithPruning(_.containsPattern(UNRESOLVED_IDENTIFIER)) {
case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved && p.childrenResolved =>

if (referredTempVars.isDefined) {
referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p)
}

executor.execute(p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr), p.children))
case other =>
other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) {
case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved =>

if (referredTempVars.isDefined) {
referredTempVars.get ++= collectTemporaryVariablesInExpressionTree(e)
}

e.exprBuilder.apply(evalIdentifierExpr(e.identifierExpr), e.otherExprs)
}
}

private def collectTemporaryVariablesInLogicalPlan(child: LogicalPlan): Seq[Seq[String]] = {
def collectTempVars(child: LogicalPlan): Seq[Seq[String]] = {
child.flatMap { plan =>
plan.expressions.flatMap { e => collectTemporaryVariablesInExpressionTree(e) }
}.distinct
}
collectTempVars(child)
}

private def collectTemporaryVariablesInExpressionTree(child: Expression): Seq[Seq[String]] = {
def collectTempVars(child: Expression): Seq[Seq[String]] = {
child.flatMap { expr =>
expr.children.flatMap(_.flatMap {
case e: SubqueryExpression => collectTemporaryVariablesInLogicalPlan(e.plan)
case r: VariableReference => Seq(r.originalNameParts)
case _ => Seq.empty
})
}.distinct
}
collectTempVars(child)
}

private def evalIdentifierExpr(expr: Expression): Seq[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3179,13 +3179,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
}

def notAllowedToCreatePermanentViewByReferencingTempVarError(
name: TableIdentifier,
varName: String): Throwable = {
nameParts: Seq[String],
varName: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "INVALID_TEMP_OBJ_REFERENCE",
messageParameters = Map(
"obj" -> "VIEW",
"objName" -> toSQLId(name.nameParts),
"objName" -> toSQLId(nameParts),
"tempObj" -> "VARIABLE",
"tempObjName" -> toSQLId(varName)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5651,6 +5651,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val VARIABLES_UNDER_IDENTIFIER_IN_VIEW =
buildConf("spark.sql.legacy.allowSessionVariableInPersistedView")
.internal()
.doc(
"When set to true, variables can be found under identifiers in a view query. Throw " +
"otherwise."
)
.version("4.1.0")
.booleanConf
.createWithDefault(false)

/**
* Holds information about keys that have been deprecated.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ object ViewHelper extends SQLConfHelper with Logging {
val tempVars = collectTemporaryVariables(child)
tempVars.foreach { nameParts =>
throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempVarError(
name, nameParts.quoted)
name.nameParts, nameParts)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,40 @@ class PersistedViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
}
}

test("SPARK-51552: Temporary variables under identifiers are not allowed in persisted view") {
sql("declare table_name = 'table';")
sql("create table identifier(table_name) (c1 int);")
sql("create view v_table_1 as select * from table")
sql("create view identifier('v_' || table_name || '_2') as select * from table")
checkError(
exception = intercept[AnalysisException] {
sql("create view v_table_3 as select * from identifier(table_name)")
},
condition = "INVALID_TEMP_OBJ_REFERENCE",
parameters = Map(
"obj" -> "VIEW",
"objName" -> "`unknown`",
"tempObj" -> "VARIABLE",
"tempObjName" -> "`table_name`"
)
)
checkError(
exception = intercept[AnalysisException] {
sql(
"""create view identifier('v_' || table_name || '_4')
|as select * from identifier(table_name);
|""".stripMargin)
},
condition = "INVALID_TEMP_OBJ_REFERENCE",
parameters = Map(
"obj" -> "VIEW",
"objName" -> "`unknown`",
"tempObj" -> "VARIABLE",
"tempObjName" -> "`table_name`"
)
)
}

def getShowCreateDDL(view: String, serde: Boolean = false): String = {
val result = if (serde) {
sql(s"SHOW CREATE TABLE $view AS SERDE")
Expand Down