1 change: 1 addition & 0 deletions .github/workflows/pr.workflow.yml
@@ -70,5 +70,6 @@ jobs:
find .
export PYTHONPATH=`echo dione-spark/target/dione-spark-*-SNAPSHOT.jar`
python -V
+pip install pypandoc==1.7.5
pip install pytest pyspark==2.4.8
pytest -v
4 changes: 3 additions & 1 deletion .gitignore
@@ -24,4 +24,6 @@ classes
tmp

TestData
-settings-ossrh.xml
+settings-ossrh.xml
+
+.ipynb_checkpoints
2 changes: 1 addition & 1 deletion dione-spark/src/main/python/dione/index_manager.py
@@ -50,7 +50,7 @@ def create_new(spark, data_table_name, index_table_name, keys, more_fields=None)
    key = scala_helper.list_to_seq(keys)
    moreFields = scala_helper.list_to_seq(more_fields)
    idx_spec = scala_helper.get_object("com.paypal.dione.spark.index.IndexSpec")
-   is_java = idx_spec.apply(data_table_name, index_table_name, key, moreFields)
+   is_java = idx_spec.create(data_table_name, index_table_name, key, moreFields, spark._jsparkSession)
    im = scala_helper.get_object("com.paypal.dione.spark.index.IndexManager") \
        .createNew(is_java, spark._jsparkSession)
    return IndexManager(spark, im)
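
For context, a hedged sketch of the Scala-side calls this Python wrapper now routes through. The table and field names are hypothetical, and the exact IndexSpec.create and IndexManager.createNew parameter lists are inferred from the py4j calls above:

import com.paypal.dione.spark.index.{IndexManager, IndexSpec}

// Hypothetical tables and fields; mirrors the py4j calls above.
val spec = IndexSpec.create("my_db.events", "my_db.events_index",
  Seq("event_id"), Seq("event_ts"), spark)
val manager = IndexManager.createNew(spec, spark)
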
33 changes: 33 additions & 0 deletions dione-spark/src/main/scala/com/paypal/dione/spark/Dione.scala
@@ -0,0 +1,33 @@
package com.paypal.dione.spark

import com.paypal.dione.spark.execution.DioneIndexStrategy
import org.apache.spark.sql.SparkSession

object Dione {

private var dioneContext: DioneContext = _

def getContext(spark: SparkSession): DioneContext = {
if (Option(dioneContext).isEmpty)
dioneContext = DioneContext()(spark)
dioneContext
}

def getContext: DioneContext = {
getContext(SparkSession.getActiveSession.getOrElse {
throw new RuntimeException("No active spark session found")
})
}

def enable(implicit spark: SparkSession): Unit = {
spark.sessionState.experimentalMethods.extraOptimizations ++= DioneRule :: Nil
spark.sessionState.experimentalMethods.extraStrategies ++= DioneIndexStrategy :: Nil
}

def disable(implicit spark: SparkSession): Unit = {
spark.sessionState.experimentalMethods.extraOptimizations =
spark.sessionState.experimentalMethods.extraOptimizations.filterNot(_ == DioneRule)
spark.sessionState.experimentalMethods.extraStrategies =
spark.sessionState.experimentalMethods.extraStrategies.filterNot(_ == DioneIndexStrategy)
}
}
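
A minimal usage sketch for the hooks above; the table and index names are hypothetical, and the index is assumed to have been built beforehand:

import com.paypal.dione.spark.Dione
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Register DioneRule and DioneIndexStrategy on this session.
Dione.enable(spark)

// Load a (hypothetical) index table into the Dione context.
Dione.getContext(spark).addIndex("my_db.events_index")

// Eligible queries can now be answered from the index.
spark.sql("SELECT event_ts FROM my_db.events WHERE event_id = '42'").show()

// Unregister the rule and strategy again.
Dione.disable(spark)
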
25 changes: 25 additions & 0 deletions dione-spark/src/main/scala/com/paypal/dione/spark/DioneContext.scala
@@ -0,0 +1,25 @@
package com.paypal.dione.spark

import com.paypal.dione.spark.index.{IndexManager, IndexSpec}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

import scala.collection.mutable

case class DioneContext(implicit spark: SparkSession) {

val indices: mutable.Map[String, Seq[IndexManager]] = mutable.HashMap()

def getIndices(tableIdentifier: TableIdentifier): Seq[IndexManager] = {
indices.getOrElse(tableIdentifier.database.getOrElse(spark.catalog.currentDatabase) +
"." + tableIdentifier.identifier, Nil)
}

def addIndex(indexManager: IndexManager): Unit = {
indices.put(indexManager.dataTableName,
indices.getOrElse(indexManager.dataTableName, Seq()) ++ Seq(indexManager))
}

def addIndex(indexTable: String): Unit = addIndex(IndexManager.load(indexTable))

}
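
Illustrative only: the registry above is keyed by the database-qualified data table name, so lookups resolve unqualified identifiers against the current database (identifiers here are hypothetical):

import com.paypal.dione.spark.Dione
import org.apache.spark.sql.catalyst.TableIdentifier

val ctx = Dione.getContext(spark)
ctx.addIndex("my_db.events_index")

// Resolves to the key "my_db.events" before the lookup.
val managers = ctx.getIndices(TableIdentifier("events", Some("my_db")))
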
91 changes: 91 additions & 0 deletions dione-spark/src/main/scala/com/paypal/dione/spark/DioneRule.scala
@@ -0,0 +1,91 @@
package com.paypal.dione.spark

import com.paypal.dione.spark.index.{IndexManager, IndexManagerUtils}
import com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.StructType

object DioneRule extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = {
plan transform {

// For a covering index, we switch between the original table and the index table
case p @ Project(_, h @ HiveTableRelation(_, _, _))
if getCoveringIndex(h, p.projectList.map(_.name)).nonEmpty =>
val idx = getCoveringIndex(h, p.projectList.map(_.name)).get
val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark,
idx.indexTableName)
val updatedAttributes = toAttributes(indexCatalogTable.dataSchema,
h.dataCols.filter(dc => p.projectList.map(_.name).contains(dc.name)))
p.copy(child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes))

case p @ Project(_, f @ Filter(_, h @ HiveTableRelation(_, _, _)))
if getCoveringIndex(h, p.projectList.map(_.name)).nonEmpty =>
val idx = getCoveringIndex(h, p.projectList.map(_.name)).get
val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark,
idx.indexTableName)
val updatedAttributes = toAttributes(indexCatalogTable.dataSchema,
h.dataCols.filter(dc => p.projectList.map(_.name).contains(dc.name)))
p.copy(child = f.copy(child = h.copy(tableMeta = indexCatalogTable, dataCols = updatedAttributes)))

// For a data lookup index, we attach information for later use in the strategy
case p @ Project(_, f @ Filter(condition, h @ HiveTableRelation(_, _, _)))
if getSpecificFilter(condition, h).nonEmpty =>
val (idx, literalFilter) = getSpecificFilter(condition, h).get
val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark,
idx.indexTableName)
val updatedAttributes = toAttributes(indexCatalogTable.dataSchema,
h.dataCols.filter(dc => p.references.map(_.name).toSet.contains(dc.name)))
p.copy(child = f.copy(child = new HiveIndexTableRelation(tableMeta = indexCatalogTable,
dataCols = updatedAttributes, partitionCols = h.partitionCols, h, idx, literalFilter)))

case f @ Filter(condition, h @ HiveTableRelation(_, _, _))
if getSpecificFilter(condition, h).nonEmpty =>
val (idx, literalFilter) = getSpecificFilter(condition, h).get
val indexCatalogTable = IndexManagerUtils.getSparkCatalogTable(Dione.getContext.spark,
idx.indexTableName)
val updatedAttributes = toAttributes(indexCatalogTable.dataSchema,
h.dataCols.filter(dc => f.references.map(_.name).toSet.contains(dc.name)))
f.copy(child = new HiveIndexTableRelation(tableMeta = indexCatalogTable,
dataCols = updatedAttributes, partitionCols = h.partitionCols, h, idx, literalFilter))
}
}

// based on StructType.toAttributes()
def toAttributes(structType: StructType, origAttrs: Seq[AttributeReference]): Seq[AttributeReference] = {
val origMap = origAttrs.map(ar => ar.name -> ar).toMap
structType
.map(f => origMap.getOrElse(f.name, AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
}

def getCoveringIndex(h: HiveTableRelation, referencedAtts: Seq[String]): Option[IndexManager] = {
Dione.getContext.getIndices(h.tableMeta.identifier)
.find(ci => referencedAtts.forall(ci.indexSpec.getFields.contains))
}

def getIndexForTable(h: HiveTableRelation): Option[IndexManager] = {
Dione.getContext.getIndices(h.tableMeta.identifier).headOption
}

private def getSpecificFilter(condition: Expression, hiveTableRelation: HiveTableRelation): Option[(IndexManager, Seq[Literal])] = {
def findLiteralKeyExpression(key: String, p: Expression) = p match {
case EqualTo(left, right: Literal) if left.references.size == 1 && left.references.toSeq.head.name == key => Some(right)
case _ => None
}

val idx = getIndexForTable(hiveTableRelation)

if (idx.nonEmpty) {
val vals = idx.get.keys.map(k => condition.flatMap(p => findLiteralKeyExpression(k, p)).headOption)
if (vals.exists(_.isEmpty))
None
else Some(idx.get, vals.map(v => Literal(v.get)))
} else {
None
}
}
}
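
Two query shapes the rule rewrites, assuming a registered index keyed on event_id that also stores event_ts (all names hypothetical):

// Covering index: every referenced column exists in the index table, so the
// Hive relation is swapped for the index table itself.
spark.sql("SELECT event_id, event_ts FROM my_db.events").explain(true)

// Lookup: each index key is bound to a literal by an equality filter, so the
// relation becomes a HiveIndexTableRelation carrying the literals.
spark.sql("SELECT * FROM my_db.events WHERE event_id = '42'").explain(true)
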
@@ -68,7 +68,7 @@ object SparkAvroBtreeUtils
val partitionKeys = partitionsSpec.flatMap(_._1.map(_._1)).distinct
val remainingColumns = df.columns.filterNot(c => keysSet.contains(c) || partitionKeys.contains(c))

logger.info("writing index file to " + folderName + s" with interval: $interval, height: $height," +
logger.info("writing index file to " + folderName + s", with interval: $interval, height: $height," +
s" partitionsSpec: $partitionsSpec")

val repartitionedDF = customRepartition(df, keys, partitionsSpec)
36 changes: 36 additions & 0 deletions dione-spark/src/main/scala/com/paypal/dione/spark/execution/DioneIndexStrategy.scala
@@ -0,0 +1,36 @@
package com.paypal.dione.spark.execution

import com.paypal.dione.spark.Dione
import com.paypal.dione.spark.index.IndexManager
import com.paypal.dione.spark.index.IndexManager.indexMetaFields
import com.paypal.dione.spark.sql.catalyst.catalog.HiveIndexTableRelation
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, EqualTo, Expression, Literal}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object DioneIndexStrategy extends Strategy {

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

case PhysicalOperation(projectList, predicates, indexRelation: HiveIndexTableRelation) =>
// Filter out all predicates that only deal with partition keys; these are given to the
// hive table scan operator to be used for partition pruning.
val partitionKeyIds = AttributeSet(indexRelation.partitionCols)
val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
!predicate.references.isEmpty &&
predicate.references.subsetOf(partitionKeyIds)
}

val idx = indexRelation.indexManager

IndexBtreeFetchExec(projectList.flatMap(_.references.toSeq)
.filterNot(p => indexMetaFields.contains(p.name)).distinct,
indexRelation, idx, pruningPredicates, otherPredicates, indexRelation.literalFilters) :: Nil
case _ =>
Nil
}

}
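
For illustration, a lookup on a partitioned table where the strategy splits the predicates as above: the filter on the partition column is handed to the scan for pruning, while the key literal drives the B-tree fetch (dt and event_id are hypothetical columns):

Dione.enable(spark)
spark.sql(
  "SELECT * FROM my_db.events WHERE dt = '2021-01-01' AND event_id = '42'"
).explain()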