diff --git a/.github/workflows/pr.workflow.yml b/.github/workflows/pr.workflow.yml index 31e701c4..5eb09877 100644 --- a/.github/workflows/pr.workflow.yml +++ b/.github/workflows/pr.workflow.yml @@ -70,5 +70,6 @@ jobs: find . export PYTHONPATH=`echo dione-spark/target/dione-spark-*-SNAPSHOT.jar` python -V + pip install pypandoc==1.7.5 pip install pytest pyspark==2.4.8 pytest -v diff --git a/.gitignore b/.gitignore index 44fa7a67..68ac9874 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,6 @@ classes tmp TestData -settings-ossrh.xml \ No newline at end of file +settings-ossrh.xml + +.ipynb_checkpoints \ No newline at end of file diff --git a/dione-spark/src/main/python/dione/index_manager.py b/dione-spark/src/main/python/dione/index_manager.py index 0cb90f43..42505a9a 100644 --- a/dione-spark/src/main/python/dione/index_manager.py +++ b/dione-spark/src/main/python/dione/index_manager.py @@ -50,7 +50,7 @@ def create_new(spark, data_table_name, index_table_name, keys, more_fields=None) key = scala_helper.list_to_seq(keys) moreFields = scala_helper.list_to_seq(more_fields) idx_spec = scala_helper.get_object("com.paypal.dione.spark.index.IndexSpec") - is_java = idx_spec.apply(data_table_name, index_table_name, key, moreFields) + is_java = idx_spec.create(data_table_name, index_table_name, key, moreFields, spark._jsparkSession) im = scala_helper.get_object("com.paypal.dione.spark.index.IndexManager") \ .createNew(is_java, spark._jsparkSession) return IndexManager(spark, im) diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala new file mode 100644 index 00000000..85e216c2 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala @@ -0,0 +1,33 @@ +package com.paypal.dione.spark + +import com.paypal.dione.spark.execution.DioneIndexStrategy +import org.apache.spark.sql.SparkSession + +object Dione { + + private var dioneContext: DioneContext = _ + + def getContext(spark: SparkSession): DioneContext = { + if (Option(dioneContext).isEmpty) + dioneContext = DioneContext()(spark) + dioneContext + } + + def getContext: DioneContext = { + getContext(SparkSession.getActiveSession.getOrElse { + throw new RuntimeException("No active spark session found") + }) + } + + def enable(implicit spark: SparkSession): Unit = { + spark.sessionState.experimentalMethods.extraOptimizations ++= DioneRule :: Nil + spark.sessionState.experimentalMethods.extraStrategies ++= DioneIndexStrategy :: Nil + } + + def disable(implicit spark: SparkSession): Unit = { + spark.sessionState.experimentalMethods.extraOptimizations = + spark.sessionState.experimentalMethods.extraOptimizations.filterNot(_ == DioneRule) + spark.sessionState.experimentalMethods.extraStrategies = + spark.sessionState.experimentalMethods.extraStrategies.filterNot(_ == DioneIndexStrategy) + } +} diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala new file mode 100644 index 00000000..5b4433f4 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala @@ -0,0 +1,25 @@ +package com.paypal.dione.spark + +import com.paypal.dione.spark.index.{IndexManager, IndexSpec} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier + +import scala.collection.mutable + +case class DioneContext(implicit spark: SparkSession) { + + val indices: mutable.Map[String, Seq[IndexManager]] = mutable.HashMap() 
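+  // Keyed by the fully-qualified "db.table" name of the data table; each value holds the
+  // IndexManagers registered for that table (see addIndex/getIndices below).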
+ + def getIndices(tableIdentifier: TableIdentifier): Seq[IndexManager] = { + indices.getOrElse(tableIdentifier.database.getOrElse(spark.catalog.currentDatabase) + + "." + tableIdentifier.identifier, Nil) + } + + def addIndex(indexManager: IndexManager): Unit = { + indices.put(indexManager.dataTableName, + indices.getOrElse(indexManager.dataTableName, Seq()) ++ Seq(indexManager)) + } + + def addIndex(indexTable: String): Unit = addIndex(IndexManager.load(indexTable)) + +} diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala new file mode 100644 index 00000000..ef2a26a4 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala @@ -0,0 +1,91 @@ +package com.paypal.dione.spark + +import com.paypal.dione.spark.index.{IndexManager, IndexManagerUtils} +import com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.StructType + +object DioneRule extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan transform { + + // For a query covering index we switch between the original table and the index table + case p @ Project(_, h @ HiveTableRelation(_, _, _)) + if getCoveringIndex(h, p.projectList.map(_.name)).nonEmpty => + val idx = getCoveringIndex(h, p.projectList.map(_.name)).get + val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, + idx.indexTableName) + val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, + h.dataCols.filter(dc => p.projectList.map(_.name).contains(dc.name))) + p.copy(child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes)) + + case p @ Project(_, f @ Filter(_, h @ HiveTableRelation(_, _, _))) + if getCoveringIndex(h, p.projectList.map(_.name)).nonEmpty => + val idx = getCoveringIndex(h, p.projectList.map(_.name)).get + val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, + idx.indexTableName) + val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, + h.dataCols.filter(dc => p.projectList.map(_.name).contains(dc.name))) + p.copy(child = f.copy(child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes))) + + // For a data lookup index we add relevant information to later use in the strategy + case p @ Project(_, f @ Filter(condition, h @ HiveTableRelation(_, _, _))) + if getSpecificFilter(condition, h).nonEmpty => + val (idx, literalFilter) = getSpecificFilter(condition, h).get + val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, + idx.indexTableName) + val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, + h.dataCols.filter(dc => p.references.map(_.name).toSet.contains(dc.name))) + p.copy(child = f.copy(child = new HiveIndexTableRelation(tableMeta = indexCatalogTable, + dataCols = updatedAttributes, partitionCols = h.partitionCols, h, idx, literalFilter))) + + case f @ Filter(condition, h @ HiveTableRelation(_, _, _)) + if getSpecificFilter(condition, h).nonEmpty => + val (idx, literalFilter) = getSpecificFilter(condition, h).get + val indexCatalogTable = 
IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark, + idx.indexTableName) + val updatedAttributes = toAttributes(indexCatalogTable.dataSchema, + h.dataCols.filter(dc => f.references.map(_.name).toSet.contains(dc.name))) + f.copy(child = new HiveIndexTableRelation(tableMeta = indexCatalogTable, + dataCols = updatedAttributes, partitionCols = h.partitionCols, h, idx, literalFilter)) + } + } + + // based on StructType.toAttributes() + def toAttributes(structType: StructType, origAttrs: Seq[AttributeReference]): Seq[AttributeReference] = { + val origMap = origAttrs.map(ar => ar.name -> ar).toMap + structType + .map(f => origMap.getOrElse(f.name, AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + } + + def getCoveringIndex(h: HiveTableRelation, referencedAtts: Seq[String]): Option[IndexManager] = { + Dione.getContext.getIndices(h.tableMeta.identifier) + .find(ci => referencedAtts.forall(ci.indexSpec.getFields.contains)) + } + + def getIndexForTable(h: HiveTableRelation): Option[IndexManager] = { + Dione.getContext.getIndices(h.tableMeta.identifier).headOption + } + + private def getSpecificFilter(condition: Expression, hiveTableRelation: HiveTableRelation): Option[(IndexManager, Seq[Literal])] = { + def findLiteralKeyExpression(key: String, p: Expression) = p match { + case EqualTo(left, right: Literal) if left.references.size == 1 && left.references.toSeq.head.name == key => Some(right) + case _ => None + } + + val idx = getIndexForTable(hiveTableRelation) + + if (idx.nonEmpty) { + val vals = idx.get.keys.map(k => condition.flatMap(p => findLiteralKeyExpression(k, p)).headOption) + if (vals.exists(_.isEmpty)) + None + else Some(idx.get, vals.map(v => Literal(v.get))) + } else { + None + } + } +} diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala index f0dbdbf9..f29065b5 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala @@ -68,7 +68,7 @@ object SparkAvroBtreeUtils { val partitionKeys = partitionsSpec.flatMap(_._1.map(_._1)).distinct val remainingColumns = df.columns.filterNot(c => keysSet.contains(c) || partitionKeys.contains(c)) - logger.info("writing index file to " + folderName + s" with interval: $interval, height: $height," + + logger.info("writing index file to " + folderName + s", with interval: $interval, height: $height," + s" partitionsSpec: $partitionsSpec") val repartitionedDF = customRepartition(df, keys, partitionsSpec) diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala new file mode 100644 index 00000000..cce75191 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala @@ -0,0 +1,36 @@ +package com.paypal.dione.spark.execution + +import com.paypal.dione.spark.Dione +import com.paypal.dione.spark.index.IndexManager +import com.paypal.dione.spark.index.IndexManager.indexMetaFields +import com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, EqualTo, Expression, Literal} +import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan + +object DioneIndexStrategy extends Strategy { + + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + + case PhysicalOperation(projectList, predicates, indexRelation: HiveIndexTableRelation) => + // Filter out all predicates that only deal with partition keys, these are given to the + // hive table scan operator to be used for partition pruning. + val partitionKeyIds = AttributeSet(indexRelation.partitionCols) + val (pruningPredicates, otherPredicates) = predicates.partition { predicate => + !predicate.references.isEmpty && + predicate.references.subsetOf(partitionKeyIds) + } + + val idx = indexRelation.indexManager + + IndexBtreeFetchExec(projectList.flatMap(_.references.toSeq) + .filterNot(p => indexMetaFields.contains(p.name)).distinct, + indexRelation, idx, pruningPredicates, otherPredicates, indexRelation.literalFilters) :: Nil + case _ => + Nil + } + +} diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeFetchExec.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeFetchExec.scala new file mode 100644 index 00000000..a92ffa86 --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/execution/IndexBtreeFetchExec.scala @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.paypal.dione.spark.execution + +import com.paypal.dione.kvstorage.hadoop.avro.AvroHashBtreeStorageFolderReader +import com.paypal.dione.spark.Dione +import com.paypal.dione.spark.index.IndexManager +import com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation +import org.apache.avro.util.Utf8 +import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.hive._ +import org.apache.spark.sql.types.{BooleanType, DataType, StructType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.{Partition, TaskContext} + + +import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ + + +/** + * copied and changed from HiveTableScanExec + */ +case class IndexBtreeFetchExec( + requestedAttributes: Seq[Attribute], + indexRelation: HiveIndexTableRelation, + idx: IndexManager, + partitionPruningPred: Seq[Expression], + dataFilters: Seq[Expression], + keys: Seq[Literal]) + extends LeafExecNode with CastSupport { + + def sparkSession = Dione.getContext.spark + + override def nodeName: String = s"Fetch From Index ${indexRelation.tableMeta.qualifiedName}" + + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + + override def producedAttributes: AttributeSet = outputSet ++ + AttributeSet(partitionPruningPred.flatMap(_.references)) + + private val originalAttributes = AttributeMap(indexRelation.output.map(a => a -> a) + ++ indexRelation.hiveDataTableRelation.output.map(a => a -> a)) + + override val output: Seq[Attribute] = { + // Retrieve the original attributes based on expression ID so that capitalization matches. + requestedAttributes.map(originalAttributes) + } + + // Bind all partition key attribute references in the partition pruning predicate for later + // evaluation. + private lazy val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => + require(pred.dataType == BooleanType, + s"Data type of predicate $pred must be ${BooleanType.catalogString} rather than " + + s"${pred.dataType.catalogString}.") + + BindReferences.bindReference(pred, indexRelation.partitionCols) + } + + @transient private lazy val hiveQlTable = SparkSqlHiveUtils.toHiveTable(indexRelation.tableMeta) + + private def castFromString(value: String, dataType: DataType) = { + cast(Literal(value), dataType).eval(null) + } + + /** + * Prunes partitions not involve the query plan. + * + * @param partitions All partitions of the relation. + * @return Partitions that are involved in the query plan. + */ + private def prunePartitions(partitions: Seq[HivePartition]) = { + boundPruningPred match { + case None => partitions + case Some(shouldKeep) => partitions.filter { part => + val dataTypes = indexRelation.partitionCols.map(_.dataType) + val castedValues = part.getValues.asScala.zip(dataTypes) + .map { case (value, dataType) => castFromString(value, dataType) } + + // Only partitioned values are needed here, since the predicate has already been bound to + // partition key attribute references. 
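+        // Keep the partition only if the bound predicate evaluates to true over a row built
+        // from its casted partition values.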
+ val row = InternalRow.fromSeq(castedValues) + shouldKeep.eval(row).asInstanceOf[Boolean] + } + } + } + + // exposed for tests + @transient lazy val rawPartitions = { + val prunedPartitions = + if (sparkSession.sessionState.conf.metastorePartitionPruning && + partitionPruningPred.size > 0) { + // Retrieve the original attributes based on expression ID so that capitalization matches. + val normalizedFilters = partitionPruningPred.map(_.transform { + case a: AttributeReference => originalAttributes(a) + }) + sparkSession.sessionState.catalog.listPartitionsByFilter( + indexRelation.tableMeta.identifier, + normalizedFilters) + } else { + sparkSession.sessionState.catalog.listPartitions(indexRelation.tableMeta.identifier) + } + prunedPartitions.map(SparkSqlHiveUtils.toHivePartition(_, hiveQlTable)) + } + + protected override def doExecute(): RDD[InternalRow] = { + + new RDD[InternalRow](Dione.getContext.spark.sparkContext, Nil) { + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val idxSplit = split.asInstanceOf[IndexPartition] + val avroHashBtreeFolderReader = AvroHashBtreeStorageFolderReader(idxSplit.path) + + val blahUTF2 = (a: Any) => a match { + case x: UTF8String => x.toString + case x => x + } + + val valueIter = avroHashBtreeFolderReader.getIterator(keys.map(k => blahUTF2(k.value))) + + val blahUTF8 = (a: Any) => a match { + case x: Utf8 => UTF8String.fromString(x.toString) + case x: String => UTF8String.fromString(x) + case x => x + } + + val cnvrt = UnsafeProjection.create(requestedAttributes, requestedAttributes) + val idxRequestedFields = requestedAttributes.map(_.name) + .filter(idx.moreFields.contains) + + val partitionMap = (indexRelation.partitionCols zip idxSplit.prtValues).map(p => p._1.name -> p._2).toMap + + val keysMap = (idx.keys zip keys).map(x => x._1 -> x._2.value).toMap + + val dataFieldsMap = indexRelation.hiveDataTableRelation.schema.fields.map(f => f.name -> f).toMap + val allButDataFieldsSet = (idxRequestedFields ++ partitionMap.keys ++ keysMap.keys).toSet + val dataRequestedFields = requestedAttributes.map(_.name) + .filterNot(f => allButDataFieldsSet.contains(f)) + val dataSchema = dataRequestedFields.map(f => dataFieldsMap(f)) + + valueIter.map(indexGR => { + val dataMap = idx.sparkIndexer.readPayload(indexGR, StructType(dataSchema)) + + val retSeq = requestedAttributes.map(_.name).map { + case f if idx.moreFields.contains(f) => indexGR.get(f) + case f if keysMap.contains(f) => keysMap(f) + case f if partitionMap.contains(f) => partitionMap(f) + case f if dataMap.contains(f) => dataMap(f) + case f => throw new RuntimeException(s"Field $f not found") + }.map(blahUTF8) + + val c = cnvrt(InternalRow.fromSeq(retSeq)) + c + }) + } + + override protected def getPartitions: Array[Partition] = { + val pp = prunePartitions(rawPartitions) + pp.zipWithIndex.map(p => IndexPartition(p._2, p._1.getValues.toList, p._1.getLocation)).toArray + } + } + } + +} + +case class IndexPartition(index: Int, prtValues: List[String], path: String) extends Partition diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala index 7dff275e..38261af0 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala @@ -1,5 +1,6 @@ package com.paypal.dione.spark.index +import com.paypal.dione.avro.hadoop.file.AvroBtreeFile import 
com.paypal.dione.hdfs.index.HdfsIndexContants.{FILE_NAME_COLUMN, OFFSET_COLUMN, SIZE_COLUMN, SUB_OFFSET_COLUMN} import com.paypal.dione.kvstorage.hadoop.avro.AvroHashBtreeStorageFolderReader import com.paypal.dione.spark.avro.btree.SparkAvroBtreeUtils @@ -9,8 +10,34 @@ import org.apache.spark.sql.functions.{col, expr} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, SparkSession} +object IndexSpec { + def create(dataTableName: String, indexTableName: String, + keys: Seq[String], moreFields: Seq[String] = Nil)(implicit sparkSession: SparkSession): IndexSpec = { + new IndexSpec(toFullTableName(dataTableName), toFullTableName(indexTableName), keys, moreFields) + } + + private def toFullTableName(tableName: String)(implicit sparkSession: SparkSession): String = { + tableName.split('.') match { + case Array(_, _) => + tableName + case Array(onlyTableName) => + sparkSession.catalog.currentDatabase + "." + onlyTableName + } + } +} + case class IndexSpec(dataTableName: String, indexTableName: String, - keys: Seq[String], moreFields: Seq[String] = Nil) + keys: Seq[String], moreFields: Seq[String] = Nil) { + + require(keys.nonEmpty, "you must provide at least one key") + require(dataTableName.contains("."), dataTableName + " must include DB name") + require(indexTableName.contains("."), indexTableName + " must include DB name") + + def getFields: Seq[String] = { + keys ++ moreFields + } + +} object IndexManager { @@ -23,6 +50,8 @@ object IndexManager { StructField(SIZE_COLUMN, IntegerType) )) + lazy val indexMetaFields = indexSchema.map(_.name).toSet + AvroBtreeFile.METADATA_COL_NAME + /** * Creates a new index table on a data table for the given keys. * @@ -30,6 +59,11 @@ object IndexManager { * @param spark * @return */ + def createNew(dataTableName: String, indexTableName: String, + keys: Seq[String], moreFields: Seq[String] = Nil)(implicit spark: SparkSession): IndexManager = { + createNew(IndexSpec.create(dataTableName, indexTableName, keys, moreFields)) + } + def createNew(indexSpec: IndexSpec)(implicit spark: SparkSession): IndexManager = { // TODO: assert index table doesn't exist @@ -58,7 +92,7 @@ object IndexManager { val dataTableName = tblProperties("index.meta.dataTableName") val keys = tblProperties("index.meta.keys").split("\\|") val moreFields = tblProperties("index.meta.moreFields").split("\\|").filterNot(_.isEmpty) - val indexSpec = IndexSpec(dataTableName, indexTableName, keys, moreFields) + val indexSpec = IndexSpec.create(dataTableName, indexTableName, keys, moreFields) // TODO - add the manager class to the table metadata, and pass explicitly here: val indexManager: IndexManager = IndexManagerUtils.createIndexManager(spark, indexSpec) diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexReader.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexReader.scala index 2d3e8b27..cd75916d 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexReader.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexReader.scala @@ -37,15 +37,6 @@ case class IndexReader(@transient spark: SparkSession, sparkIndexer: SparkIndexe def read(index: DataFrame): DataFrame = IndexReader.read(index, this) - def readPayload[T](indexGR: GenericRecord): Map[String, Any] = { - logger.debug("initing file: " + indexGR.get(FILE_NAME_COLUMN).toString) - val hdfsIndexer = sparkIndexer.initHdfsIndexer(new Path(indexGR.get(FILE_NAME_COLUMN).toString), - new Configuration(), fieldsSchema) - val hdfsIndexMetadata = 
HdfsIndexerMetadata(indexGR) - val fetchedT = hdfsIndexer.fetch(hdfsIndexMetadata) - sparkIndexer.convertMap(fetchedT) - } - private def init() = { reporter.initPartitionMetrics() } diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/SparkIndexer.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/SparkIndexer.scala index dcc05e70..eaa7a9cf 100644 --- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/SparkIndexer.scala +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/SparkIndexer.scala @@ -1,11 +1,13 @@ package com.paypal.dione.spark.index -import com.paypal.dione.hdfs.index.HdfsIndexer +import com.paypal.dione.hdfs.index.HdfsIndexContants.FILE_NAME_COLUMN +import com.paypal.dione.hdfs.index.{HdfsIndexer, HdfsIndexerMetadata} import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types.StructType +import org.slf4j.LoggerFactory /** * wrapper trait between to use HdfsIndexer inside Spark. @@ -21,6 +23,8 @@ import org.apache.spark.sql.types.StructType */ trait SparkIndexer { + private val logger = LoggerFactory.getLogger(this.getClass) + type T >: Null @transient val spark: SparkSession @@ -32,6 +36,7 @@ trait SparkIndexer { } def convert(t: T): Seq[Any] + def convertMap(t: T): Map[String, Any] /** @@ -46,7 +51,12 @@ trait SparkIndexer { } def readPayload(indexGR: GenericRecord, payloadSchema: StructType): Map[String, Any] = { - IndexReader(spark, this, payloadSchema, false).readPayload(indexGR) + logger.debug("initializing file: " + indexGR.get(FILE_NAME_COLUMN).toString) + val hdfsIndexer = initHdfsIndexer(new Path(indexGR.get(FILE_NAME_COLUMN).toString), + new Configuration(), payloadSchema) + val hdfsIndexMetadata = HdfsIndexerMetadata(indexGR) + val fetchedT = hdfsIndexer.fetch(hdfsIndexMetadata) + convertMap(fetchedT) } def loadByIndex(index: DataFrame, payloadSchema: StructType): DataFrame = { diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala new file mode 100644 index 00000000..6d59944c --- /dev/null +++ b/dione-spark/src/main/scala/com/paypal/dione/spark/sql/catalyst/catalog/HiveIndexTableRelation.scala @@ -0,0 +1,16 @@ +package com.paypal.dione.spark.sql.catalyst.catalog + +import com.paypal.dione.spark.index.IndexManager +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal} + +class HiveIndexTableRelation(tableMeta: CatalogTable, + dataCols: Seq[AttributeReference], + partitionCols: Seq[AttributeReference], + val hiveDataTableRelation: HiveTableRelation, + val indexManager: IndexManager, + val literalFilters: Seq[Literal]) + extends HiveTableRelation(tableMeta, dataCols, partitionCols) { + + override def output: Seq[AttributeReference] = hiveDataTableRelation.output +} diff --git a/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala b/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala index 5dcfbfd3..d0b6874c 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala @@ -23,6 +23,7 @@ trait SparkTestBase { spark.conf.set("indexer.sampler.files.rate", "1.0") spark } + implicit def ss: SparkSession = spark lazy 
val hadoopConf = spark.sparkContext.hadoopConfiguration lazy val fs: FileSystem = FileSystem.get(hadoopConf) diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroDecimalConverter.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroDecimalConverter.scala index b288ecbf..3de18626 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroDecimalConverter.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroDecimalConverter.scala @@ -57,7 +57,8 @@ class TestAvroDecimalConverter { spark.table("foo").show(false) spark.table("foo").printSchema() - def create() = IndexManager.createNew(IndexSpec("foo", "index", Seq("id"), Seq("col1", "col2", "col3")))(spark) + def create() = IndexManager.createNew(IndexSpec.create("foo", "index", + Seq("id"), Seq("col1", "col2", "col3"))) // without setting indexer.castDecimalToDouble=true, should fail: Assertions.assertThrows(classOf[IllegalArgumentException], new Executable { diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala index ec79ee69..4f1653f6 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala @@ -41,7 +41,7 @@ class TestAvroIndexManager { @Test @Order(1) def testCreateIndexManager(): Unit = { - IndexManager.createNew(IndexSpec("t3", "index_t3", Seq("message_id", "sub_message_id"), Seq("time_result_created")))(spark) + IndexManager.createNew("t3", "index_t3", Seq("message_id", "sub_message_id"), Seq("time_result_created")) spark.sql("desc formatted index_t3").show(100, false) } diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerBase.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerBase.scala index be8d6cfd..86cdb96b 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerBase.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerBase.scala @@ -8,7 +8,7 @@ import org.junit.jupiter.api._ @TestMethodOrder(classOf[OrderAnnotation]) class TestAvroIndexManagerBase extends TestIndexManagerBase { - lazy val indexSpec: IndexSpec = IndexSpec("avro_data_tbl", "avro_data_tbl_idx", Seq("id_col")) + lazy val indexSpec: IndexSpec = IndexSpec.create("avro_data_tbl", "avro_data_tbl_idx", Seq("id_col")) def initDataTable(fieldsSchema: String, partitionFieldSchema: String): Unit = { spark.sql(s"create table ${indexSpec.dataTableName} ($fieldsSchema) partitioned by ($partitionFieldSchema) stored as avro") diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerJoin.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerJoin.scala index 0d332130..80de9076 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerJoin.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerJoin.scala @@ -41,13 +41,13 @@ class TestAvroIndexManagerJoin { @Test @Order(1) def testCreateIndexManager(): Unit = { - IndexManager.createNew(IndexSpec("t_join", "t_join_index", Seq("key"), Seq("ts")))(spark) + IndexManager.createNew("t_join", "t_join_index", Seq("key"), Seq("ts")) } @Test @Order(2) def 
testAppendNewPartitions(): Unit = { - val indexManager = IndexManager.load("t_join_index")(spark) + val indexManager = IndexManager.load("t_join_index") spark.conf.set("index.manager.btree.num.parts", "2") spark.conf.set("index.manager.btree.interval", "3") spark.conf.set("index.manager.btree.height", "2") diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerNonUnique.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerNonUnique.scala index a84fea3d..7634e44b 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerNonUnique.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManagerNonUnique.scala @@ -41,7 +41,7 @@ class TestAvroIndexManagerNonUnique { @Test @Order(1) def testCreateIndexManager(): Unit = { - IndexManager.createNew(IndexSpec("tbl", "index_tbl", Seq("key")))(spark) + IndexManager.createNew("tbl", "index_tbl", Seq("key")) spark.sql("desc formatted index_tbl").show(100, false) } diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestDataSplitter.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestDataSplitter.scala index 75dc6edf..9bafc3bd 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestDataSplitter.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestDataSplitter.scala @@ -38,7 +38,7 @@ case class DataSplitTester(spark: SparkSession, expectedNumChunks: Int, dataTabl def fullE2ETest(): Unit = { spark.conf.set("indexer.files.chunkMB", 1) - val manager = IndexManager.createNew(IndexSpec(dataTable, indexTable, Seq("id"), Seq("col1")))(spark) + val manager = IndexManager.createNew(dataTable, indexTable, Seq("id"), Seq("col1"))(spark) manager.appendMissingPartitions() val index = manager.getIndex() diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestOneLineInBlock.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestOneLineInBlock.scala index db1387a1..e67d6050 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestOneLineInBlock.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestOneLineInBlock.scala @@ -47,7 +47,7 @@ class TestOneLineInBlock { @Test @Order(1) def testCreateIndexManager(): Unit = { - IndexManager.createNew(IndexSpec("t3", "index_t3", Seq("message_id", "sub_message_id"), Seq("time_result_created")))(spark) + IndexManager.createNew("t3", "index_t3", Seq("message_id", "sub_message_id"), Seq("time_result_created")) spark.sql("desc formatted index_t3").show(100, false) // ensure one row per block import spark.implicits._ diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala new file mode 100644 index 00000000..f9713dc3 --- /dev/null +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/optimizations/TestIndexRule.scala @@ -0,0 +1,119 @@ +package com.paypal.dione.spark.index.optimizations + +import com.paypal.dione.SparkCleanTestDB +import com.paypal.dione.spark.Dione +import com.paypal.dione.spark.index.avro.TestAvroIndexManagerJoin.spark +import com.paypal.dione.spark.index.{IndexManager, IndexSpec} +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.junit.jupiter.api.TestInstance.Lifecycle +import 
org.junit.jupiter.api._ + + +object TestIndexRule extends SparkCleanTestDB { + + override val baseTestPath: String = "TestData/TestIndexRule/" + override val dbName: String = "TestIndexRule".toLowerCase + + @BeforeAll + def initData(): Unit = { + import spark.implicits._ + + spark.sql(s"create table t_rule (key string, sub_key string, var1 string, var2 int) " + + s"partitioned by (dt string) stored as avro") + + (0 until 10).map(i => (i, "sub_key_"+i, "var_a_" + i, i)) + .toDF("key", "sub_key", "var1", "var2").repartition(2).createOrReplaceTempView("t") + spark.sql(s"insert overwrite table t_rule partition (dt='2021-10-04') select * from t") + + //spark.table("t_rule").show() + } + +} + +@TestInstance(Lifecycle.PER_CLASS) +class TestIndexRule { + + val tableName = TestIndexRule.dbName + ".t_rule" + val idxTableName = "t_rule_index" + + @BeforeAll + def testCreateIndexManager(): Unit = { + val indexSpec = IndexSpec.create(tableName, idxTableName, Seq("key"), Seq("sub_key"))(spark) + IndexManager.createNew(indexSpec)(spark) + + val indexManager = IndexManager.load(idxTableName)(spark) + spark.conf.set("index.manager.btree.num.parts", "2") + spark.conf.set("index.manager.btree.interval", "3") + spark.conf.set("index.manager.btree.height", "1") + indexManager.appendMissingPartitions() + + Dione.enable(spark) + Dione.getContext.addIndex(indexManager) + } + + @Test + def testIndexSize(): Unit = { + Assertions.assertEquals(10, spark.table(idxTableName).count()) + } + + @Test + def testCoveringProject(): Unit = { + val df = spark.table(tableName).select("key", "sub_key") + AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) + Assertions.assertEquals(10, df.collect().length) + } + + @Test + def testCoveringProjectFilter(): Unit = { + val df = spark.table(tableName).select("key", "sub_key").where("sub_key=='sub_key_4'") + AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) + Assertions.assertEquals("[4,sub_key_4]", df.collect().mkString(",")) + } + + @Test + def testFilterEqualTo(): Unit = { + val df = spark.table(tableName).select("key", "sub_key", "var2", "var1").where("key == '7'") + AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) + Assertions.assertEquals("[7,sub_key_7,7,var_a_7]", df.collect().mkString(",")) + } + + @Test + def testFilterEqualToWithPartition(): Unit = { + val df = spark.table(tableName).select("sub_key", "key", "dt", "var1").where("key == '7'") +// df.explain(true) + // df.show() + AssertPlanUsesTable(df, TestIndexRule.dbName + "." + idxTableName) + Assertions.assertEquals("[sub_key_7,7,2021-10-04,var_a_7]", df.collect().mkString(",")) + } + + @Test + def testFilterEqualToStar(): Unit = { + val df = spark.table(tableName).where("key == '7'") + df.explain(true) + AssertPlanUsesTable(df, TestIndexRule.dbName + "." 
+ idxTableName) + Assertions.assertEquals("[7,sub_key_7,var_a_7,7,2021-10-04]", df.collect().mkString(",")) + } + + @Test + def testDisabled(): Unit = { + Dione.disable(spark) + val df = spark.table(tableName).select("sub_key", "key", "dt", "var1").where("key == '7'") + AssertPlanUsesTable(df, tableName) + Dione.enable(spark) + } + + @Test + def testNoIndex(): Unit = { + val df = spark.table("t_rule").select("key", "sub_key", "var2", "var1").where("var2=6") + AssertPlanUsesTable(df, tableName) + } + + private def AssertPlanUsesTable(df: Dataset[Row], tableName: String) = { + Assertions.assertEquals(Seq(tableName), + df.queryExecution.optimizedPlan.collect { + case h: HiveTableRelation => + h.tableMeta.identifier.database.get + "." + h.tableMeta.identifier.identifier + }) + } +} diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/orc/TestOrcIndexManagerBase.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/orc/TestOrcIndexManagerBase.scala index 7720a9c5..652a2b97 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/orc/TestOrcIndexManagerBase.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/orc/TestOrcIndexManagerBase.scala @@ -8,7 +8,7 @@ import org.junit.jupiter.api._ @TestMethodOrder(classOf[OrderAnnotation]) class TestOrcIndexManagerBase extends TestIndexManagerBase { - lazy val indexSpec: IndexSpec = IndexSpec("orc_data_tbl", "orc_data_tbl_idx", Seq("id_col")) + lazy val indexSpec: IndexSpec = IndexSpec.create("orc_data_tbl", "orc_data_tbl_idx", Seq("id_col")) def initDataTable(fieldsSchema: String, partitionFieldSchema: String): Unit = { spark.sql(s"create table ${indexSpec.dataTableName} ($fieldsSchema) partitioned by ($partitionFieldSchema) stored as orc") diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManager.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManager.scala index 8737e3b7..41c893b0 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManager.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManager.scala @@ -45,20 +45,20 @@ class TestParquetIndexManager { @Test @Order(1) def testCreateIndexManager(): Unit = { - IndexManager.createNew(IndexSpec("parquet_tbl", "index_parquet_tbl", Seq("message_id", "sub_message_id"), Seq("time_result_created")))(spark) + IndexManager.createNew("parquet_tbl", "index_parquet_tbl", Seq("message_id", "sub_message_id"), Seq("time_result_created")) //spark.sql("desc formatted parquet_tbl").show(100, false) } @Test @Order(2) def testLoadIndexManager(): Unit = { - IndexManager.load("index_parquet_tbl")(spark) + IndexManager.load("index_parquet_tbl") } @Test @Order(3) def testAppendNewPartitions(): Unit = { - val indexManager = IndexManager.load("index_parquet_tbl")(spark) + val indexManager = IndexManager.load("index_parquet_tbl") indexManager.appendMissingPartitions() //spark.table("index_parquet_tbl").show(100, false) @@ -73,7 +73,7 @@ class TestParquetIndexManager { @Test @Order(4) def testLoadByIndex(): Unit = { - val indexManager = IndexManager.load("index_parquet_tbl")(spark) + val indexManager = IndexManager.load("index_parquet_tbl") val queryDF = indexManager.getIndex().where("message_id like '%g_2%' and dt='2018-10-04'") //queryDF.show(1000, false) diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManagerBase.scala 
b/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManagerBase.scala index 6e9bb16c..fb38e103 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManagerBase.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/parquet/TestParquetIndexManagerBase.scala @@ -8,7 +8,7 @@ import org.junit.jupiter.api._ @TestMethodOrder(classOf[OrderAnnotation]) class TestParquetIndexManagerBase extends TestIndexManagerBase { - lazy val indexSpec: IndexSpec = IndexSpec("parquet_data_tbl", "parquet_data_tbl_idx", Seq("id_col"), Seq("meta_field")) + lazy val indexSpec: IndexSpec = IndexSpec.create("parquet_data_tbl", "parquet_data_tbl_idx", Seq("id_col"), Seq("meta_field")) def initDataTable(fieldsSchema: String, partitionFieldSchema: String): Unit = { val sc = spark.sparkContext diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSequenceFileIndexManagerBase.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSequenceFileIndexManagerBase.scala index eca4c931..60629998 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSequenceFileIndexManagerBase.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSequenceFileIndexManagerBase.scala @@ -8,7 +8,7 @@ import org.junit.jupiter.api._ @TestMethodOrder(classOf[OrderAnnotation]) class TestSequenceFileIndexManagerBase extends TestIndexManagerBase { - lazy val indexSpec: IndexSpec = IndexSpec("seq_file_data_tbl", "seq_file_data_tbl_idx", Seq("id_col"), Seq("meta_field")) + lazy val indexSpec: IndexSpec = IndexSpec.create("seq_file_data_tbl", "seq_file_data_tbl_idx", Seq("id_col"), Seq("meta_field")) def initDataTable(fieldsSchema: String, partitionFieldSchema: String): Unit = { spark.sql(s"create table ${indexSpec.dataTableName} ($fieldsSchema)" + diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSparkSequenceFile.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSparkSequenceFile.scala index 74ccd814..ed301cc5 100644 --- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSparkSequenceFile.scala +++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/sequence/TestSparkSequenceFile.scala @@ -84,7 +84,7 @@ class TestSparkSequenceFile { @Test def createNewIndex: Unit = { // dummy test to ensure initialization - val indexer = IndexManager.createNew(IndexSpec("foo", "index", Seq("id"), Seq("col0", "col1")))(spark) + val indexer = IndexManager.createNew("foo", "index", Seq("id"), Seq("col0", "col1")) assertEquals(0, spark.table(indexer.indexTableName).count()) } @@ -92,7 +92,7 @@ class TestSparkSequenceFile { @Order(2) @Test def addNewPartition: Unit = { - val indexer = IndexManager.load("index")(spark) + val indexer = IndexManager.load("index") spark.conf.set("index.manager.btree.parts", 5) @@ -122,7 +122,7 @@ class TestSparkSequenceFile { @Test def testLoadByIndex(): Unit = { import spark.implicits._ - val indexManager = IndexManager.load("index")(spark) + val indexManager = IndexManager.load("index") val indexTable = indexManager.getIndex()
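For reference, a minimal end-to-end sketch of the query-acceleration flow introduced in this change (Dione.enable, DioneContext.addIndex, DioneRule and DioneIndexStrategy), mirroring TestIndexRule above. The database, table, and column names are illustrative placeholders, and the sketch assumes the data table already exists as a partitioned Hive table.

import com.paypal.dione.spark.Dione
import com.paypal.dione.spark.index.IndexManager
import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

// Create an index table for an existing Hive data table and index its partitions.
// (IndexSpec.create qualifies unqualified names with the current database.)
val indexManager = IndexManager.createNew("mydb.events", "mydb.events_key_idx", Seq("key"), Seq("sub_key"))
indexManager.appendMissingPartitions()

// Register the optimizer rule and physical-plan strategy, then register the index.
Dione.enable(spark)
Dione.getContext(spark).addIndex(indexManager)

// A covering query (keys and moreFields only) is answered from the index table itself;
// an equality filter on the key column is served by the Avro B-tree fetch.
spark.table("mydb.events").select("key", "sub_key").show()
spark.table("mydb.events").where("key = '7'").show()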