From 4e9dcb1e0f22bcd061d31c939eb7e02f823243fc Mon Sep 17 00:00:00 2001 From: oraviv Date: Tue, 24 May 2022 18:25:47 +0300 Subject: [PATCH 1/9] very initial trial to integrate into Spark Optimizations --- .gitignore | 4 +- .../scala/com/paypal/dione/spark/Dione.scala | 24 +++++++ .../com/paypal/dione/spark/DioneContext.scala | 18 +++++ .../com/paypal/dione/spark/DioneRule.scala | 39 +++++++++++ .../dione/spark/index/IndexManager.scala | 8 ++- .../index/optimizations/TestIndexRule.scala | 67 +++++++++++++++++++ 6 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala create mode 100644 dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala create mode 100644 dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala create mode 100644 dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala diff --git a/.gitignore b/.gitignore index 44fa7a67..68ac9874 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,6 @@ classes tmp TestData -settings-ossrh.xml \ No newline at end of file +settings-ossrh.xml + +.ipynb_checkpoints \ No newline at end of file diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala new file mode 100644 index 00000000..340339b2 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala @@ -0,0 +1,24 @@ +package com.paypal.dione.spark + +import org.apache.spark.sql.SparkSession + +object Dione { + + private var dioneContext: DioneContext = _ + + def getContext(spark: SparkSession): DioneContext = { + if (Option(dioneContext).isEmpty) + dioneContext = DioneContext(spark) + dioneContext + } + + def getContext: DioneContext = { + getContext(SparkSession.getActiveSession.getOrElse { + throw new RuntimeException("No active spark session found") + }) + } + + def enable(spark: SparkSession): Unit = { + spark.sessionState.experimentalMethods.extraOptimizations ++= DioneRule :: Nil + } +} diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala new file mode 100644 index 00000000..f4a86368 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala @@ -0,0 +1,18 @@ +package com.paypal.dione.spark + +import com.paypal.dione.spark.index.IndexSpec +import org.apache.spark.sql.SparkSession + +import scala.collection.mutable + +case class DioneContext(spark: SparkSession) { + + val indices: mutable.Map[String, Seq[IndexSpec]] = mutable.HashMap() + + def addIndex(indexSpec: IndexSpec): Unit = { + indices.put(indexSpec.dataTableName, + indices.getOrElse(indexSpec.dataTableName, Seq()) ++ Seq(indexSpec)) + } + + +} diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala new file mode 100644 index 00000000..f48b1ec3 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala @@ -0,0 +1,39 @@ +package com.paypal.dione.spark + +import com.paypal.dione.spark.index.{IndexManagerUtils, IndexSpec} +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.StructType + +object DioneRule extends 
Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan transform { + + // For a query covering index we move from the original table to the index table + case p @ Project(_, h @ HiveTableRelation(_, _, _)) + if getQualifiedIndex(h, p.projectList.map(_.name)).nonEmpty => + val idx = getQualifiedIndex(h, p.projectList.map(_.name)).get + val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, + idx.indexTableName) + val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, + h.dataCols.filter(dc => p.projectList.map(_.name).contains(dc.name))) + p.copy(p.projectList, + child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes)) + } + } + + // based on StructType.toAttributes() + def toAttributes(structType: StructType, origAttrs: Seq[AttributeReference]): Seq[AttributeReference] = { + val origMap = origAttrs.map(ar => ar.name -> ar).toMap + structType + .map(f => origMap.getOrElse(f.name, AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + } + + def getQualifiedIndex(h: HiveTableRelation, projectedAtts: Seq[String]): Option[IndexSpec] = { + Dione.getContext.indices.getOrElse(h.tableMeta.identifier.identifier, Nil) + .find(ci => projectedAtts.forall(ci.getFields.contains)) + } +} diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala index 7dff275e..9d42b17a 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala @@ -10,7 +10,13 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, SparkSession} case class IndexSpec(dataTableName: String, indexTableName: String, - keys: Seq[String], moreFields: Seq[String] = Nil) + keys: Seq[String], moreFields: Seq[String] = Nil) { + + def getFields: Seq[String] = { + keys ++ moreFields + } + +} object IndexManager { diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala new file mode 100644 index 00000000..421040ae --- /dev/null +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala @@ -0,0 +1,67 @@ +package com.paypal.dione.spark.index.optimizations + +import com.paypal.dione.SparkCleanTestDB +import com.paypal.dione.spark.Dione +import com.paypal.dione.spark.index.{IndexManager, IndexSpec} +import com.paypal.dione.spark.index.avro.TestAvroIndexManagerJoin.spark +import org.junit.jupiter.api.MethodOrderer.OrderAnnotation +import org.junit.jupiter.api._ + + +object TestIndexRule extends SparkCleanTestDB { + + override val baseTestPath: String = "TestData/TestIndexRule/" + override val dbName: String = "TestIndexRule" + + @BeforeAll + def initData(): Unit = { + import spark.implicits._ + + + spark.sql(s"create table t_rule (key int, sub_key string, var1 string, var2 int) " + + s"partitioned by (dt string) stored as avro") + + (0 until 10).map(i => (i, "sub_key_"+i, "var_a_" + i, i)) + .toDF("key", "sub_key", "var1", "var2").repartition(2).createOrReplaceTempView("t") + spark.sql(s"insert overwrite table t_rule partition (dt='2021-10-04') select * from t") + + spark.table("t_rule").show() + } + +} + +@TestMethodOrder(classOf[OrderAnnotation]) +class TestIndexRule { + + val indexSpec = IndexSpec("t_rule", "t_rule_index", Seq("key"), 
Seq("sub_key")) + + @Test + @Order(1) + def testCreateIndexManager(): Unit = { + IndexManager.createNew(indexSpec)(spark) + } + + @Test + @Order(2) + def testAppendNewPartitions(): Unit = { + val indexManager = IndexManager.load("t_rule_index")(spark) + spark.conf.set("index.manager.btree.num.parts", "2") + spark.conf.set("index.manager.btree.interval", "3") + spark.conf.set("index.manager.btree.height", "1") + indexManager.appendMissingPartitions() + + Assertions.assertEquals(10, spark.table("t_rule_index").count()) + } + + @Test + @Order(3) + def testFilter(): Unit = { + Dione.enable(spark) + Dione.getContext.addIndex(indexSpec) + val dsDF = spark.table("t_rule").select("key", "sub_key")//.where("key == 7") + + dsDF.show() + dsDF.explain(true) + } + +} From 6365346a18a302c2d57384b8c23b8f893dae46e6 Mon Sep 17 00:00:00 2001 From: oraviv Date: Tue, 24 May 2022 20:12:31 +0300 Subject: [PATCH 2/9] pypandoc version issue --- .github/workflows/pr.workflow.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr.workflow.yml b/.github/workflows/pr.workflow.yml index 31e701c4..b7b0eeb9 100644 --- a/.github/workflows/pr.workflow.yml +++ b/.github/workflows/pr.workflow.yml @@ -70,5 +70,5 @@ jobs: find . export PYTHONPATH=`echo dione-spark/target/dione-spark-*-SNAPSHOT.jar` python -V - pip install pytest pyspark==2.4.8 + pip install pytest pypandoc==1.7.5 pyspark==2.4.8 pytest -v From 60face6dc2a2a3c1356e718f343d00216e82bad0 Mon Sep 17 00:00:00 2001 From: oraviv Date: Tue, 24 May 2022 20:28:50 +0300 Subject: [PATCH 3/9] pypandoc 2 --- .github/workflows/pr.workflow.yml | 3 ++- .../spark/index/optimizations/TestIndexRule.scala | 10 +++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pr.workflow.yml b/.github/workflows/pr.workflow.yml index b7b0eeb9..5eb09877 100644 --- a/.github/workflows/pr.workflow.yml +++ b/.github/workflows/pr.workflow.yml @@ -70,5 +70,6 @@ jobs: find . 
export PYTHONPATH=`echo dione-spark/target/dione-spark-*-SNAPSHOT.jar` python -V - pip install pytest pypandoc==1.7.5 pyspark==2.4.8 + pip install pypandoc==1.7.5 + pip install pytest pyspark==2.4.8 pytest -v diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala index 421040ae..c09b25d3 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala @@ -4,6 +4,8 @@ import com.paypal.dione.SparkCleanTestDB import com.paypal.dione.spark.Dione import com.paypal.dione.spark.index.{IndexManager, IndexSpec} import com.paypal.dione.spark.index.avro.TestAvroIndexManagerJoin.spark +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.junit.jupiter.api.MethodOrderer.OrderAnnotation import org.junit.jupiter.api._ @@ -58,10 +60,12 @@ class TestIndexRule { def testFilter(): Unit = { Dione.enable(spark) Dione.getContext.addIndex(indexSpec) - val dsDF = spark.table("t_rule").select("key", "sub_key")//.where("key == 7") + val dsDF = spark.table("t_rule").select("key", "sub_key") - dsDF.show() - dsDF.explain(true) + Assertions.assertEquals(dsDF.queryExecution.optimizedPlan.collect { + case h: HiveTableRelation => + h.tableMeta.identifier.identifier + }, Seq("t_rule_index")) } } From 42485427882f3cca4b8a078a509cb92dcfabb9b5 Mon Sep 17 00:00:00 2001 From: oraviv Date: Wed, 25 May 2022 07:00:49 +0300 Subject: [PATCH 4/9] add filter test --- .../spark/index/optimizations/TestIndexRule.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala index c09b25d3..b37acc20 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala @@ -57,7 +57,7 @@ class TestIndexRule { @Test @Order(3) - def testFilter(): Unit = { + def testCoveringProject(): Unit = { Dione.enable(spark) Dione.getContext.addIndex(indexSpec) val dsDF = spark.table("t_rule").select("key", "sub_key") @@ -68,4 +68,14 @@ class TestIndexRule { }, Seq("t_rule_index")) } + @Test + @Order(3) + def testFilter(): Unit = { + Dione.enable(spark) + Dione.getContext.addIndex(indexSpec) + val dsDF = spark.table("t_rule").select("key", "sub_key").where("key == 7") + + dsDF.explain(true) + dsDF.show() + } } From bc2f4a6bb1d659f32185e0ae69093400331601b6 Mon Sep 17 00:00:00 2001 From: oraviv Date: Wed, 25 May 2022 09:28:39 +0300 Subject: [PATCH 5/9] added filter-project optimization --- .../com/paypal/dione/spark/DioneRule.scala | 19 +++++++++++++++---- .../index/optimizations/TestIndexRule.scala | 4 ++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala index f48b1ec3..648440d0 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala @@ -3,7 +3,7 @@ package com.paypal.dione.spark import 
com.paypal.dione.spark.index.{IndexManagerUtils, IndexSpec} import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.StructType @@ -12,7 +12,7 @@ object DioneRule extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { plan transform { - // For a query covering index we move from the original table to the index table + // For a query covering index we switch between the original table and the index table case p @ Project(_, h @ HiveTableRelation(_, _, _)) if getQualifiedIndex(h, p.projectList.map(_.name)).nonEmpty => val idx = getQualifiedIndex(h, p.projectList.map(_.name)).get @@ -22,6 +22,17 @@ object DioneRule extends Rule[LogicalPlan] { h.dataCols.filter(dc => p.projectList.map(_.name).contains(dc.name))) p.copy(p.projectList, child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes)) + + case p @ Project(_, f @ Filter(_, h @ HiveTableRelation(_, _, _))) + if getQualifiedIndex(h, p.references.map(_.name).toSeq).nonEmpty => + val idx = getQualifiedIndex(h, p.references.map(_.name).toSeq).get + val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, + idx.indexTableName) + val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, + h.dataCols.filter(dc => p.references.map(_.name).toSet.contains(dc.name))) + p.copy(p.projectList, + child = f.copy(f.condition, + child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes))) } } @@ -32,8 +43,8 @@ object DioneRule extends Rule[LogicalPlan] { .map(f => origMap.getOrElse(f.name, AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) } - def getQualifiedIndex(h: HiveTableRelation, projectedAtts: Seq[String]): Option[IndexSpec] = { + def getQualifiedIndex(h: HiveTableRelation, referencedAtts: Seq[String]): Option[IndexSpec] = { Dione.getContext.indices.getOrElse(h.tableMeta.identifier.identifier, Nil) - .find(ci => projectedAtts.forall(ci.getFields.contains)) + .find(ci => referencedAtts.forall(ci.getFields.contains)) } } diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala index b37acc20..1fa896ac 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala @@ -77,5 +77,9 @@ class TestIndexRule { dsDF.explain(true) dsDF.show() + Assertions.assertEquals(dsDF.queryExecution.optimizedPlan.collect { + case h: HiveTableRelation => + h.tableMeta.identifier.identifier + }, Seq("t_rule_index")) } } From 3abc1e5a1f5759c08fc63bb3fb15e6daebf9d885 Mon Sep 17 00:00:00 2001 From: oraviv Date: Mon, 30 May 2022 11:22:51 +0300 Subject: [PATCH 6/9] Adding Index strategy --- .../scala/com/paypal/dione/spark/Dione.scala | 2 + .../com/paypal/dione/spark/DioneContext.scala | 10 +- .../com/paypal/dione/spark/DioneRule.scala | 23 +- .../avro/btree/SparkAvroBtreeUtils.scala | 2 +- .../spark/execution/DioneIndexStrategy.scala | 47 ++++ .../spark/execution/IndexBtreeScanExec.scala | 234 ++++++++++++++++++ .../dione/spark/index/IndexReader.scala | 9 - 
.../dione/spark/index/SparkIndexer.scala | 14 +- .../catalog/HiveIndexTableRelation.scala | 14 ++ .../index/optimizations/TestIndexRule.scala | 46 ++-- 10 files changed, 355 insertions(+), 46 deletions(-) create mode 100644 dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala create mode 100644 dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeScanExec.scala create mode 100644 dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala index 340339b2..6afff161 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala @@ -1,5 +1,6 @@ package com.paypal.dione.spark +import com.paypal.dione.spark.execution.DioneIndexStrategy import org.apache.spark.sql.SparkSession object Dione { @@ -20,5 +21,6 @@ object Dione { def enable(spark: SparkSession): Unit = { spark.sessionState.experimentalMethods.extraOptimizations ++= DioneRule :: Nil + spark.sessionState.experimentalMethods.extraStrategies ++= DioneIndexStrategy :: Nil } } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala index f4a86368..ecf39408 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala @@ -1,17 +1,17 @@ package com.paypal.dione.spark -import com.paypal.dione.spark.index.IndexSpec +import com.paypal.dione.spark.index.{IndexManager, IndexSpec} import org.apache.spark.sql.SparkSession import scala.collection.mutable case class DioneContext(spark: SparkSession) { - val indices: mutable.Map[String, Seq[IndexSpec]] = mutable.HashMap() + val indices: mutable.Map[String, Seq[IndexManager]] = mutable.HashMap() - def addIndex(indexSpec: IndexSpec): Unit = { - indices.put(indexSpec.dataTableName, - indices.getOrElse(indexSpec.dataTableName, Seq()) ++ Seq(indexSpec)) + def addIndex(indexManager: IndexManager): Unit = { + indices.put(indexManager.dataTableName, + indices.getOrElse(indexManager.dataTableName, Seq()) ++ Seq(indexManager)) } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala index 648440d0..3284b44b 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala @@ -1,6 +1,7 @@ package com.paypal.dione.spark -import com.paypal.dione.spark.index.{IndexManagerUtils, IndexSpec} +import com.paypal.dione.spark.index.{IndexManagerUtils, IndexManager} +import com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} @@ -14,8 +15,8 @@ object DioneRule extends Rule[LogicalPlan] { // For a query covering index we switch between the original table and the index table case p @ Project(_, h @ HiveTableRelation(_, _, _)) - if getQualifiedIndex(h, p.projectList.map(_.name)).nonEmpty => - val idx = getQualifiedIndex(h, p.projectList.map(_.name)).get + if getCoveringIndex(h, p.projectList.map(_.name)).nonEmpty => + val idx = 
getCoveringIndex(h, p.projectList.map(_.name)).get val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, idx.indexTableName) val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, @@ -23,16 +24,19 @@ object DioneRule extends Rule[LogicalPlan] { p.copy(p.projectList, child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes)) + // For a data lookup index we add relevant information to later use in the strategy + // (we might need to move the supported logic from the strategy to here..) case p @ Project(_, f @ Filter(_, h @ HiveTableRelation(_, _, _))) - if getQualifiedIndex(h, p.references.map(_.name).toSeq).nonEmpty => - val idx = getQualifiedIndex(h, p.references.map(_.name).toSeq).get + if getIndexForTable(h).nonEmpty => + val idx = getIndexForTable(h).get val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, idx.indexTableName) val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, h.dataCols.filter(dc => p.references.map(_.name).toSet.contains(dc.name))) p.copy(p.projectList, child = f.copy(f.condition, - child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes))) + child = new HiveIndexTableRelation(tableMeta = indexCatalogTable, dataCols = updatedAttributes, + partitionCols = h.partitionCols, h, idx))) } } @@ -43,8 +47,11 @@ object DioneRule extends Rule[LogicalPlan] { .map(f => origMap.getOrElse(f.name, AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) } - def getQualifiedIndex(h: HiveTableRelation, referencedAtts: Seq[String]): Option[IndexSpec] = { + def getCoveringIndex(h: HiveTableRelation, referencedAtts: Seq[String]): Option[IndexManager] = { Dione.getContext.indices.getOrElse(h.tableMeta.identifier.identifier, Nil) - .find(ci => referencedAtts.forall(ci.getFields.contains)) + .find(ci => referencedAtts.forall(ci.indexSpec.getFields.contains)) + } + def getIndexForTable(h: HiveTableRelation): Option[IndexManager] = { + Dione.getContext.indices.getOrElse(h.tableMeta.identifier.identifier, Nil).headOption } } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala index f0dbdbf9..f29065b5 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala @@ -68,7 +68,7 @@ object SparkAvroBtreeUtils { val partitionKeys = partitionsSpec.flatMap(_._1.map(_._1)).distinct val remainingColumns = df.columns.filterNot(c => keysSet.contains(c) || partitionKeys.contains(c)) - logger.info("writing index file to " + folderName + s" with interval: $interval, height: $height," + + logger.info("writing index file to " + folderName + s", with interval: $interval, height: $height," + s" partitionsSpec: $partitionsSpec") val repartitionedDF = customRepartition(df, keys, partitionsSpec) diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala new file mode 100644 index 00000000..fc2f7d61 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala @@ -0,0 +1,47 @@ +package com.paypal.dione.spark.execution + +import com.paypal.dione.spark.Dione +import com.paypal.dione.spark.index.IndexManager +import 
com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, EqualTo, Expression, Literal} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan + +object DioneIndexStrategy extends Strategy { + + private def getIdx(relation: HiveTableRelation): Option[IndexManager] = + Dione.getContext.indices.find(_._2.head.indexTableName == relation.tableMeta.identifier.identifier).map(_._2.head) + + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + + case PhysicalOperation(projectList, predicates, relation: HiveIndexTableRelation) if getIdx(relation).nonEmpty => + // Filter out all predicates that only deal with partition keys, these are given to the + // hive table scan operator to be used for partition pruning. + val partitionKeyIds = AttributeSet(relation.partitionCols) + val (pruningPredicates, otherPredicates) = predicates.partition { predicate => + !predicate.references.isEmpty && + predicate.references.subsetOf(partitionKeyIds) + } + + val idx = getIdx(relation).get + + def findLiteralKeyExpression(key: String, p: Expression) = p match { + case EqualTo(left, right: Literal) if left.references.size == 1 && left.references.toSeq.head.name == key => Some(right) + case _ => None + } + + val vals = idx.keys.map(k => otherPredicates.flatMap(p => findLiteralKeyExpression(k, p)).headOption) + + if (vals.exists(_.isEmpty)) + Nil + else + IndexBtreeScanExec(projectList.flatMap(_.references.toSeq).distinct, + relation, idx, pruningPredicates, otherPredicates, vals.map(v => Literal(v.get))) :: Nil + case _ => + Nil + } + +} diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeScanExec.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeScanExec.scala new file mode 100644 index 00000000..de279001 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeScanExec.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.paypal.dione.spark.execution + +import com.paypal.dione.kvstorage.hadoop.avro.AvroHashBtreeStorageFolderReader +import com.paypal.dione.spark.Dione +import com.paypal.dione.spark.index.IndexManager +import com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation +import org.apache.avro.util.Utf8 +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.hive._ +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{BooleanType, DataType, StructType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils + +import scala.collection.JavaConverters._ + +/** + */ +case class IndexBtreeScanExec( + requestedAttributes: Seq[Attribute], + indexRelation: HiveIndexTableRelation, + idx: IndexManager, + partitionPruningPred: Seq[Expression], + dataFilters: Seq[Expression], + keys: Seq[Literal]) + extends LeafExecNode with CastSupport { + + def sparkSession = Dione.getContext.spark + + override def nodeName: String = s"Scan Index ${indexRelation.tableMeta.qualifiedName}" + + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + + override def producedAttributes: AttributeSet = outputSet ++ + AttributeSet(partitionPruningPred.flatMap(_.references)) + + private val originalAttributes = AttributeMap(indexRelation.output.map(a => a -> a) + ++ indexRelation.hiveDataTableRelation.output.map(a => a -> a)) + + override val output: Seq[Attribute] = { + // Retrieve the original attributes based on expression ID so that capitalization matches. + requestedAttributes.map(originalAttributes) + } + + // Bind all partition key attribute references in the partition pruning predicate for later + // evaluation. 
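+  // (Added note: the bound predicate is later evaluated in prunePartitions() against an
+  // InternalRow holding each partition's casted values, keeping only partitions that can match.)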
+ private lazy val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => + require(pred.dataType == BooleanType, + s"Data type of predicate $pred must be ${BooleanType.catalogString} rather than " + + s"${pred.dataType.catalogString}.") + + BindReferences.bindReference(pred, indexRelation.partitionCols) + } + + @transient private lazy val hiveQlTable = SparkSqlHiveUtils.toHiveTable(indexRelation.tableMeta) +// @transient private lazy val tableDesc = new TableDesc( +// hiveQlTable.getInputFormatClass, +// hiveQlTable.getOutputFormatClass, +// hiveQlTable.getMetadata) + + // Create a local copy of hadoopConf,so that scan specific modifications should not impact + // other queries +// @transient private lazy val hadoopConf = { +// val c = sparkSession.sessionState.newHadoopConf() +// append columns ids and names before broadcast +// addColumnMetadataToConf(c) +// c +// } + +// @transient private lazy val hadoopReader = new HadoopTableReader( +// output, +// relation.partitionCols, +// tableDesc, +// sparkSession, +// hadoopConf) + + private def castFromString(value: String, dataType: DataType) = { + cast(Literal(value), dataType).eval(null) + } + +// private def addColumnMetadataToConf(hiveConf: Configuration): Unit = { +// // Specifies needed column IDs for those non-partitioning columns. +// val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) +// val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) +// +// HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) +// +// val deserializer = tableDesc.getDeserializerClass.newInstance +// deserializer.initialize(hiveConf, tableDesc.getProperties) +// +// // Specifies types and object inspectors of columns to be scanned. +// val structOI = ObjectInspectorUtils +// .getStandardObjectInspector( +// deserializer.getObjectInspector, +// ObjectInspectorCopyOption.JAVA) +// .asInstanceOf[StructObjectInspector] +// +// val columnTypeNames = structOI +// .getAllStructFieldRefs.asScala +// .map(_.getFieldObjectInspector) +// .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) +// .mkString(",") +// +// hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) +// hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) +// } + + /** + * Prunes partitions not involve the query plan. + * + * @param partitions All partitions of the relation. + * @return Partitions that are involved in the query plan. + */ + private def prunePartitions(partitions: Seq[HivePartition]) = { + boundPruningPred match { + case None => partitions + case Some(shouldKeep) => partitions.filter { part => + val dataTypes = indexRelation.partitionCols.map(_.dataType) + val castedValues = part.getValues.asScala.zip(dataTypes) + .map { case (value, dataType) => castFromString(value, dataType) } + + // Only partitioned values are needed here, since the predicate has already been bound to + // partition key attribute references. + val row = InternalRow.fromSeq(castedValues) + shouldKeep.eval(row).asInstanceOf[Boolean] + } + } + } + + // exposed for tests + @transient lazy val rawPartitions = { + val prunedPartitions = + if (sparkSession.sessionState.conf.metastorePartitionPruning && + partitionPruningPred.size > 0) { + // Retrieve the original attributes based on expression ID so that capitalization matches. 
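+        // (Added note: normalizing the references lets the metastore prune the index table's
+        // partitions via listPartitionsByFilter instead of listing every partition and
+        // filtering on the driver.)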
+ val normalizedFilters = partitionPruningPred.map(_.transform { + case a: AttributeReference => originalAttributes(a) + }) + sparkSession.sessionState.catalog.listPartitionsByFilter( + indexRelation.tableMeta.identifier, + normalizedFilters) + } else { + sparkSession.sessionState.catalog.listPartitions(indexRelation.tableMeta.identifier) + } + prunedPartitions.map(SparkSqlHiveUtils.toHivePartition(_, hiveQlTable)) + } + + protected override def doExecute(): RDD[InternalRow] = { + + new RDD[InternalRow](Dione.getContext.spark.sparkContext, Nil) { + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val idxSplit = split.asInstanceOf[IndexFilePartition] + val avroHashBtreeFolderReader = AvroHashBtreeStorageFolderReader(idxSplit.files.head.filePath) + + val blahUTF2 = (a: Any) => a match { + case x: UTF8String => x.toString + case x => x + } + + val valueIter = avroHashBtreeFolderReader.getIterator(keys.map(k => blahUTF2(k.value))) + + val blahUTF = (a: Any) => a match { + case x: Utf8 => UTF8String.fromString(x.toString) + case x: String => UTF8String.fromString(x) + case x => x + } + + val cnvrt = UnsafeProjection.create(requestedAttributes, requestedAttributes) + val idxRequestedFields = requestedAttributes.map(_.name) + .filter(idx.moreFields.contains) + + val dataFieldsMap = indexRelation.hiveDataTableRelation.schema.fields.map(f => f.name -> f).toMap + val dataRequestedFields = requestedAttributes.map(_.name) + .filter(f => dataFieldsMap.contains(f)) + .filterNot(f => idxRequestedFields.contains(f)) + .filterNot(f => idx.keys.contains(f)) + + val dataSchema = dataRequestedFields.map(f => dataFieldsMap(f)) + + valueIter.map(indexGR => { + val indexSeq = keys.map(k => k.value) ++ idxRequestedFields.map(reqAtt => blahUTF(indexGR.get(reqAtt))) + val dataMap = idx.sparkIndexer.readPayload(indexGR, StructType(dataSchema)) + val dataSeq = dataRequestedFields.map(reqAtt => blahUTF(dataMap(reqAtt))) + val c = cnvrt(InternalRow.fromSeq(indexSeq ++ dataSeq)) + c + }) + } + + override protected def getPartitions: Array[Partition] = { + val pp = prunePartitions(rawPartitions) + pp.zipWithIndex.map(p => IndexFilePartition(p._2, Seq(IndexFile(p._1.getLocation)))).toArray + } + } + } + +} + +case class IndexFile(filePath: String) +case class IndexFilePartition(index: Int, files: Seq[IndexFile]) extends Partition diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexReader.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexReader.scala index 2d3e8b27..cd75916d 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexReader.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexReader.scala @@ -37,15 +37,6 @@ case class IndexReader(@transient spark: SparkSession, sparkIndexer: SparkIndexe def read(index: DataFrame): DataFrame = IndexReader.read(index, this) - def readPayload[T](indexGR: GenericRecord): Map[String, Any] = { - logger.debug("initing file: " + indexGR.get(FILE_NAME_COLUMN).toString) - val hdfsIndexer = sparkIndexer.initHdfsIndexer(new Path(indexGR.get(FILE_NAME_COLUMN).toString), - new Configuration(), fieldsSchema) - val hdfsIndexMetadata = HdfsIndexerMetadata(indexGR) - val fetchedT = hdfsIndexer.fetch(hdfsIndexMetadata) - sparkIndexer.convertMap(fetchedT) - } - private def init() = { reporter.initPartitionMetrics() } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/SparkIndexer.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/SparkIndexer.scala 
index dcc05e70..eaa7a9cf 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/SparkIndexer.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/SparkIndexer.scala @@ -1,11 +1,13 @@ package com.paypal.dione.spark.index -import com.paypal.dione.hdfs.index.HdfsIndexer +import com.paypal.dione.hdfs.index.HdfsIndexContants.FILE_NAME_COLUMN +import com.paypal.dione.hdfs.index.{HdfsIndexer, HdfsIndexerMetadata} import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types.StructType +import org.slf4j.LoggerFactory /** * wrapper trait between to use HdfsIndexer inside Spark. @@ -21,6 +23,8 @@ import org.apache.spark.sql.types.StructType */ trait SparkIndexer { + private val logger = LoggerFactory.getLogger(this.getClass) + type T >: Null @transient val spark: SparkSession @@ -32,6 +36,7 @@ trait SparkIndexer { } def convert(t: T): Seq[Any] + def convertMap(t: T): Map[String, Any] /** @@ -46,7 +51,12 @@ trait SparkIndexer { } def readPayload(indexGR: GenericRecord, payloadSchema: StructType): Map[String, Any] = { - IndexReader(spark, this, payloadSchema, false).readPayload(indexGR) + logger.debug("initializing file: " + indexGR.get(FILE_NAME_COLUMN).toString) + val hdfsIndexer = initHdfsIndexer(new Path(indexGR.get(FILE_NAME_COLUMN).toString), + new Configuration(), payloadSchema) + val hdfsIndexMetadata = HdfsIndexerMetadata(indexGR) + val fetchedT = hdfsIndexer.fetch(hdfsIndexMetadata) + convertMap(fetchedT) } def loadByIndex(index: DataFrame, payloadSchema: StructType): DataFrame = { diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala new file mode 100644 index 00000000..72731129 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala @@ -0,0 +1,14 @@ +package com.paypal.dione.spark.sql.catalyst.catalog + +import com.paypal.dione.spark.index.IndexManager +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.AttributeReference + +class HiveIndexTableRelation(tableMeta: CatalogTable, + dataCols: Seq[AttributeReference], + partitionCols: Seq[AttributeReference], + val hiveDataTableRelation: HiveTableRelation, + val indexSpec: IndexManager) + extends HiveTableRelation(tableMeta, dataCols, partitionCols) { + +} diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala index 1fa896ac..a1cc95e0 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala @@ -2,10 +2,9 @@ package com.paypal.dione.spark.index.optimizations import com.paypal.dione.SparkCleanTestDB import com.paypal.dione.spark.Dione -import com.paypal.dione.spark.index.{IndexManager, IndexSpec} import com.paypal.dione.spark.index.avro.TestAvroIndexManagerJoin.spark +import com.paypal.dione.spark.index.{IndexManager, IndexSpec} import org.apache.spark.sql.catalyst.catalog.HiveTableRelation -import org.apache.spark.sql.execution.datasources.LogicalRelation import 
org.junit.jupiter.api.MethodOrderer.OrderAnnotation import org.junit.jupiter.api._ @@ -20,7 +19,7 @@ object TestIndexRule extends SparkCleanTestDB { import spark.implicits._ - spark.sql(s"create table t_rule (key int, sub_key string, var1 string, var2 int) " + + spark.sql(s"create table t_rule (key string, sub_key string, var1 string, var2 int) " + s"partitioned by (dt string) stored as avro") (0 until 10).map(i => (i, "sub_key_"+i, "var_a_" + i, i)) @@ -52,34 +51,39 @@ class TestIndexRule { spark.conf.set("index.manager.btree.height", "1") indexManager.appendMissingPartitions() + Dione.enable(spark) + Dione.getContext.addIndex(indexManager) + Assertions.assertEquals(10, spark.table("t_rule_index").count()) } @Test @Order(3) def testCoveringProject(): Unit = { - Dione.enable(spark) - Dione.getContext.addIndex(indexSpec) - val dsDF = spark.table("t_rule").select("key", "sub_key") + val df = spark.table("t_rule").select("key", "sub_key").where("sub_key=='sub_key_4'") + + Assertions.assertEquals(Seq("t_rule_index"), + df.queryExecution.optimizedPlan.collect { + case h: HiveTableRelation => + h.tableMeta.identifier.identifier + }) - Assertions.assertEquals(dsDF.queryExecution.optimizedPlan.collect { - case h: HiveTableRelation => - h.tableMeta.identifier.identifier - }, Seq("t_rule_index")) + Assertions.assertEquals("[4,sub_key_4]", df.collect().mkString(",")) } @Test @Order(3) - def testFilter(): Unit = { - Dione.enable(spark) - Dione.getContext.addIndex(indexSpec) - val dsDF = spark.table("t_rule").select("key", "sub_key").where("key == 7") - - dsDF.explain(true) - dsDF.show() - Assertions.assertEquals(dsDF.queryExecution.optimizedPlan.collect { - case h: HiveTableRelation => - h.tableMeta.identifier.identifier - }, Seq("t_rule_index")) + def testFilterEqualTo(): Unit = { + val df = spark.table("t_rule").select("key", "sub_key", "var2", "var1").where("key == '7'") + +// df.explain(true) +// df.show() + Assertions.assertEquals(Seq("t_rule_index"), + df.queryExecution.optimizedPlan.collect { + case h: HiveTableRelation => + h.tableMeta.identifier.identifier + }) + + Assertions.assertEquals("[7,sub_key_7,7,var_a_7]", df.collect().mkString(",")) } } From ad8a22d1b5501b81935ea621b575b78e79f11d67 Mon Sep 17 00:00:00 2001 From: oraviv Date: Tue, 31 May 2022 08:46:25 +0300 Subject: [PATCH 7/9] Moving logic from physical plan to optimizer --- .../com/paypal/dione/spark/DioneContext.scala | 6 + .../com/paypal/dione/spark/DioneRule.scala | 52 ++++++--- .../spark/execution/DioneIndexStrategy.scala | 23 +--- ...anExec.scala => IndexBtreeFetchExec.scala} | 103 +++++------------- .../dione/spark/index/IndexManager.scala | 27 ++++- .../catalog/HiveIndexTableRelation.scala | 5 +- .../com/paypal/dione/SparkTestBase.scala | 1 + .../index/avro/TestAvroDecimalConverter.scala | 3 +- .../index/avro/TestAvroIndexManager.scala | 2 +- .../index/avro/TestAvroIndexManagerBase.scala | 2 +- .../index/avro/TestAvroIndexManagerJoin.scala | 4 +- .../avro/TestAvroIndexManagerNonUnique.scala | 2 +- .../spark/index/avro/TestDataSplitter.scala | 2 +- .../spark/index/avro/TestOneLineInBlock.scala | 2 +- .../index/optimizations/TestIndexRule.scala | 82 +++++++++----- .../index/orc/TestOrcIndexManagerBase.scala | 2 +- .../parquet/TestParquetIndexManager.scala | 8 +- .../parquet/TestParquetIndexManagerBase.scala | 2 +- .../TestSequenceFileIndexManagerBase.scala | 2 +- .../sequence/TestSparkSequenceFile.scala | 6 +- 20 files changed, 179 insertions(+), 157 deletions(-) rename 
dione-spark/src/main/scala/com/paypal/dione/spark/execution/{IndexBtreeScanExec.scala => IndexBtreeFetchExec.scala} (65%) diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala index ecf39408..7a6fe7a7 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala @@ -2,6 +2,7 @@ package com.paypal.dione.spark import com.paypal.dione.spark.index.{IndexManager, IndexSpec} import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier import scala.collection.mutable @@ -9,6 +10,11 @@ case class DioneContext(spark: SparkSession) { val indices: mutable.Map[String, Seq[IndexManager]] = mutable.HashMap() + def getIndices(tableIdentifier: TableIdentifier): Seq[IndexManager] = { + indices.getOrElse(tableIdentifier.database.getOrElse(spark.catalog.currentDatabase) + + "." + tableIdentifier.identifier, Nil) + } + def addIndex(indexManager: IndexManager): Unit = { indices.put(indexManager.dataTableName, indices.getOrElse(indexManager.dataTableName, Seq()) ++ Seq(indexManager)) diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala index 3284b44b..9c8d2ed2 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala @@ -1,9 +1,9 @@ package com.paypal.dione.spark -import com.paypal.dione.spark.index.{IndexManagerUtils, IndexManager} +import com.paypal.dione.spark.index.{IndexManager, IndexManagerUtils} import com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation import org.apache.spark.sql.catalyst.catalog.HiveTableRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.StructType @@ -21,22 +21,27 @@ object DioneRule extends Rule[LogicalPlan] { idx.indexTableName) val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, h.dataCols.filter(dc => p.projectList.map(_.name).contains(dc.name))) - p.copy(p.projectList, - child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes)) + p.copy(child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes)) - // For a data lookup index we add relevant information to later use in the strategy - // (we might need to move the supported logic from the strategy to here..) 
case p @ Project(_, f @ Filter(_, h @ HiveTableRelation(_, _, _))) - if getIndexForTable(h).nonEmpty => - val idx = getIndexForTable(h).get + if getCoveringIndex(h, p.projectList.map(_.name)).nonEmpty => + val idx = getCoveringIndex(h, p.projectList.map(_.name)).get + val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, + idx.indexTableName) + val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, + h.dataCols.filter(dc => p.projectList.map(_.name).contains(dc.name))) + p.copy(child = f.copy(child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes))) + + // For a data lookup index we add relevant information to later use in the strategy + case p @ Project(_, f @ Filter(condition, h @ HiveTableRelation(_, _, _))) + if getSpecificFilter(condition, h).nonEmpty => + val (idx, literalFilter) = getSpecificFilter(condition, h).get val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, idx.indexTableName) val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, h.dataCols.filter(dc => p.references.map(_.name).toSet.contains(dc.name))) - p.copy(p.projectList, - child = f.copy(f.condition, - child = new HiveIndexTableRelation(tableMeta = indexCatalogTable, dataCols = updatedAttributes, - partitionCols = h.partitionCols, h, idx))) + p.copy(child = f.copy(child = new HiveIndexTableRelation(tableMeta = indexCatalogTable, + dataCols = updatedAttributes, partitionCols = h.partitionCols, h, idx, literalFilter))) } } @@ -48,10 +53,29 @@ object DioneRule extends Rule[LogicalPlan] { } def getCoveringIndex(h: HiveTableRelation, referencedAtts: Seq[String]): Option[IndexManager] = { - Dione.getContext.indices.getOrElse(h.tableMeta.identifier.identifier, Nil) + Dione.getContext.getIndices(h.tableMeta.identifier) .find(ci => referencedAtts.forall(ci.indexSpec.getFields.contains)) } + def getIndexForTable(h: HiveTableRelation): Option[IndexManager] = { - Dione.getContext.indices.getOrElse(h.tableMeta.identifier.identifier, Nil).headOption + Dione.getContext.getIndices(h.tableMeta.identifier).headOption + } + + private def getSpecificFilter(condition: Expression, hiveTableRelation: HiveTableRelation): Option[(IndexManager, Seq[Literal])] = { + def findLiteralKeyExpression(key: String, p: Expression) = p match { + case EqualTo(left, right: Literal) if left.references.size == 1 && left.references.toSeq.head.name == key => Some(right) + case _ => None + } + + val idx = getIndexForTable(hiveTableRelation) + + if (idx.nonEmpty) { + val vals = idx.get.keys.map(k => condition.flatMap(p => findLiteralKeyExpression(k, p)).headOption) + if (vals.exists(_.isEmpty)) + None + else Some(idx.get, vals.map(v => Literal(v.get))) + } else { + None + } } } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala index fc2f7d61..70499beb 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala @@ -12,34 +12,21 @@ import org.apache.spark.sql.execution.SparkPlan object DioneIndexStrategy extends Strategy { - private def getIdx(relation: HiveTableRelation): Option[IndexManager] = - Dione.getContext.indices.find(_._2.head.indexTableName == relation.tableMeta.identifier.identifier).map(_._2.head) - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case 
PhysicalOperation(projectList, predicates, relation: HiveIndexTableRelation) if getIdx(relation).nonEmpty => + case PhysicalOperation(projectList, predicates, indexRelation: HiveIndexTableRelation) => // Filter out all predicates that only deal with partition keys, these are given to the // hive table scan operator to be used for partition pruning. - val partitionKeyIds = AttributeSet(relation.partitionCols) + val partitionKeyIds = AttributeSet(indexRelation.partitionCols) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => !predicate.references.isEmpty && predicate.references.subsetOf(partitionKeyIds) } - val idx = getIdx(relation).get - - def findLiteralKeyExpression(key: String, p: Expression) = p match { - case EqualTo(left, right: Literal) if left.references.size == 1 && left.references.toSeq.head.name == key => Some(right) - case _ => None - } - - val vals = idx.keys.map(k => otherPredicates.flatMap(p => findLiteralKeyExpression(k, p)).headOption) + val idx = indexRelation.indexManager - if (vals.exists(_.isEmpty)) - Nil - else - IndexBtreeScanExec(projectList.flatMap(_.references.toSeq).distinct, - relation, idx, pruningPredicates, otherPredicates, vals.map(v => Literal(v.get))) :: Nil + IndexBtreeFetchExec(projectList.flatMap(_.references.toSeq).distinct, + indexRelation, idx, pruningPredicates, otherPredicates, indexRelation.literalFilters) :: Nil case _ => Nil } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeScanExec.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeFetchExec.scala similarity index 65% rename from dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeScanExec.scala rename to dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeFetchExec.scala index de279001..a92ffa86 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeScanExec.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeFetchExec.scala @@ -22,35 +22,27 @@ import com.paypal.dione.spark.Dione import com.paypal.dione.spark.index.IndexManager import com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation import org.apache.avro.util.Utf8 -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} -import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.hive.serde.serdeConstants -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption -import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils -import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.CastSupport -import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.hive._ -import org.apache.spark.sql.hive.client.HiveClientImpl -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, DataType, StructType} import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.util.Utils +import org.apache.spark.{Partition, TaskContext} + import 
scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ + /** + * copied and changed from HiveTableScanExec */ -case class IndexBtreeScanExec( +case class IndexBtreeFetchExec( requestedAttributes: Seq[Attribute], indexRelation: HiveIndexTableRelation, idx: IndexManager, @@ -61,7 +53,7 @@ case class IndexBtreeScanExec( def sparkSession = Dione.getContext.spark - override def nodeName: String = s"Scan Index ${indexRelation.tableMeta.qualifiedName}" + override def nodeName: String = s"Fetch From Index ${indexRelation.tableMeta.qualifiedName}" override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -88,58 +80,11 @@ case class IndexBtreeScanExec( } @transient private lazy val hiveQlTable = SparkSqlHiveUtils.toHiveTable(indexRelation.tableMeta) -// @transient private lazy val tableDesc = new TableDesc( -// hiveQlTable.getInputFormatClass, -// hiveQlTable.getOutputFormatClass, -// hiveQlTable.getMetadata) - - // Create a local copy of hadoopConf,so that scan specific modifications should not impact - // other queries -// @transient private lazy val hadoopConf = { -// val c = sparkSession.sessionState.newHadoopConf() -// append columns ids and names before broadcast -// addColumnMetadataToConf(c) -// c -// } - -// @transient private lazy val hadoopReader = new HadoopTableReader( -// output, -// relation.partitionCols, -// tableDesc, -// sparkSession, -// hadoopConf) private def castFromString(value: String, dataType: DataType) = { cast(Literal(value), dataType).eval(null) } -// private def addColumnMetadataToConf(hiveConf: Configuration): Unit = { -// // Specifies needed column IDs for those non-partitioning columns. -// val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) -// val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) -// -// HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) -// -// val deserializer = tableDesc.getDeserializerClass.newInstance -// deserializer.initialize(hiveConf, tableDesc.getProperties) -// -// // Specifies types and object inspectors of columns to be scanned. -// val structOI = ObjectInspectorUtils -// .getStandardObjectInspector( -// deserializer.getObjectInspector, -// ObjectInspectorCopyOption.JAVA) -// .asInstanceOf[StructObjectInspector] -// -// val columnTypeNames = structOI -// .getAllStructFieldRefs.asScala -// .map(_.getFieldObjectInspector) -// .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) -// .mkString(",") -// -// hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) -// hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) -// } - /** * Prunes partitions not involve the query plan. 
* @@ -184,8 +129,8 @@ case class IndexBtreeScanExec( new RDD[InternalRow](Dione.getContext.spark.sparkContext, Nil) { override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { - val idxSplit = split.asInstanceOf[IndexFilePartition] - val avroHashBtreeFolderReader = AvroHashBtreeStorageFolderReader(idxSplit.files.head.filePath) + val idxSplit = split.asInstanceOf[IndexPartition] + val avroHashBtreeFolderReader = AvroHashBtreeStorageFolderReader(idxSplit.path) val blahUTF2 = (a: Any) => a match { case x: UTF8String => x.toString @@ -194,7 +139,7 @@ case class IndexBtreeScanExec( val valueIter = avroHashBtreeFolderReader.getIterator(keys.map(k => blahUTF2(k.value))) - val blahUTF = (a: Any) => a match { + val blahUTF8 = (a: Any) => a match { case x: Utf8 => UTF8String.fromString(x.toString) case x: String => UTF8String.fromString(x) case x => x @@ -204,31 +149,39 @@ case class IndexBtreeScanExec( val idxRequestedFields = requestedAttributes.map(_.name) .filter(idx.moreFields.contains) + val partitionMap = (indexRelation.partitionCols zip idxSplit.prtValues).map(p => p._1.name -> p._2).toMap + + val keysMap = (idx.keys zip keys).map(x => x._1 -> x._2.value).toMap + val dataFieldsMap = indexRelation.hiveDataTableRelation.schema.fields.map(f => f.name -> f).toMap + val allButDataFieldsSet = (idxRequestedFields ++ partitionMap.keys ++ keysMap.keys).toSet val dataRequestedFields = requestedAttributes.map(_.name) - .filter(f => dataFieldsMap.contains(f)) - .filterNot(f => idxRequestedFields.contains(f)) - .filterNot(f => idx.keys.contains(f)) - + .filterNot(f => allButDataFieldsSet.contains(f)) val dataSchema = dataRequestedFields.map(f => dataFieldsMap(f)) valueIter.map(indexGR => { - val indexSeq = keys.map(k => k.value) ++ idxRequestedFields.map(reqAtt => blahUTF(indexGR.get(reqAtt))) val dataMap = idx.sparkIndexer.readPayload(indexGR, StructType(dataSchema)) - val dataSeq = dataRequestedFields.map(reqAtt => blahUTF(dataMap(reqAtt))) - val c = cnvrt(InternalRow.fromSeq(indexSeq ++ dataSeq)) + + val retSeq = requestedAttributes.map(_.name).map { + case f if idx.moreFields.contains(f) => indexGR.get(f) + case f if keysMap.contains(f) => keysMap(f) + case f if partitionMap.contains(f) => partitionMap(f) + case f if dataMap.contains(f) => dataMap(f) + case f => throw new RuntimeException(s"Field $f not found") + }.map(blahUTF8) + + val c = cnvrt(InternalRow.fromSeq(retSeq)) c }) } override protected def getPartitions: Array[Partition] = { val pp = prunePartitions(rawPartitions) - pp.zipWithIndex.map(p => IndexFilePartition(p._2, Seq(IndexFile(p._1.getLocation)))).toArray + pp.zipWithIndex.map(p => IndexPartition(p._2, p._1.getValues.toList, p._1.getLocation)).toArray } } } } -case class IndexFile(filePath: String) -case class IndexFilePartition(index: Int, files: Seq[IndexFile]) extends Partition +case class IndexPartition(index: Int, prtValues: List[String], path: String) extends Partition diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala index 9d42b17a..2e3a11cb 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala @@ -9,9 +9,29 @@ import org.apache.spark.sql.functions.{col, expr} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, SparkSession} +object IndexSpec { + def create(dataTableName: String, indexTableName: 
String, + keys: Seq[String], moreFields: Seq[String] = Nil)(implicit sparkSession: SparkSession): IndexSpec = { + new IndexSpec(toFullTableName(dataTableName), toFullTableName(indexTableName), keys, moreFields) + } + + private def toFullTableName(tableName: String)(implicit sparkSession: SparkSession): String = { + tableName.split('.') match { + case Array(_, _) => + tableName + case Array(onlyTableName) => + sparkSession.catalog.currentDatabase + "." + onlyTableName + } + } +} + case class IndexSpec(dataTableName: String, indexTableName: String, keys: Seq[String], moreFields: Seq[String] = Nil) { + require(keys.nonEmpty, "you must provide at least one key") + require(dataTableName.contains("."), dataTableName + " must include DB name") + require(indexTableName.contains("."), indexTableName + " must include DB name") + def getFields: Seq[String] = { keys ++ moreFields } @@ -36,6 +56,11 @@ object IndexManager { * @param spark * @return */ + def createNew(dataTableName: String, indexTableName: String, + keys: Seq[String], moreFields: Seq[String] = Nil)(implicit spark: SparkSession): IndexManager = { + createNew(IndexSpec.create(dataTableName, indexTableName, keys, moreFields)) + } + def createNew(indexSpec: IndexSpec)(implicit spark: SparkSession): IndexManager = { // TODO: assert index table doesn't exist @@ -64,7 +89,7 @@ object IndexManager { val dataTableName = tblProperties("index.meta.dataTableName") val keys = tblProperties("index.meta.keys").split("\\|") val moreFields = tblProperties("index.meta.moreFields").split("\\|").filterNot(_.isEmpty) - val indexSpec = IndexSpec(dataTableName, indexTableName, keys, moreFields) + val indexSpec = IndexSpec.create(dataTableName, indexTableName, keys, moreFields) // TODO - add the manager class to the table metadata, and pass explicitly here: val indexManager: IndexManager = IndexManagerUtils.createIndexManager(spark, indexSpec) diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala index 72731129..4a4f3562 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala @@ -2,13 +2,14 @@ package com.paypal.dione.spark.sql.catalyst.catalog import com.paypal.dione.spark.index.IndexManager import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal} class HiveIndexTableRelation(tableMeta: CatalogTable, dataCols: Seq[AttributeReference], partitionCols: Seq[AttributeReference], val hiveDataTableRelation: HiveTableRelation, - val indexSpec: IndexManager) + val indexManager: IndexManager, + val literalFilters: Seq[Literal]) extends HiveTableRelation(tableMeta, dataCols, partitionCols) { } diff --git a/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala b/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala index 5dcfbfd3..d0b6874c 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala @@ -23,6 +23,7 @@ trait SparkTestBase { spark.conf.set("indexer.sampler.files.rate", "1.0") spark } + implicit def ss: SparkSession = spark lazy val hadoopConf = 
spark.sparkContext.hadoopConfiguration lazy val fs: FileSystem = FileSystem.get(hadoopConf) diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroDecimalConverter.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroDecimalConverter.scala index b288ecbf..3de18626 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroDecimalConverter.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroDecimalConverter.scala @@ -57,7 +57,8 @@ class TestAvroDecimalConverter { spark.table("foo").show(false) spark.table("foo").printSchema() - def create() = IndexManager.createNew(IndexSpec("foo", "index", Seq("id"), Seq("col1", "col2", "col3")))(spark) + def create() = IndexManager.createNew(IndexSpec.create("foo", "index", + Seq("id"), Seq("col1", "col2", "col3"))) // without setting indexer.castDecimalToDouble=true, should fail: Assertions.assertThrows(classOf[IllegalArgumentException], new Executable { diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala index ec79ee69..4f1653f6 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala @@ -41,7 +41,7 @@ class TestAvroIndexManager { @Test @Order(1) def testCreateIndexManager(): Unit = { - IndexManager.createNew(IndexSpec("t3", "index_t3", Seq("message_id", "sub_message_id"), Seq("time_result_created")))(spark) + IndexManager.createNew("t3", "index_t3", Seq("message_id", "sub_message_id"), Seq("time_result_created")) spark.sql("desc formatted index_t3").show(100, false) } diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerBase.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerBase.scala index be8d6cfd..86cdb96b 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerBase.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerBase.scala @@ -8,7 +8,7 @@ import org.junit.jupiter.api._ @TestMethodOrder(classOf[OrderAnnotation]) class TestAvroIndexManagerBase extends TestIndexManagerBase { - lazy val indexSpec: IndexSpec = IndexSpec("avro_data_tbl", "avro_data_tbl_idx", Seq("id_col")) + lazy val indexSpec: IndexSpec = IndexSpec.create("avro_data_tbl", "avro_data_tbl_idx", Seq("id_col")) def initDataTable(fieldsSchema: String, partitionFieldSchema: String): Unit = { spark.sql(s"create table ${indexSpec.dataTableName} ($fieldsSchema) partitioned by ($partitionFieldSchema) stored as avro") diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerJoin.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerJoin.scala index 0d332130..80de9076 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerJoin.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerJoin.scala @@ -41,13 +41,13 @@ class TestAvroIndexManagerJoin { @Test @Order(1) def testCreateIndexManager(): Unit = { - IndexManager.createNew(IndexSpec("t_join", "t_join_index", Seq("key"), Seq("ts")))(spark) + IndexManager.createNew("t_join", "t_join_index", Seq("key"), Seq("ts")) } @Test @Order(2) def testAppendNewPartitions(): Unit = { - 
val indexManager = IndexManager.load("t_join_index")(spark) + val indexManager = IndexManager.load("t_join_index") spark.conf.set("index.manager.btree.num.parts", "2") spark.conf.set("index.manager.btree.interval", "3") spark.conf.set("index.manager.btree.height", "2") diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerNonUnique.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerNonUnique.scala index a84fea3d..7634e44b 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerNonUnique.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerNonUnique.scala @@ -41,7 +41,7 @@ class TestAvroIndexManagerNonUnique { @Test @Order(1) def testCreateIndexManager(): Unit = { - IndexManager.createNew(IndexSpec("tbl", "index_tbl", Seq("key")))(spark) + IndexManager.createNew("tbl", "index_tbl", Seq("key")) spark.sql("desc formatted index_tbl").show(100, false) } diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestDataSplitter.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestDataSplitter.scala index 75dc6edf..9bafc3bd 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestDataSplitter.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestDataSplitter.scala @@ -38,7 +38,7 @@ case class DataSplitTester(spark: SparkSession, expectedNumChunks: Int, dataTabl def fullE2ETest(): Unit = { spark.conf.set("indexer.files.chunkMB", 1) - val manager = IndexManager.createNew(IndexSpec(dataTable, indexTable, Seq("id"), Seq("col1")))(spark) + val manager = IndexManager.createNew(dataTable, indexTable, Seq("id"), Seq("col1"))(spark) manager.appendMissingPartitions() val index = manager.getIndex() diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestOneLineInBlock.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestOneLineInBlock.scala index db1387a1..e67d6050 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestOneLineInBlock.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestOneLineInBlock.scala @@ -47,7 +47,7 @@ class TestOneLineInBlock { @Test @Order(1) def testCreateIndexManager(): Unit = { - IndexManager.createNew(IndexSpec("t3", "index_t3", Seq("message_id", "sub_message_id"), Seq("time_result_created")))(spark) + IndexManager.createNew("t3", "index_t3", Seq("message_id", "sub_message_id"), Seq("time_result_created")) spark.sql("desc formatted index_t3").show(100, false) // ensure one row per block import spark.implicits._ diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala index a1cc95e0..4ba3030c 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala @@ -4,21 +4,21 @@ import com.paypal.dione.SparkCleanTestDB import com.paypal.dione.spark.Dione import com.paypal.dione.spark.index.avro.TestAvroIndexManagerJoin.spark import com.paypal.dione.spark.index.{IndexManager, IndexSpec} +import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.catalyst.catalog.HiveTableRelation -import org.junit.jupiter.api.MethodOrderer.OrderAnnotation +import 
org.junit.jupiter.api.TestInstance.Lifecycle import org.junit.jupiter.api._ object TestIndexRule extends SparkCleanTestDB { override val baseTestPath: String = "TestData/TestIndexRule/" - override val dbName: String = "TestIndexRule" + override val dbName: String = "TestIndexRule".toLowerCase @BeforeAll def initData(): Unit = { import spark.implicits._ - spark.sql(s"create table t_rule (key string, sub_key string, var1 string, var2 int) " + s"partitioned by (dt string) stored as avro") @@ -26,26 +26,23 @@ object TestIndexRule extends SparkCleanTestDB { .toDF("key", "sub_key", "var1", "var2").repartition(2).createOrReplaceTempView("t") spark.sql(s"insert overwrite table t_rule partition (dt='2021-10-04') select * from t") - spark.table("t_rule").show() + //spark.table("t_rule").show() } } -@TestMethodOrder(classOf[OrderAnnotation]) +@TestInstance(Lifecycle.PER_CLASS) class TestIndexRule { - val indexSpec = IndexSpec("t_rule", "t_rule_index", Seq("key"), Seq("sub_key")) + val tableName = TestIndexRule.dbName + ".t_rule" + val idxTableName = "t_rule_index" - @Test - @Order(1) + @BeforeAll def testCreateIndexManager(): Unit = { + val indexSpec = IndexSpec.create(tableName, idxTableName, Seq("key"), Seq("sub_key"))(spark) IndexManager.createNew(indexSpec)(spark) - } - @Test - @Order(2) - def testAppendNewPartitions(): Unit = { - val indexManager = IndexManager.load("t_rule_index")(spark) + val indexManager = IndexManager.load(idxTableName)(spark) spark.conf.set("index.manager.btree.num.parts", "2") spark.conf.set("index.manager.btree.interval", "3") spark.conf.set("index.manager.btree.height", "1") @@ -53,37 +50,64 @@ class TestIndexRule { Dione.enable(spark) Dione.getContext.addIndex(indexManager) + } - Assertions.assertEquals(10, spark.table("t_rule_index").count()) + @Test + def testIndexSize(): Unit = { + Assertions.assertEquals(10, spark.table(idxTableName).count()) } @Test - @Order(3) def testCoveringProject(): Unit = { - val df = spark.table("t_rule").select("key", "sub_key").where("sub_key=='sub_key_4'") + val df = spark.table(tableName).select("key", "sub_key") - Assertions.assertEquals(Seq("t_rule_index"), - df.queryExecution.optimizedPlan.collect { - case h: HiveTableRelation => - h.tableMeta.identifier.identifier - }) + AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) + + Assertions.assertEquals(10, df.collect().length) + } + + @Test + def testCoveringProjectFilter(): Unit = { + val df = spark.table(tableName).select("key", "sub_key").where("sub_key=='sub_key_4'") + + AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) Assertions.assertEquals("[4,sub_key_4]", df.collect().mkString(",")) } @Test - @Order(3) def testFilterEqualTo(): Unit = { - val df = spark.table("t_rule").select("key", "sub_key", "var2", "var1").where("key == '7'") + val df = spark.table(tableName).select("key", "sub_key", "var2", "var1").where("key == '7'") + + AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) -// df.explain(true) -// df.show() - Assertions.assertEquals(Seq("t_rule_index"), + Assertions.assertEquals("[7,sub_key_7,7,var_a_7]", df.collect().mkString(",")) + } + + + @Test + def testFilterEqualToWithPartition(): Unit = { + val df = spark.table(tableName).select("sub_key", "key", "dt", "var1").where("key == '7'") + + df.explain(true) + // df.show() + AssertPlanUsesTable(df, TestIndexRule.dbName + "." 
+ idxTableName) + + Assertions.assertEquals("[sub_key_7,7,2021-10-04,var_a_7]", df.collect().mkString(",")) + } + + @Test + def testNoIndex(): Unit = { + val df = spark.table("t_rule").select("key", "sub_key", "var2", "var1").where("var2=6") + + AssertPlanUsesTable(df, tableName) + } + + private def AssertPlanUsesTable(df: Dataset[Row], tableName: String) = { + Assertions.assertEquals(Seq(tableName), df.queryExecution.optimizedPlan.collect { case h: HiveTableRelation => - h.tableMeta.identifier.identifier + h.tableMeta.identifier.database.get + "." + h.tableMeta.identifier.identifier }) - - Assertions.assertEquals("[7,sub_key_7,7,var_a_7]", df.collect().mkString(",")) } } diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/orc/TestOrcIndexManagerBase.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/orc/TestOrcIndexManagerBase.scala index 7720a9c5..652a2b97 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/orc/TestOrcIndexManagerBase.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/orc/TestOrcIndexManagerBase.scala @@ -8,7 +8,7 @@ import org.junit.jupiter.api._ @TestMethodOrder(classOf[OrderAnnotation]) class TestOrcIndexManagerBase extends TestIndexManagerBase { - lazy val indexSpec: IndexSpec = IndexSpec("orc_data_tbl", "orc_data_tbl_idx", Seq("id_col")) + lazy val indexSpec: IndexSpec = IndexSpec.create("orc_data_tbl", "orc_data_tbl_idx", Seq("id_col")) def initDataTable(fieldsSchema: String, partitionFieldSchema: String): Unit = { spark.sql(s"create table ${indexSpec.dataTableName} ($fieldsSchema) partitioned by ($partitionFieldSchema) stored as orc") diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManager.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManager.scala index 8737e3b7..41c893b0 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManager.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManager.scala @@ -45,20 +45,20 @@ class TestParquetIndexManager { @Test @Order(1) def testCreateIndexManager(): Unit = { - IndexManager.createNew(IndexSpec("parquet_tbl", "index_parquet_tbl", Seq("message_id", "sub_message_id"), Seq("time_result_created")))(spark) + IndexManager.createNew("parquet_tbl", "index_parquet_tbl", Seq("message_id", "sub_message_id"), Seq("time_result_created")) //spark.sql("desc formatted parquet_tbl").show(100, false) } @Test @Order(2) def testLoadIndexManager(): Unit = { - IndexManager.load("index_parquet_tbl")(spark) + IndexManager.load("index_parquet_tbl") } @Test @Order(3) def testAppendNewPartitions(): Unit = { - val indexManager = IndexManager.load("index_parquet_tbl")(spark) + val indexManager = IndexManager.load("index_parquet_tbl") indexManager.appendMissingPartitions() //spark.table("index_parquet_tbl").show(100, false) @@ -73,7 +73,7 @@ class TestParquetIndexManager { @Test @Order(4) def testLoadByIndex(): Unit = { - val indexManager = IndexManager.load("index_parquet_tbl")(spark) + val indexManager = IndexManager.load("index_parquet_tbl") val queryDF = indexManager.getIndex().where("message_id like '%g_2%' and dt='2018-10-04'") //queryDF.show(1000, false) diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManagerBase.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManagerBase.scala index 6e9bb16c..fb38e103 100644 --- 
a/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManagerBase.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManagerBase.scala @@ -8,7 +8,7 @@ import org.junit.jupiter.api._ @TestMethodOrder(classOf[OrderAnnotation]) class TestParquetIndexManagerBase extends TestIndexManagerBase { - lazy val indexSpec: IndexSpec = IndexSpec("parquet_data_tbl", "parquet_data_tbl_idx", Seq("id_col"), Seq("meta_field")) + lazy val indexSpec: IndexSpec = IndexSpec.create("parquet_data_tbl", "parquet_data_tbl_idx", Seq("id_col"), Seq("meta_field")) def initDataTable(fieldsSchema: String, partitionFieldSchema: String): Unit = { val sc = spark.sparkContext diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSequenceFileIndexManagerBase.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSequenceFileIndexManagerBase.scala index eca4c931..60629998 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSequenceFileIndexManagerBase.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSequenceFileIndexManagerBase.scala @@ -8,7 +8,7 @@ import org.junit.jupiter.api._ @TestMethodOrder(classOf[OrderAnnotation]) class TestSequenceFileIndexManagerBase extends TestIndexManagerBase { - lazy val indexSpec: IndexSpec = IndexSpec("seq_file_data_tbl", "seq_file_data_tbl_idx", Seq("id_col"), Seq("meta_field")) + lazy val indexSpec: IndexSpec = IndexSpec.create("seq_file_data_tbl", "seq_file_data_tbl_idx", Seq("id_col"), Seq("meta_field")) def initDataTable(fieldsSchema: String, partitionFieldSchema: String): Unit = { spark.sql(s"create table ${indexSpec.dataTableName} ($fieldsSchema)" + diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSparkSequenceFile.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSparkSequenceFile.scala index 74ccd814..ed301cc5 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSparkSequenceFile.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSparkSequenceFile.scala @@ -84,7 +84,7 @@ class TestSparkSequenceFile { @Test def createNewIndex: Unit = { // dummy test to ensure initialization - val indexer = IndexManager.createNew(IndexSpec("foo", "index", Seq("id"), Seq("col0", "col1")))(spark) + val indexer = IndexManager.createNew("foo", "index", Seq("id"), Seq("col0", "col1")) assertEquals(0, spark.table(indexer.indexTableName).count()) } @@ -92,7 +92,7 @@ class TestSparkSequenceFile { @Order(2) @Test def addNewPartition: Unit = { - val indexer = IndexManager.load("index")(spark) + val indexer = IndexManager.load("index") spark.conf.set("index.manager.btree.parts", 5) @@ -122,7 +122,7 @@ class TestSparkSequenceFile { @Test def testLoadByIndex(): Unit = { import spark.implicits._ - val indexManager = IndexManager.load("index")(spark) + val indexManager = IndexManager.load("index") val indexTable = indexManager.getIndex() From e331b1f420380f6e52b61d5f8e59e95ef61dd740 Mon Sep 17 00:00:00 2001 From: oraviv Date: Tue, 31 May 2022 14:47:13 +0300 Subject: [PATCH 8/9] fix star expansion issue --- .../scala/com/paypal/dione/spark/Dione.scala | 9 +++++- .../com/paypal/dione/spark/DioneRule.scala | 10 +++++++ .../spark/execution/DioneIndexStrategy.scala | 6 ++-- .../dione/spark/index/IndexManager.scala | 3 ++ .../catalog/HiveIndexTableRelation.scala | 1 + .../index/optimizations/TestIndexRule.scala | 28 
+++++++++++-------- 6 files changed, 43 insertions(+), 14 deletions(-) diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala index 6afff161..7a56cc66 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala @@ -19,8 +19,15 @@ object Dione { }) } - def enable(spark: SparkSession): Unit = { + def enable(implicit spark: SparkSession): Unit = { spark.sessionState.experimentalMethods.extraOptimizations ++= DioneRule :: Nil spark.sessionState.experimentalMethods.extraStrategies ++= DioneIndexStrategy :: Nil } + + def disable(implicit spark: SparkSession): Unit = { + spark.sessionState.experimentalMethods.extraOptimizations = + spark.sessionState.experimentalMethods.extraOptimizations.filterNot(_ == DioneRule) + spark.sessionState.experimentalMethods.extraStrategies = + spark.sessionState.experimentalMethods.extraStrategies.filterNot(_ == DioneIndexStrategy) + } } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala index 9c8d2ed2..ef2a26a4 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala @@ -42,6 +42,16 @@ object DioneRule extends Rule[LogicalPlan] { h.dataCols.filter(dc => p.references.map(_.name).toSet.contains(dc.name))) p.copy(child = f.copy(child = new HiveIndexTableRelation(tableMeta = indexCatalogTable, dataCols = updatedAttributes, partitionCols = h.partitionCols, h, idx, literalFilter))) + + case f @ Filter(condition, h @ HiveTableRelation(_, _, _)) + if getSpecificFilter(condition, h).nonEmpty => + val (idx, literalFilter) = getSpecificFilter(condition, h).get + val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, + idx.indexTableName) + val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, + h.dataCols.filter(dc => f.references.map(_.name).toSet.contains(dc.name))) + f.copy(child = new HiveIndexTableRelation(tableMeta = indexCatalogTable, + dataCols = updatedAttributes, partitionCols = h.partitionCols, h, idx, literalFilter)) } } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala index 70499beb..cce75191 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala @@ -2,6 +2,7 @@ package com.paypal.dione.spark.execution import com.paypal.dione.spark.Dione import com.paypal.dione.spark.index.IndexManager +import com.paypal.dione.spark.index.IndexManager.indexMetaFields import com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.catalog.HiveTableRelation @@ -25,8 +26,9 @@ object DioneIndexStrategy extends Strategy { val idx = indexRelation.indexManager - IndexBtreeFetchExec(projectList.flatMap(_.references.toSeq).distinct, - indexRelation, idx, pruningPredicates, otherPredicates, indexRelation.literalFilters) :: Nil + IndexBtreeFetchExec(projectList.flatMap(_.references.toSeq) + .filterNot(p => indexMetaFields.contains(p.name)).distinct, + indexRelation, idx, pruningPredicates, otherPredicates, indexRelation.literalFilters) :: Nil case _ 
=> Nil } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala index 2e3a11cb..38261af0 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala @@ -1,5 +1,6 @@ package com.paypal.dione.spark.index +import com.paypal.dione.avro.hadoop.file.AvroBtreeFile import com.paypal.dione.hdfs.index.HdfsIndexContants.{FILE_NAME_COLUMN, OFFSET_COLUMN, SIZE_COLUMN, SUB_OFFSET_COLUMN} import com.paypal.dione.kvstorage.hadoop.avro.AvroHashBtreeStorageFolderReader import com.paypal.dione.spark.avro.btree.SparkAvroBtreeUtils @@ -49,6 +50,8 @@ object IndexManager { StructField(SIZE_COLUMN, IntegerType) )) + lazy val indexMetaFields = indexSchema.map(_.name).toSet + AvroBtreeFile.METADATA_COL_NAME + /** * Creates a new index table on a data table for the given keys. * diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala index 4a4f3562..6d59944c 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala @@ -12,4 +12,5 @@ class HiveIndexTableRelation(tableMeta: CatalogTable, val literalFilters: Seq[Literal]) extends HiveTableRelation(tableMeta, dataCols, partitionCols) { + override def output: Seq[AttributeReference] = hiveDataTableRelation.output } diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala index 4ba3030c..f9713dc3 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala @@ -60,46 +60,52 @@ class TestIndexRule { @Test def testCoveringProject(): Unit = { val df = spark.table(tableName).select("key", "sub_key") - AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) - Assertions.assertEquals(10, df.collect().length) } @Test def testCoveringProjectFilter(): Unit = { val df = spark.table(tableName).select("key", "sub_key").where("sub_key=='sub_key_4'") - AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) - Assertions.assertEquals("[4,sub_key_4]", df.collect().mkString(",")) } @Test def testFilterEqualTo(): Unit = { val df = spark.table(tableName).select("key", "sub_key", "var2", "var1").where("key == '7'") - AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) - Assertions.assertEquals("[7,sub_key_7,7,var_a_7]", df.collect().mkString(",")) } - @Test def testFilterEqualToWithPartition(): Unit = { val df = spark.table(tableName).select("sub_key", "key", "dt", "var1").where("key == '7'") - - df.explain(true) +// df.explain(true) // df.show() AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) - Assertions.assertEquals("[sub_key_7,7,2021-10-04,var_a_7]", df.collect().mkString(",")) } + @Test + def testFilterEqualToStar(): Unit = { + val df = spark.table(tableName).where("key == '7'") + df.explain(true) + AssertPlanUsesTable(df, TestIndexRule.dbName + "." 
+ idxTableName) + Assertions.assertEquals("[7,sub_key_7,var_a_7,7,2021-10-04]", df.collect().mkString(",")) + } + + @Test + def testDisabled(): Unit = { + Dione.disable(spark) + val df = spark.table(tableName).select("sub_key", "key", "dt", "var1").where("key == '7'") + AssertPlanUsesTable(df, tableName) + Dione.enable(spark) + } + @Test def testNoIndex(): Unit = { val df = spark.table("t_rule").select("key", "sub_key", "var2", "var1").where("var2=6") - AssertPlanUsesTable(df, tableName) } From 1aa94b7ac80b158cd507b985be9e2256de455a52 Mon Sep 17 00:00:00 2001 From: oraviv Date: Tue, 31 May 2022 20:21:34 +0300 Subject: [PATCH 9/9] fix python tests --- dione-spark/src/main/python/dione/index_manager.py | 2 +- dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala | 2 +- .../src/main/scala/com/paypal/dione/spark/DioneContext.scala | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dione-spark/src/main/python/dione/index_manager.py b/dione-spark/src/main/python/dione/index_manager.py index 0cb90f43..42505a9a 100644 --- a/dione-spark/src/main/python/dione/index_manager.py +++ b/dione-spark/src/main/python/dione/index_manager.py @@ -50,7 +50,7 @@ def create_new(spark, data_table_name, index_table_name, keys, more_fields=None) key = scala_helper.list_to_seq(keys) moreFields = scala_helper.list_to_seq(more_fields) idx_spec = scala_helper.get_object("com.paypal.dione.spark.index.IndexSpec") - is_java = idx_spec.apply(data_table_name, index_table_name, key, moreFields) + is_java = idx_spec.create(data_table_name, index_table_name, key, moreFields, spark._jsparkSession) im = scala_helper.get_object("com.paypal.dione.spark.index.IndexManager") \ .createNew(is_java, spark._jsparkSession) return IndexManager(spark, im) diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala index 7a56cc66..85e216c2 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala @@ -9,7 +9,7 @@ object Dione { def getContext(spark: SparkSession): DioneContext = { if (Option(dioneContext).isEmpty) - dioneContext = DioneContext(spark) + dioneContext = DioneContext()(spark) dioneContext } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala index 7a6fe7a7..5b4433f4 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala @@ -6,7 +6,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import scala.collection.mutable -case class DioneContext(spark: SparkSession) { +case class DioneContext(implicit spark: SparkSession) { val indices: mutable.Map[String, Seq[IndexManager]] = mutable.HashMap() @@ -20,5 +20,6 @@ case class DioneContext(spark: SparkSession) { indices.getOrElse(indexManager.dataTableName, Seq()) ++ Seq(indexManager)) } + def addIndex(indexTable: String): Unit = addIndex(IndexManager.load(indexTable)) }
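
To make the new entry points above concrete, here is a minimal usage sketch of the name-based IndexSpec.create / IndexManager.createNew API, written in the same spirit as the tests. The table and field names (mydb.events, events_index, event_id, ts) are hypothetical placeholders; the sketch only assumes a Hive-enabled SparkSession in implicit scope and an existing Avro-backed data table.

import com.paypal.dione.spark.index.IndexManager
import org.apache.spark.sql.SparkSession

// Hypothetical session and table names, used for illustration only.
implicit val spark: SparkSession = SparkSession.builder()
  .appName("dione-index-sketch")
  .enableHiveSupport()
  .getOrCreate()

// Unqualified table names are resolved against the current database by
// IndexSpec.create, so "events_index" ends up as "<current_db>.events_index".
val manager = IndexManager.createNew(
  dataTableName = "mydb.events",
  indexTableName = "events_index",
  keys = Seq("event_id"),
  moreFields = Seq("ts"))

// Build index entries for every data-table partition not yet indexed.
manager.appendMissingPartitions()

As in TestAvroIndexManagerJoin, the btree layout settings index.manager.btree.num.parts, index.manager.btree.interval and index.manager.btree.height can be set on spark.conf before appendMissingPartitions() if the defaults are not suitable.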
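
A companion sketch, continuing with the same hypothetical names, shows how the optimizer hook from this series is switched on and exercised: Dione.enable registers DioneRule and DioneIndexStrategy, addIndex makes the index visible to the rule, and a query whose projected columns and filter are covered by the index keys and moreFields should then be planned against the index table rather than the data table (compare the AssertPlanUsesTable checks in TestIndexRule).

import com.paypal.dione.spark.Dione
import com.paypal.dione.spark.index.IndexManager
import org.apache.spark.sql.SparkSession

// Reuses the hypothetical session and tables from the previous sketch.
implicit val spark: SparkSession = SparkSession.getActiveSession.get

// Register the Catalyst rule and strategy, then declare the index to Dione.
Dione.enable(spark)
Dione.getContext.addIndex(IndexManager.load("events_index"))
// Equivalent shorthand added in the last patch of this series:
// Dione.getContext.addIndex("events_index")

// Covered projection plus a key equality filter: DioneRule should swap in the
// index relation and DioneIndexStrategy should plan an IndexBtreeFetchExec.
val covered = spark.table("mydb.events")
  .select("event_id", "ts")
  .where("event_id = '7'")
covered.explain(true)

// The rewrite can also be turned off again, e.g. to compare plans in tests.
Dione.disable(spark)

Queries that the registered index does not cover keep their original HiveTableRelation, as the testNoIndex case demonstrates.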