Skip to content

Commit 00eafbd

Browse files
committed
[KYUUBI #7078] Make data masking and row filter configurable
1 parent cad5a39 commit 00eafbd

File tree

5 files changed

+104
-38
lines changed

5 files changed

+104
-38
lines changed

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/config/AuthzConfigurationChecker.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,16 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2222
import org.apache.spark.sql.execution.command.SetCommand
2323

2424
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
25+
import org.apache.spark.authz.AuthzConf.CONF_RESTRICTED_LIST
2526

2627
/**
2728
* For banning end-users from set restricted spark configurations
2829
*/
2930
case class AuthzConfigurationChecker(spark: SparkSession) extends (LogicalPlan => Unit) {
3031

31-
final val RESTRICT_LIST_KEY = "spark.kyuubi.conf.restricted.list"
32-
3332
private val restrictedConfList: Set[String] =
34-
Set(RESTRICT_LIST_KEY, "spark.sql.runSQLOnFiles", "spark.sql.extensions") ++
35-
spark.conf.getOption(RESTRICT_LIST_KEY).map(_.split(',').toSet).getOrElse(Set.empty)
33+
Set(CONF_RESTRICTED_LIST.key, "spark.sql.runSQLOnFiles", "spark.sql.extensions") ++
34+
spark.conf.getOption(CONF_RESTRICTED_LIST.key).map(_.split(',').toSet).getOrElse(Set.empty)
3635

3736
override def apply(plan: LogicalPlan): Unit = plan match {
3837
case SetCommand(Some((

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage0.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.kyuubi.plugin.spark.authz.rule.datamasking
1919

20+
import org.apache.spark.authz.AuthzConf.dataMaskingEnabled
2021
import org.apache.spark.sql.SparkSession
2122
import org.apache.spark.sql.catalyst.expressions.Alias
2223
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -45,15 +46,19 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
4546
case class RuleApplyDataMaskingStage0(spark: SparkSession) extends RuleHelper {
4647

4748
override def apply(plan: LogicalPlan): LogicalPlan = {
48-
val newPlan = mapChildren(plan) {
49-
case p: DataMaskingStage0Marker => p
50-
case p: DataMaskingStage1Marker => p
51-
case scan if isKnownScan(scan) && scan.resolved =>
52-
val tables = getScanSpec(scan).tables(scan, spark)
53-
tables.headOption.map(applyMasking(scan, _)).getOrElse(scan)
54-
case other => apply(other)
49+
if (!dataMaskingEnabled(conf)) {
50+
plan
51+
} else {
52+
val newPlan = mapChildren(plan) {
53+
case p: DataMaskingStage0Marker => p
54+
case p: DataMaskingStage1Marker => p
55+
case scan if isKnownScan(scan) && scan.resolved =>
56+
val tables = getScanSpec(scan).tables(scan, spark)
57+
tables.headOption.map(applyMasking(scan, _)).getOrElse(scan)
58+
case other => apply(other)
59+
}
60+
newPlan
5561
}
56-
newPlan
5762
}
5863

5964
private def applyMasking(

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage1.scala

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.kyuubi.plugin.spark.authz.rule.datamasking
1919

20+
import org.apache.spark.authz.AuthzConf.dataMaskingEnabled
2021
import org.apache.spark.sql.SparkSession
2122
import org.apache.spark.sql.catalyst.expressions.NamedExpression
2223
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
@@ -33,25 +34,28 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
3334
case class RuleApplyDataMaskingStage1(spark: SparkSession) extends RuleHelper {
3435

3536
override def apply(plan: LogicalPlan): LogicalPlan = {
36-
37-
plan match {
38-
case marker0: DataMaskingStage0Marker => marker0
39-
case marker1: DataMaskingStage1Marker => marker1
40-
case cmd if isKnownTableCommand(cmd) =>
41-
val tableCommandSpec = getTableCommandSpec(cmd)
42-
val queries = tableCommandSpec.queries(cmd)
43-
cmd.mapChildren {
44-
case marker0: DataMaskingStage0Marker => marker0
45-
case marker1: DataMaskingStage1Marker => marker1
46-
case query if queries.contains(query) && query.resolved =>
47-
applyDataMasking(query)
48-
case o => o
49-
}
50-
case cmd: Command if cmd.childrenResolved =>
51-
cmd.mapChildren(applyDataMasking)
52-
case cmd: Command => cmd
53-
case other if other.resolved => applyDataMasking(other)
54-
case other => other
37+
if (!dataMaskingEnabled(conf)) {
38+
plan
39+
} else {
40+
plan match {
41+
case marker0: DataMaskingStage0Marker => marker0
42+
case marker1: DataMaskingStage1Marker => marker1
43+
case cmd if isKnownTableCommand(cmd) =>
44+
val tableCommandSpec = getTableCommandSpec(cmd)
45+
val queries = tableCommandSpec.queries(cmd)
46+
cmd.mapChildren {
47+
case marker0: DataMaskingStage0Marker => marker0
48+
case marker1: DataMaskingStage1Marker => marker1
49+
case query if queries.contains(query) && query.resolved =>
50+
applyDataMasking(query)
51+
case o => o
52+
}
53+
case cmd: Command if cmd.childrenResolved =>
54+
cmd.mapChildren(applyDataMasking)
55+
case cmd: Command => cmd
56+
case other if other.resolved => applyDataMasking(other)
57+
case other => other
58+
}
5559
}
5660
}
5761

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/RuleApplyRowFilter.scala

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.kyuubi.plugin.spark.authz.rule.rowfilter
1919

20+
import org.apache.spark.authz.AuthzConf.rowFilterEnabled
2021
import org.apache.spark.sql.SparkSession
2122
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
2223

@@ -29,14 +30,18 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
2930
case class RuleApplyRowFilter(spark: SparkSession) extends RuleHelper {
3031

3132
override def apply(plan: LogicalPlan): LogicalPlan = {
32-
val newPlan = mapChildren(plan) {
33-
case p: RowFilterMarker => p
34-
case scan if isKnownScan(scan) && scan.resolved =>
35-
val tables = getScanSpec(scan).tables(scan, spark)
36-
tables.headOption.map(applyFilter(scan, _)).getOrElse(scan)
37-
case other => apply(other)
33+
if (!rowFilterEnabled(conf)) {
34+
plan
35+
} else {
36+
val newPlan = mapChildren(plan) {
37+
case p: RowFilterMarker => p
38+
case scan if isKnownScan(scan) && scan.resolved =>
39+
val tables = getScanSpec(scan).tables(scan, spark)
40+
tables.headOption.map(applyFilter(scan, _)).getOrElse(scan)
41+
case other => apply(other)
42+
}
43+
newPlan
3844
}
39-
newPlan
4045
}
4146

4247
private def applyFilter(
extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/spark/authz/AuthzConf.scala (new file)

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.authz

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.sql.internal.SQLConf

/**
 * Configuration entries for the Kyuubi Spark AuthZ plugin, plus typed accessors
 * for reading them from a [[SQLConf]].
 *
 * NOTE(review): placed under the `org.apache.spark` package, presumably to gain
 * access to Spark's private `ConfigBuilder` API — confirm this is intentional.
 */
object AuthzConf {

  /** Returns whether the data masking rules should be applied (see [[DATA_MASKING_ENABLED]]). */
  def dataMaskingEnabled(conf: SQLConf): Boolean = {
    conf.getConf(DATA_MASKING_ENABLED)
  }

  /** Returns whether the row filter rules should be applied (see [[ROW_FILTER_ENABLED]]). */
  def rowFilterEnabled(conf: SQLConf): Boolean = {
    conf.getConf(ROW_FILTER_ENABLED)
  }

  // Restricted keys; the checker also always restricts this key itself,
  // "spark.sql.runSQLOnFiles" and "spark.sql.extensions".
  val CONF_RESTRICTED_LIST =
    ConfigBuilder("spark.kyuubi.conf.restricted.list")
      .doc("A comma-separated list of config keys that end-users are banned from " +
        "changing dynamically via the SET syntax.")
      .version("1.7.0")
      .stringConf
      .createOptional

  val DATA_MASKING_ENABLED =
    ConfigBuilder("spark.sql.authz.dataMasking.enabled")
      .doc("Whether to enable the data masking feature of the AuthZ plugin. When set to " +
        "false, the data masking rules leave query plans unchanged.")
      .version("1.11.0")
      .booleanConf
      .createWithDefault(true)

  val ROW_FILTER_ENABLED =
    ConfigBuilder("spark.sql.authz.rowFilter.enabled")
      .doc("Whether to enable the row filter feature of the AuthZ plugin. When set to " +
        "false, the row filter rules leave query plans unchanged.")
      .version("1.11.0")
      .booleanConf
      .createWithDefault(true)
}

0 commit comments

Comments (0)