diff --git a/dione-hadoop/src/main/scala/com/paypal/dione/kvstorage/hadoop/avro/AvroHashBtreeStorageFolderReader.scala b/dione-hadoop/src/main/scala/com/paypal/dione/kvstorage/hadoop/avro/AvroHashBtreeStorageFolderReader.scala
index 2a2f033c..67d56aa4 100644
--- a/dione-hadoop/src/main/scala/com/paypal/dione/kvstorage/hadoop/avro/AvroHashBtreeStorageFolderReader.scala
+++ b/dione-hadoop/src/main/scala/com/paypal/dione/kvstorage/hadoop/avro/AvroHashBtreeStorageFolderReader.scala
@@ -22,7 +22,7 @@ object AvroHashBtreeStorageFolderReader {
     logger.debug("listing files in folder: " + folderName)
     val fileList = fs.listStatus(new Path(folderName)).map(_.getPath.getName).filter(_.endsWith(AVRO_BTREE_FILE_EXTENSION)).sorted
-    logger.debug("got file list with " + fileList.size + " files")
+    logger.debug("got file list with " + fileList.length + " files")
     new AvroHashBtreeStorageFolderReader(folderName, fileList.toList)
   }
@@ -47,7 +47,7 @@ case class AvroHashBtreeStorageFolderReader(folderName: String, fileList: List[S
     getFile(numFile)
   }
-  private def getFile(numFile: Int): AvroBtreeStorageFileReader = {
+  def getFile(numFile: Int): AvroBtreeStorageFileReader = {
     val kvStorageFilename = new Path(folderName, fileList(numFile)).toString
     AvroBtreeStorageFileReader(kvStorageFilename)
   }
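Making getFile(numFile: Int) public lets callers that already hold a key's file index jump straight to the right reader. A minimal sketch of that usage, assuming the usual hash-modulo routing scheme; someKeyHash and the folder path are hypothetical, not taken from this change:

    // Route a key to its avro-btree file by index; getFile is public as of this change.
    val folderReader = AvroHashBtreeStorageFolderReader("/path/to/btree/folder")
    val fileIndex = someKeyHash % folderReader.fileList.size  // assumed routing scheme
    val fileReader = folderReader.getFile(fileIndex)
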
diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala
index a9a510f7..70fcd163 100644
--- a/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala
+++ b/dione-spark/src/main/scala/com/paypal/dione/spark/avro/btree/SparkAvroBtreeUtils.scala
@@ -38,12 +38,12 @@ object SparkAvroBtreeUtils {
    * @param folderName       base folder to write the data to
    * @param numFilesInFolder number of files in each folder
    *
-   * // btree related params per resulted file
-   * @param interval size of each block of the b-tree
-   * @param height number of levels (how deep)
-   * @param mode whether to override or to append
+   * // btree related params per resulting file
+   * @param interval         size of each block of the b-tree
+   * @param height           number of levels (how deep)
+   * @param mode             whether to overwrite or to append
    */
-  def writeDFasAvroBtree(df: DataFrame, keys: Seq[String], folderName: String,
+  def writeDFAsAvroBtree(df: DataFrame, keys: Seq[String], folderName: String,
                          numFilesInFolder: Int, interval: Int, height: Int,
                          mode: String = "overwrite"
                         )(implicit spark: SparkSession): Unit = {
@@ -71,11 +71,11 @@ object SparkAvroBtreeUtils {
     logger.info("writing index file to " + folderName + s" with interval: $interval, height: $height," +
       s" partitionsSpec: $partitionsSpec")
-    val repartitionedDF = customRepartition(df, keys, partitionsSpec)
+    val repartitionedDF = repartitionByKeysAndPartitions(df, keys, partitionsSpec)
     repartitionedDF
       .write
-      .partitionBy(partitionKeys:_*)
+      .partitionBy(partitionKeys: _*)
       .mode(mode)
       .format("com.paypal.dione.spark.avro.btree")
       .option("key.fields", keys.mkString(","))
@@ -120,14 +120,14 @@ object SparkAvroBtreeUtils {
     val partitionKeys = partitionsSpec.headOption.map(_.map(_._1)).getOrElse(Nil)
     val indexTableValueSchema = spark.table(avroBtreeTable)
-      .drop(keys:_*)
+      .drop(keys: _*)
       .drop(AvroBtreeFile.METADATA_COL_NAME)
-      .drop(partitionKeys:_*)
+      .drop(partitionKeys: _*)
       .schema
     val outputSchema = StructType(dsDF.schema ++ indexTableValueSchema)
-    // repartition the dsDF using our customRepartition to get the matching partitions
-    val repartitionedDF = SparkAvroBtreeUtils.customRepartition(filteredDsDf, keys, partitionsSpecWithNumFiles)
+    // repartition the dsDF using our repartitionByKeysAndPartitions to get the matching partitions
+    val repartitionedDF = SparkAvroBtreeUtils.repartitionByKeysAndPartitions(filteredDsDf, keys, partitionsSpecWithNumFiles)
     // on-the-fly join between the matching partitions. both sides are sorted, so we just "merge-join" them
     val joinedDF = repartitionedDF.mapPartitions((it: Iterator[Row]) =>
@@ -136,41 +136,34 @@ object SparkAvroBtreeUtils {
       else {
         val bufIt = it.buffered
         val row = bufIt.head
-        val folder = tableLocationStr + "/" + partitionKeys.map(pk => (pk, row.getAs[String](pk))).map(p => p._1 + "=" + p._2).mkString("/")
-        val fs = new Path(folder).getFileSystem(new Configuration)
-        if (!fs.exists(new Path(folder)))
+        val folder = tableLocationStr + "/" + getFolderLocationFromRow(row, partitionKeys)
+        if (!checkIfFolderExists(folder))
           Iterator.empty
         else {
-          val avroBtreeStorageFileReader = AvroHashBtreeStorageFolderReader(folder).getFile(keys.map(row.getAs[Any]))
+          val avroBtreeStorageFileReader = AvroHashBtreeStorageFolderReader(folder).getFile(row.getAs[Int](KEY_HASH_COLUMN))
           val kvIt = avroBtreeStorageFileReader.getIterator().buffered
           val converter = AvroSerializerHelper.avroDeserializer(avroBtreeStorageFileReader.fileReader.getValueSchema, indexTableValueSchema)
           bufIt.flatMap(row => {
             val key1GR = avroBtreeStorageFileReader.toGR(keys.map(row.getAs[Any]))
-            var cmp = -100
-            while (cmp<0 && kvIt.hasNext) {
-              val head = kvIt.head
-              cmp = GenericData.get.compare(head._1, key1GR, avroBtreeStorageFileReader.fileReader.getKeySchema)
-              logger.debug("comparing file key {} with DS key {} and got {}", head._1, key1GR, cmp+"")
-              if (cmp < 0)
-                kvIt.next()
-            }
-            if (!kvIt.hasNext || cmp>0)
+            val valueOpt = avroBtreeStorageFileReader.get(key1GR)
+            if (valueOpt.isEmpty)
               Iterator.empty
             else {
-              val nxt = kvIt.head
-              // taking the dsDF's data without the last two fields (keyhash, prthash)
+              val nxt = valueOpt.get
+              // taking the dsDF's data without the last field (keyhash)
               // and the value record from the avro-btree file
-              Iterator(Row.fromSeq(row.toSeq.slice(0, row.size - 2) ++
-                converter(nxt._2).get.asInstanceOf[InternalRow].toSeq(indexTableValueSchema).map {
+              Iterator(Row.fromSeq(row.toSeq.slice(0, row.size - 1) ++
+                converter(nxt).get.asInstanceOf[InternalRow].toSeq(indexTableValueSchema).map {
                   case f: UTF8String => f.toString
                   case f => f
                 }))
             }
           })
+        }
       }
-    })(RowEncoder(outputSchema))
+    )(RowEncoder(outputSchema))
     joinedDF
   }
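The deleted while-loop advanced the sorted iterator by hand to emulate a merge-join; the new code delegates the whole lookup to the b-tree reader. A minimal sketch of the contract it now relies on, with folder, keys, and row standing in for the surrounding values:

    // Point lookup against the avro-btree file for one dataset row.
    val fileReader = AvroHashBtreeStorageFolderReader(folder).getFile(row.getAs[Int](KEY_HASH_COLUMN))
    val keyGR = fileReader.toGR(keys.map(row.getAs[Any]))  // build the Avro key record
    val valueOpt = fileReader.get(keyGR)                   // Option[GenericRecord]
    // None: key absent from the index, emit nothing; Some(v): deserialize v and append it to the row.
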
@@ -183,8 +176,8 @@ object SparkAvroBtreeUtils {
    * partition and number of files in the partition's folder.
    * @return repartitioned DataFrame where each partition contains only rows of the same hash key and partition.
    */
-  def customRepartition(df: DataFrame, keys: Seq[String],
-                        partitionsSpec: Seq[(Seq[(String, String)], Int)]): DataFrame = {
+  def repartitionByKeysAndPartitions(df: DataFrame, keys: Seq[String],
+                                     partitionsSpec: Seq[(Seq[(String, String)], Int)]): DataFrame = {
     val dupPartitionValues = partitionsSpec.map(_._1).groupBy(identity).map(t => (t._1, t._2.size)).filter(_._2 > 1).keys
     assert(dupPartitionValues.isEmpty, "Found duplicated partitions: " + dupPartitionValues)
@@ -201,10 +194,12 @@ object SparkAvroBtreeUtils {
     val prtsIndex: Map[Seq[String], (Int, Int)] = partitionsSpec.map(sq => {
       val sqMap = sq._1.toMap
       (partitionKeys.map(sqMap), sq._2)
-    }).zipWithIndex.map(t => {s+=t._1._2; (t._1._1, (t._1._2, s-t._1._2))}).toMap
+    }).zipWithIndex.map(t => {
+      s += t._1._2; (t._1._1, (t._1._2, s - t._1._2))
+    }).toMap
     val prtudf = spark.udf.register("prtudf", (prtSpec: mutable.WrappedArray[String]) =>
-      prtsIndex.getOrElse(prtSpec, (0,0)))
+      prtsIndex.getOrElse(prtSpec, (0, 0)))
     // for each "static" partition, we would like to have numFilesInPartition files.
     // and because each task creates one file, we want (numFilesInPartition * numOfPartitions) tasks.
@@ -227,7 +222,15 @@ object SparkAvroBtreeUtils {
     })
-    spark.createDataFrame(customPartitionedRDD.map(_._2), dfWithHashes.schema)
-      .sortWithinPartitions((partitionKeys ++ keys).map(col):_*)
+    spark.createDataFrame(customPartitionedRDD.map(_._2), dfWithHashes.schema).drop(PARTITION_HASH_COLUMN)
+      .sortWithinPartitions((partitionKeys ++ keys).map(col): _*)
+  }
+
+  private def getFolderLocationFromRow(row: Row, partitionKeys: Seq[String]) =
+    partitionKeys.map(pk => (pk, row.getAs[String](pk))).map(p => p._1 + "=" + p._2).mkString("/")
+
+  private def checkIfFolderExists(folder: String) = {
+    val fs = new Path(folder).getFileSystem(new Configuration)
+    fs.exists(new Path(folder))
   }
 }
diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala
index 7dff275e..01353822 100644
--- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala
+++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManager.scala
@@ -112,6 +112,9 @@ case class IndexManager(@transient val spark: SparkSession, sparkIndexer: SparkI
    * @param partitionsSpec the new partitions to index
    */
   def appendNewPartitions(partitionsSpec: Seq[Seq[(String, String)]]): Unit = {
+
+    require(partitionsSpec.nonEmpty, "The partitionsSpec parameter cannot be empty")
+
     val partitionLocations = IndexManagerUtils.getPartitionLocations(dataTableName, partitionsSpec, spark)
     val fileLikeExp = spark.conf.get("index.manager.file.filter", "%")
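The new require turns an empty spec into an immediate, descriptive failure instead of a silent no-op run. For illustration, the call shape it guards, with a made-up table layout (dt partitions):

    // Index two newly landed partitions; passing Seq.empty now throws IllegalArgumentException.
    indexManager.appendNewPartitions(Seq(
      Seq("dt" -> "2021-01-01"),
      Seq("dt" -> "2021-01-02")
    ))
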
diff --git a/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala b/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala
index 5dcfbfd3..ca180382 100644
--- a/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala
+++ b/dione-spark/src/test/scala/com/paypal/dione/SparkTestBase.scala
@@ -16,7 +16,8 @@ object SparkTestBase {
 trait SparkTestBase {
   lazy val spark: SparkSession = {
-    val sparkConf = new SparkConf().setMaster(s"local[1]").set("spark.sql.shuffle.partitions", "3").setAppName("Spark Unit Test")
+    val sparkConf = new SparkConf().setMaster(s"local[1]").set("spark.sql.shuffle.partitions", "3")
+      .set("spark.driver.bindAddress", "127.0.0.1").setAppName("Spark Unit Test")
     val builder = SparkSession.builder().config(sparkConf).enableHiveSupport()
     val spark = builder.getOrCreate() // TODO really needed?
diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala
index ab5e587c..a60536a6 100644
--- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala
+++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroIndexManager.scala
@@ -119,7 +119,7 @@ class TestAvroIndexManager {
   @Test
   def testKVstoreNoIndex(): Unit = {
     val kvFolder = baseTestPath + "kv_no_index"
-    SparkAvroBtreeUtils.writeDFasAvroBtree(spark.table("t3"), Seq("message_id"), kvFolder,
+    SparkAvroBtreeUtils.writeDFAsAvroBtree(spark.table("t3"), Seq("message_id"), kvFolder,
       5, 5, 2)(spark)
     val kvGetter = AvroHashBtreeStorageFolderReader(kvFolder)
diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroSparkIndexer.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroSparkIndexer.scala
index 86d41082..fb587fb1 100644
--- a/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroSparkIndexer.scala
+++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/avro/TestAvroSparkIndexer.scala
@@ -78,7 +78,7 @@ class TestAvroSparkIndexer {
   def testSparkStoringIndexInKV(): Unit = {
     val folderPath = baseTestPath + "spark_index"
-    SparkAvroBtreeUtils.writeDFasAvroBtree(spark.table("indexed_df"),
+    SparkAvroBtreeUtils.writeDFAsAvroBtree(spark.table("indexed_df"),
       Seq("message_id"), folderPath, 3, 10, 2)(spark)
     val location = new File(folderPath).getAbsoluteFile.toString
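Taken together, the two tests exercise the renamed writer end to end. A condensed sketch of the round trip they cover, assuming kvFolder points at a writable test path:

    // Write t3 keyed by message_id: 5 files per folder, block interval 5, tree height 2.
    SparkAvroBtreeUtils.writeDFAsAvroBtree(spark.table("t3"), Seq("message_id"), kvFolder,
      5, 5, 2)(spark)

    // Read back through the folder reader; getFile is public as of this change.
    val kvGetter = AvroHashBtreeStorageFolderReader(kvFolder)
    val firstFile = kvGetter.getFile(0)
    firstFile.getIterator().take(3).foreach { case (k, v) => println(s"$k -> $v") }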