@@ -22,7 +22,7 @@ object AvroHashBtreeStorageFolderReader {

logger.debug("listing files in folder: " + folderName)
val fileList = fs.listStatus(new Path(folderName)).map(_.getPath.getName).filter(_.endsWith(AVRO_BTREE_FILE_EXTENSION)).sorted
logger.debug("got file list with " + fileList.size + " files")
logger.debug("got file list with " + fileList.length + " files")

new AvroHashBtreeStorageFolderReader(folderName, fileList.toList)
}
@@ -47,7 +47,7 @@ case class AvroHashBtreeStorageFolderReader(folderName: String, fileList: List[S
getFile(numFile)
}

private def getFile(numFile: Int): AvroBtreeStorageFileReader = {
def getFile(numFile: Int): AvroBtreeStorageFileReader = {
val kvStorageFilename = new Path(folderName, fileList(numFile)).toString
AvroBtreeStorageFileReader(kvStorageFilename)
}
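
Note: making getFile public lets the Spark-side join path pick the target B-tree file directly from the precomputed key hash, as in this line taken from later in the diff (folder, row, and KEY_HASH_COLUMN are defined there):

// Usage from the join path later in this PR: select the file by the row's key hash.
val fileReader = AvroHashBtreeStorageFolderReader(folder).getFile(row.getAs[Int](KEY_HASH_COLUMN))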
@@ -38,12 +38,12 @@ object SparkAvroBtreeUtils {
* @param folderName base folder to write the data to
* @param numFilesInFolder number of files in each folder
*
* // btree related params per resulted file
* @param interval size of each block of the b-tree
* @param height number of levels (how deep)
* @param mode whether to override or to append
* // btree related params per resulted file
* @param interval size of each block of the b-tree
* @param height number of levels (how deep)
* @param mode whether to override or to append
*/
def writeDFasAvroBtree(df: DataFrame, keys: Seq[String], folderName: String,
def writeDFAsAvroBtree(df: DataFrame, keys: Seq[String], folderName: String,
numFilesInFolder: Int, interval: Int, height: Int,
mode: String = "overwrite"
)(implicit spark: SparkSession): Unit = {
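
For reference, the renamed writeDFAsAvroBtree is exercised by the tests at the bottom of this PR; a minimal usage sketch along those lines (the folder path is illustrative):

// Write table "t3" as an Avro B-tree keyed by message_id: 5 files per folder,
// block interval 5, tree height 2 (values mirror testKVstoreNoIndex below).
val kvFolder = "/tmp/kv_no_index"  // any writable folder
SparkAvroBtreeUtils.writeDFAsAvroBtree(spark.table("t3"), Seq("message_id"), kvFolder,
  5, 5, 2)(spark)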
@@ -71,11 +71,11 @@ object SparkAvroBtreeUtils {
logger.info("writing index file to " + folderName + s" with interval: $interval, height: $height," +
s" partitionsSpec: $partitionsSpec")

val repartitionedDF = customRepartition(df, keys, partitionsSpec)
val repartitionedDF = repartitionByKeysAndPartitions(df, keys, partitionsSpec)

repartitionedDF
.write
.partitionBy(partitionKeys:_*)
.partitionBy(partitionKeys: _*)
.mode(mode)
.format("com.paypal.dione.spark.avro.btree")
.option("key.fields", keys.mkString(","))
@@ -120,14 +120,14 @@ object SparkAvroBtreeUtils {
val partitionKeys = partitionsSpec.headOption.map(_.map(_._1)).getOrElse(Nil)

val indexTableValueSchema = spark.table(avroBtreeTable)
.drop(keys:_*)
.drop(keys: _*)
.drop(AvroBtreeFile.METADATA_COL_NAME)
.drop(partitionKeys:_*)
.drop(partitionKeys: _*)
.schema
val outputSchema = StructType(dsDF.schema ++ indexTableValueSchema)

// repartition the dsDF using our customRepartition to get the matching partitions
val repartitionedDF = SparkAvroBtreeUtils.customRepartition(filteredDsDf, keys, partitionsSpecWithNumFiles)
val repartitionedDF = SparkAvroBtreeUtils.repartitionByKeysAndPartitions(filteredDsDf, keys, partitionsSpecWithNumFiles)

// on-the-fly join between the matching partitions. both sides are sorted, so we just "merge-join" them
val joinedDF = repartitionedDF.mapPartitions((it: Iterator[Row]) =>
@@ -136,41 +136,34 @@
else {
val bufIt = it.buffered
val row = bufIt.head
val folder = tableLocationStr + "/" + partitionKeys.map(pk => (pk, row.getAs[String](pk))).map(p => p._1 + "=" + p._2).mkString("/")
val fs = new Path(folder).getFileSystem(new Configuration)
if (!fs.exists(new Path(folder)))
val folder = tableLocationStr + "/" + getFolderLocationFromRow(row, partitionKeys)
if (!checkIfFolderExists(folder))
Iterator.empty
else {

val avroBtreeStorageFileReader = AvroHashBtreeStorageFolderReader(folder).getFile(keys.map(row.getAs[Any]))
val avroBtreeStorageFileReader = AvroHashBtreeStorageFolderReader(folder).getFile(row.getAs[Int](KEY_HASH_COLUMN))
val kvIt = avroBtreeStorageFileReader.getIterator().buffered
val converter = AvroSerializerHelper.avroDeserializer(avroBtreeStorageFileReader.fileReader.getValueSchema, indexTableValueSchema)

bufIt.flatMap(row => {
val key1GR = avroBtreeStorageFileReader.toGR(keys.map(row.getAs[Any]))
var cmp = -100
while (cmp<0 && kvIt.hasNext) {
val head = kvIt.head
cmp = GenericData.get.compare(head._1, key1GR, avroBtreeStorageFileReader.fileReader.getKeySchema)
logger.debug("comparing file key {} with DS key {} and got {}", head._1, key1GR, cmp+"")
if (cmp < 0)
kvIt.next()
}
if (!kvIt.hasNext || cmp>0)
val a = avroBtreeStorageFileReader.get(key1GR)
[Review comment, Collaborator]: not sure that this is the correct baseline to compare, as I have a feeling that it will be much worse than just reading through the whole index file (as it was before). Even now, after fixing #73, it should be much faster.

if (a.isEmpty)
Iterator.empty
else {
val nxt = kvIt.head
// taking the dsDF's data without the last two fields (keyhash, prthash)
val nxt = a.get
// taking the dsDF's data without the last field (keyhash)
// and the value record from the avro-btree file
Iterator(Row.fromSeq(row.toSeq.slice(0, row.size - 2) ++
converter(nxt._2).get.asInstanceOf[InternalRow].toSeq(indexTableValueSchema).map {
Iterator(Row.fromSeq(row.toSeq.slice(0, row.size - 1) ++
converter(nxt).get.asInstanceOf[InternalRow].toSeq(indexTableValueSchema).map {
case f: UTF8String => f.toString
case f => f
}))
}
})
}
}
})(RowEncoder(outputSchema))
)(RowEncoder(outputSchema))
joinedDF
}
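
A note on the block above: the hand-rolled merge over the sorted kvIt iterator is replaced by a point lookup per row. A simplified sketch of the resulting flow, assuming get returns an Option as the isEmpty/get calls above suggest (buildOutputRow is a hypothetical stand-in for the Row.fromSeq construction):

bufIt.flatMap { row =>
  val keyGR = avroBtreeStorageFileReader.toGR(keys.map(row.getAs[Any]))
  avroBtreeStorageFileReader.get(keyGR) match {
    case None        => Iterator.empty                        // key not present in this B-tree file
    case Some(value) => Iterator(buildOutputRow(row, value))  // dsDF columns ++ decoded index value
  }
}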

@@ -183,8 +176,8 @@
* partition and number of files the partition's folder.
* @return repartitioned DataFrame where each partition contains only rows of the same hashkey and partition.
*/
def customRepartition(df: DataFrame, keys: Seq[String],
partitionsSpec: Seq[(Seq[(String, String)], Int)]): DataFrame = {
def repartitionByKeysAndPartitions(df: DataFrame, keys: Seq[String],
partitionsSpec: Seq[(Seq[(String, String)], Int)]): DataFrame = {

val dupPartitionValues = partitionsSpec.map(_._1).groupBy(identity).map(t => (t._1, t._2.size)).filter(_._2 > 1).keys
assert(dupPartitionValues.isEmpty, "Found duplicated partitions: " + dupPartitionValues)
@@ -201,10 +194,12 @@
val prtsIndex: Map[Seq[String], (Int, Int)] = partitionsSpec.map(sq => {
val sqMap = sq._1.toMap
(partitionKeys.map(sqMap), sq._2)
}).zipWithIndex.map(t => {s+=t._1._2; (t._1._1, (t._1._2, s-t._1._2))}).toMap
}).zipWithIndex.map(t => {
s += t._1._2; (t._1._1, (t._1._2, s - t._1._2))
}).toMap

val prtudf = spark.udf.register("prtudf", (prtSpec: mutable.WrappedArray[String]) =>
prtsIndex.getOrElse(prtSpec, (0,0)))
prtsIndex.getOrElse(prtSpec, (0, 0)))

// for each "static" partition, we would like to have numFilesInPartition files.
// and because each task creates one file, we want (numFilesInPartition * numOfPartitions) tasks.
@@ -227,7 +222,15 @@

})

spark.createDataFrame(customPartitionedRDD.map(_._2), dfWithHashes.schema)
.sortWithinPartitions((partitionKeys ++ keys).map(col):_*)
spark.createDataFrame(customPartitionedRDD.map(_._2), dfWithHashes.schema).drop(PARTITION_HASH_COLUMN)
.sortWithinPartitions((partitionKeys ++ keys).map(col): _*)
}

private def getFolderLocationFromRow(row: Row, partitionKeys: Seq[String]) =
partitionKeys.map(pk => (pk, row.getAs[String](pk))).map(p => p._1 + "=" + p._2).mkString("/")

private def checkIfFolderExists(folder: String) = {
val fs = new Path(folder).getFileSystem(new Configuration)
fs.exists(new Path(folder))
}
}
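
For readers of the rename above: repartitionByKeysAndPartitions expects an explicit spec of static partitions plus a file count per partition folder. An illustrative spec (the column name and values are hypothetical):

// Two static "dt" partitions, 3 files each; each inner Seq holds one partition's
// column -> value pairs, and the Int is the number of files for that folder.
val partitionsSpec: Seq[(Seq[(String, String)], Int)] = Seq(
  (Seq("dt" -> "2021-01-01"), 3),
  (Seq("dt" -> "2021-01-02"), 3)
)
val repartitioned = SparkAvroBtreeUtils.repartitionByKeysAndPartitions(df, Seq("message_id"), partitionsSpec)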
@@ -112,6 +112,9 @@ case class IndexManager(@transient val spark: SparkSession, sparkIndexer: SparkI
* @param partitionsSpec the new partitions to index
*/
def appendNewPartitions(partitionsSpec: Seq[Seq[(String, String)]]): Unit = {

require(partitionsSpec.nonEmpty, "The partitionsSpec parameter cannot be empty")

val partitionLocations = IndexManagerUtils.getPartitionLocations(dataTableName, partitionsSpec, spark)

val fileLikeExp = spark.conf.get("index.manager.file.filter", "%")
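
The new require only rejects empty input; a call that satisfies it might look like this (the indexManager instance, partition column, and values are hypothetical):

// Index two new daily partitions; an empty Seq now fails fast with
// "The partitionsSpec parameter cannot be empty".
indexManager.appendNewPartitions(Seq(
  Seq("dt" -> "2021-01-01"),
  Seq("dt" -> "2021-01-02")
))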
@@ -16,7 +16,8 @@ object SparkTestBase {
trait SparkTestBase {

lazy val spark: SparkSession = {
val sparkConf = new SparkConf().setMaster(s"local[1]").set("spark.sql.shuffle.partitions", "3").setAppName("Spark Unit Test")
val sparkConf = new SparkConf().setMaster(s"local[1]").set("spark.sql.shuffle.partitions", "3")
.set("spark.driver.bindAddress", "127.0.0.1").setAppName("Spark Unit Test")
val builder = SparkSession.builder().config(sparkConf).enableHiveSupport()
val spark = builder.getOrCreate()
// TODO really needed?
@@ -119,7 +119,7 @@ class TestAvroIndexManager {
@Test
def testKVstoreNoIndex(): Unit = {
val kvFolder = baseTestPath + "kv_no_index"
SparkAvroBtreeUtils.writeDFasAvroBtree(spark.table("t3"), Seq("message_id"), kvFolder,
SparkAvroBtreeUtils.writeDFAsAvroBtree(spark.table("t3"), Seq("message_id"), kvFolder,
5, 5, 2)(spark)

val kvGetter = AvroHashBtreeStorageFolderReader(kvFolder)
@@ -78,7 +78,7 @@ class TestAvroSparkIndexer {
def testSparkStoringIndexInKV(): Unit = {
val folderPath = baseTestPath + "spark_index"

SparkAvroBtreeUtils.writeDFasAvroBtree(spark.table("indexed_df"),
SparkAvroBtreeUtils.writeDFAsAvroBtree(spark.table("indexed_df"),
Seq("message_id"), folderPath, 3, 10, 2)(spark)

val location = new File(folderPath).getAbsoluteFile.toString