diff --git a/dione-hadoop/src/main/scala/com/paypal/dione/hdfs/index/csv/CsvIndexer.scala b/dione-hadoop/src/main/scala/com/paypal/dione/hdfs/index/csv/CsvIndexer.scala
new file mode 100644
index 00000000..cda09776
--- /dev/null
+++ b/dione-hadoop/src/main/scala/com/paypal/dione/hdfs/index/csv/CsvIndexer.scala
@@ -0,0 +1,76 @@
+package com.paypal.dione.hdfs.index.csv
+
+import com.paypal.dione.hdfs.index.{HdfsIndexer, HdfsIndexerMetadata}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.slf4j.LoggerFactory
+
+/**
+ * based on org.apache.spark.sql.execution.datasources.HadoopFileLinesReader
+ */
+
+case class CsvIndexer(file: Path, start: Long, end: Long, conf: Configuration,
+                      delimiter: Char)
+  extends HdfsIndexer[Seq[String]]() {
+
+  var reader: LineRecordReader = _
+  private var curPosition = -1L
+  private var lastPosition = -1L
+
+  override def closeCurrentFile(): Unit = {
+    reader.close()
+  }
+
+  /**
+   * Regular seek. Called once per offset (block).
+   */
+  override def seek(offset: Long): Unit = {
+    // @TODO - need to better optimize. not to re-init on every seek
+    initReader(offset, end)
+  }
+
+  /**
+   * Skip the next row - can avoid deserialization, etc.
+   */
+  override def skip(): Unit = {
+    if (reader==null) {
+      initReader(start, end)
+    }
+    reader.nextKeyValue()
+  }
+
+  /**
+   * Read the next row
+   */
+  override def readLine(): Seq[String] = {
+    if (reader==null) {
+      initReader(start, end)
+    }
+
+    lastPosition = curPosition
+    curPosition = Option(reader.getCurrentKey).map(_.get()).getOrElse(0)
+
+    val hasNext = reader.nextKeyValue()
+    if (hasNext) {
+      reader.getCurrentValue.toString.split(delimiter).toSeq
+    } else null
+  }
+
+  override def getCurMetadata(): HdfsIndexerMetadata = {
+    val pos = if (curPosition==0) start else curPosition
+    // some quirk of the first line read
+    val plusOne = if (lastPosition == curPosition) 1 else 0
+    HdfsIndexerMetadata(file.toString, pos, plusOne)
+  }
+
+  val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+  val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+
+  private def initReader(start: Long, end: Long): Unit = {
+    reader = new LineRecordReader()
+    reader.initialize(new FileSplit(file, start, end - start, Array.empty), hadoopAttemptContext)
+  }
+}
diff --git a/dione-hadoop/src/test/resources/csv/file.csv b/dione-hadoop/src/test/resources/csv/file.csv
new file mode 100644
index 00000000..2428463e
--- /dev/null
+++ b/dione-hadoop/src/test/resources/csv/file.csv
@@ -0,0 +1,4 @@
+a1,b1,c1,d1,11,12,13
+a2,b2,c2,d2,21,22,23
+a3,b3,c3,d3,31,32,33
+a4,b4,c4,d4,41,42,43
\ No newline at end of file
diff --git a/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/avro/TestAvroIndexer.scala b/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/avro/TestAvroIndexer.scala
index 7d6879b1..5930266f 100644
--- a/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/avro/TestAvroIndexer.scala
+++ b/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/avro/TestAvroIndexer.scala
@@ -18,6 +18,8 @@ object TestAvroIndexer extends AvroExtensions {
   val avroFile = new Path("TestData/hdfs_indexer/avro_file")
   val fileSystem = avroFile.getFileSystem(new Configuration())
 
+  var entries: Seq[(GenericRecord, HdfsIndexerMetadata)] = _
+
   @BeforeAll
   def dataPrep(): Unit = {
 
@@ -53,7 +55,7 @@ class TestAvroIndexer extends AvroExtensions {
   @Test
   @Order(1)
   def testSimpleCreateIndex(): Unit = {
-    val entries = AvroIndexer(avroFile, 0, 1<<30, fileSystem.getConf).iteratorWithMetadata.toList
+    entries = AvroIndexer(avroFile, 0, 1<<30, fileSystem.getConf).iteratorWithMetadata.toList
 
     val schema = SchemaBuilder.record("single_string").fields().requiredString("val1")
       .requiredInt("val2").requiredString("val3").requiredInt("val4").endRecord()
@@ -102,9 +104,10 @@ class TestAvroIndexer extends AvroExtensions {
       Assertions.assertEquals("8", gr.get("val3").asInstanceOf[Utf8].toString)
     }
 
-    (6 to 14).foreach(i => {
-      val gr = avroIndexer.fetch(HdfsIndexerMetadata(avroFile.toString, 393, i))
-      Assertions.assertEquals(((21 + i) * 2).toString, gr.get("val3").asInstanceOf[Utf8].toString)
+    entries.foreach(e => {
+      println("fetching: " + e._2)
+      val sq = avroIndexer.fetch(e._2)
+      Assertions.assertEquals(e._1, sq)
     })
   }
 }
\ No newline at end of file
diff --git a/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/csv/TestCsvIndexer.scala b/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/csv/TestCsvIndexer.scala
new file mode 100644
index 00000000..8b83e01a
--- /dev/null
+++ b/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/csv/TestCsvIndexer.scala
@@ -0,0 +1,95 @@
+package com.paypal.dione.hdfs.index.csv
+
+import com.paypal.dione.hdfs.index.HdfsIndexerMetadata
+import com.paypal.dione.hdfs.index.csv.TestCsvIndexer.{entries, splitEntries}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation
+import org.junit.jupiter.api._
+
+object TestCsvIndexer {
+  var entries: Seq[(Seq[String], HdfsIndexerMetadata)] = _
+  var splitEntries: Seq[(Seq[String], HdfsIndexerMetadata)] = _
+}
+
+@TestMethodOrder(classOf[OrderAnnotation])
+class TestCsvIndexer {
+
+  val bizlogFile = new Path("src/test/resources/csv/file.csv")
+  val fileSystem = bizlogFile.getFileSystem(new Configuration())
+
+  @Test
+  @Order(1)
+  def testSimpleCreateIndex(): Unit = {
+    entries = CsvIndexer(bizlogFile, 0, 1<<30, fileSystem.getConf, ',').iteratorWithMetadata.toList
+
+    Assertions.assertEquals(4, entries.size)
+
+    entries.take(10).foreach(println)
+
+    Assertions.assertEquals(entries.head._2, HdfsIndexerMetadata(bizlogFile.toString, 0, 0, -1))
+    Assertions.assertEquals(entries.head._1.toList, List("a1", "b1", "c1", "d1", "11", "12", "13"))
+    Assertions.assertEquals(entries(2)._2, HdfsIndexerMetadata(bizlogFile.toString, 21, 0, -1))
+  }
+
+  @Test
+  @Order(1)
+  def testCreateIndexFromSplit(): Unit = {
+    splitEntries = CsvIndexer(bizlogFile, 30, 1000, fileSystem.getConf, ',').iteratorWithMetadata.toList
+
+    Assertions.assertEquals(2, splitEntries.size)
+
+    splitEntries.take(10).foreach(println)
+
+    Assertions.assertEquals(splitEntries.head._2, HdfsIndexerMetadata(bizlogFile.toString, 30, 0, -1))
+    Assertions.assertEquals(splitEntries.head._1.toList, List("a3", "b3", "c3", "d3", "31", "32", "33"))
+    Assertions.assertEquals(splitEntries.tail.head._2, HdfsIndexerMetadata(bizlogFile.toString, 42, 0, -1))
+  }
+
+  @Order(2)
+  @Test
+  def testSimpleFetch(): Unit = {
+
+    val csvIndexer = CsvIndexer(bizlogFile, 0, 1 << 30, fileSystem.getConf, ',')
+
+    {
+      val sq = csvIndexer.fetch(HdfsIndexerMetadata(bizlogFile.toString, 0, 0))
+      Assertions.assertEquals("c1", sq(2))
+      Assertions.assertEquals("d1", sq(3))
+    }
+
+    {
+      val sq = csvIndexer.fetch(HdfsIndexerMetadata(bizlogFile.toString, 21, 0))
+      Assertions.assertEquals("c3", sq(2))
+      Assertions.assertEquals("d3", sq(3))
+    }
+
+    {
+      val sq = csvIndexer.fetch(HdfsIndexerMetadata(bizlogFile.toString, 42, 0))
+      Assertions.assertEquals("b4", sq(1))
+      Assertions.assertEquals("41", sq(4))
+    }
+  }
+
+  @Order(2)
+  @Test
+  def testAllEntriesFetch(): Unit = {
+    testEntriesFetch(entries)
+  }
+
+  @Order(2)
+  @Test
+  def testSplitEntriesFetch(): Unit = {
+    testEntriesFetch(splitEntries)
+  }
+
+  def testEntriesFetch(entries: Seq[(Seq[String], HdfsIndexerMetadata)]): Unit = {
+    val csvIndexer = CsvIndexer(bizlogFile, 0, 1 << 30, fileSystem.getConf, ',')
+    entries.foreach(e => {
+      println("fetching: " + e._2)
+      val sq = csvIndexer.fetch(e._2)
+      Assertions.assertEquals(e._1, sq)
+    })
+  }
+
+}
\ No newline at end of file
diff --git a/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/parquet/TestParquetIndexer.scala b/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/parquet/TestParquetIndexer.scala
index cb6d5208..6746e161 100644
--- a/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/parquet/TestParquetIndexer.scala
+++ b/dione-hadoop/src/test/scala/com/paypal/dione/hdfs/index/parquet/TestParquetIndexer.scala
@@ -2,7 +2,7 @@ package com.paypal.dione.hdfs.index.parquet
 
 import com.paypal.dione.avro.utils.AvroExtensions
 import com.paypal.dione.hdfs.index.HdfsIndexerMetadata
-import com.paypal.dione.hdfs.index.parquet.TestParquetIndexer.{fileSystem, parquetFile}
+import com.paypal.dione.hdfs.index.parquet.TestParquetIndexer.{entries, fileSystem, parquetFile}
 import org.apache.avro.SchemaBuilder
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.hadoop.conf.Configuration
@@ -15,6 +15,8 @@ object TestParquetIndexer extends AvroExtensions {
   val parquetFile = new Path("TestData/hdfs_indexer/parquet_file")
   private val fileSystem = parquetFile.getFileSystem(new Configuration())
 
+  var entries: Seq[(GenericRecord, HdfsIndexerMetadata)] = _
+
   @BeforeAll
   def dataPrep(): Unit = {
     import org.apache.parquet.avro.AvroParquetWriter
@@ -53,12 +55,12 @@ class TestParquetIndexer {
   @Test
   @Order(1)
   def testCreateIndex: Unit = {
-    val parquetIndexList = ParquetIndexer(TestParquetIndexer.parquetFile, 0, 0, fileSystem.getConf, Some(projectedAvroSchema))
+    entries = ParquetIndexer(TestParquetIndexer.parquetFile, 0, 0, fileSystem.getConf, Some(projectedAvroSchema))
       .iteratorWithMetadata.toList
 
-    //println(parquetIndexList)
-    Assertions.assertEquals(10, parquetIndexList.size)
-    Assertions.assertEquals((0,0), (parquetIndexList.head._2.position, parquetIndexList.head._2.numInBlock))
+    entries.take(10).foreach(println)
+    Assertions.assertEquals(10, entries.size)
+    Assertions.assertEquals((0,0), (entries.head._2.position, entries.head._2.numInBlock))
   }
 
   @Order(2)
@@ -66,7 +68,14 @@ class TestParquetIndexer {
   def testSimpleFetch(): Unit = {
     val parquetIndexer = new ParquetIndexer(parquetFile, fileSystem.getConf)
     Assertions.assertEquals("{\"val1\": \"5\", \"val2\": 5, \"val3\": \"10\", \"val4\": 10}",
-      parquetIndexer.fetch(HdfsIndexerMetadata(parquetFile.toString, 0,4,0)).toString)
+      parquetIndexer.fetch(HdfsIndexerMetadata(parquetFile.toString, 0, 4)).toString)
+
+    entries.foreach(e => {
+      println("fetching: " + e._2)
+      val parquetIndexer = new ParquetIndexer(parquetFile, fileSystem.getConf)
+      val sq = parquetIndexer.fetch(e._2)
+      Assertions.assertEquals(e._1.get("val2"), sq.get("val2"))
+    })
   }
 
   @Order(3)
diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManagerUtils.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManagerUtils.scala
index 43213d56..e7044390 100644
--- a/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManagerUtils.scala
+++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/IndexManagerUtils.scala
@@ -1,10 +1,10 @@
 package com.paypal.dione.spark.index
 
 import java.util.UUID
-
 import com.paypal.dione.hdfs.index.HdfsIndexContants._
 import com.paypal.dione.spark.index.IndexManager.PARTITION_DEF_COLUMN
 import com.paypal.dione.spark.index.avro.AvroSparkIndexer
+import com.paypal.dione.spark.index.csv.CsvSparkIndexer
 import com.paypal.dione.spark.index.orc.OrcSparkIndexer
 import com.paypal.dione.spark.index.parquet.ParquetSparkIndexer
 import com.paypal.dione.spark.index.sequence.SeqFileSparkIndexer
@@ -245,7 +245,7 @@ object IndexManagerUtils {
       (format, serde)
     }
 
-    val sparkIndexer = Seq(SeqFileSparkIndexer, AvroSparkIndexer, ParquetSparkIndexer, OrcSparkIndexer)
+    val sparkIndexer = Seq(SeqFileSparkIndexer, AvroSparkIndexer, ParquetSparkIndexer, CsvSparkIndexer, OrcSparkIndexer)
       .find(_.canResolve(inputFormat, serde))
       .getOrElse(throw new RuntimeException("could not find indexer for data type: " + storage))
       .createSparkIndexer(spark, indexSpec)
diff --git a/dione-spark/src/main/scala/com/paypal/dione/spark/index/csv/CsvSparkIndexer.scala b/dione-spark/src/main/scala/com/paypal/dione/spark/index/csv/CsvSparkIndexer.scala
new file mode 100644
index 00000000..e9c4ad01
--- /dev/null
+++ b/dione-spark/src/main/scala/com/paypal/dione/spark/index/csv/CsvSparkIndexer.scala
@@ -0,0 +1,65 @@
+package com.paypal.dione.spark.index.csv
+
+import com.paypal.dione.hdfs.index.HdfsIndexer
+import com.paypal.dione.hdfs.index.csv.CsvIndexer
+import com.paypal.dione.spark.index._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.serde2.`lazy`.{LazySerDeParameters, LazyUtils}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types._
+
+object CsvSparkIndexer extends IndexManagerFactory {
+
+  override def canResolve(inputFormat: String, serde: String): Boolean =
+    inputFormat.contains("org.apache.hadoop.mapred.TextInputFormat")
+
+  override def createSparkIndexer(spark: SparkSession, indexSpec: IndexSpec): SparkIndexer =
+    CsvSparkIndexer(spark, indexSpec.dataTableName)
+}
+
+case class CsvSparkIndexer(@transient spark: SparkSession, dataTableName: String)
+  extends SparkIndexer {
+
+  override type T = Seq[String]
+
+  private val sparkCatalogTable = IndexManagerUtils.getSparkCatalogTable(spark, dataTableName)
+  private val schemaWithoutPartitionCols = spark.table(dataTableName).drop(sparkCatalogTable.partitionColumnNames: _*).schema
+  private var requestedFieldsSchema: Seq[(Int, StructField)] = _
+
+  def initHdfsIndexer(file: Path, conf: Configuration, start: Long, end: Long, fieldsSchema: StructType): HdfsIndexer[Seq[String]] = {
+    if (requestedFieldsSchema == null) {
+      val fieldsMap = schemaWithoutPartitionCols.map(_.name).zipWithIndex.toMap
+      requestedFieldsSchema = fieldsSchema.map(f => fieldsMap(f.name) -> f)
+    }
+
+    // logic from org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.collectSeparators()
+    val tableProperties = sparkCatalogTable.storage.properties
+    val delimiter = LazyUtils.getByte(tableProperties.getOrElse("field.delim",
+      tableProperties.getOrElse("serialization.format", null)), LazySerDeParameters.DefaultSeparators(0))
+    CsvIndexer(file, start, end, conf, delimiter.toChar)
+  }
+
+  def convert(t: Seq[String]): Seq[Any] = {
+    val l = t.toList
+    requestedFieldsSchema.map(rf => convertAny(rf._2.dataType, l(rf._1)))
+  }
+
+  def convertMap(t: Seq[String]): Map[String, Any] = {
+    val l = t.toList
+    requestedFieldsSchema.map(rf => rf._2.name -> convertAny(rf._2.dataType, l(rf._1))).toMap
+  }
+
+  private def convertAny(dataType: DataType, strVal: String): Any =
+    dataType match {
+      case _: StringType => strVal
+      case _: LongType => strVal.toLong
+      case _: IntegerType => strVal.toInt
+      case _: BooleanType => strVal.toBoolean
+      case _: FloatType => strVal.toFloat
+      case _: DoubleType => strVal.toDouble
+      case _: CharType => strVal.charAt(0)
+      case other => throw new RuntimeException("type " + other + " is currently not supported for CsvSparkIndexer")
+    }
+
+}
diff --git a/dione-spark/src/test/scala/com/paypal/dione/spark/index/csv/TestCsvIndexManagerBase.scala b/dione-spark/src/test/scala/com/paypal/dione/spark/index/csv/TestCsvIndexManagerBase.scala
new file mode 100644
index 00000000..bc710c38
--- /dev/null
+++ b/dione-spark/src/test/scala/com/paypal/dione/spark/index/csv/TestCsvIndexManagerBase.scala
@@ -0,0 +1,20 @@
+package com.paypal.dione.spark.index.csv
+
+import com.paypal.dione.spark.index.{IndexSpec, TestIndexManagerBase}
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation
+import org.junit.jupiter.api._
+
+
+@TestMethodOrder(classOf[OrderAnnotation])
+class TestCsvIndexManagerBase extends TestIndexManagerBase {
+
+  lazy val indexSpec: IndexSpec = IndexSpec("csv_data_tbl", "csv_data_tbl_idx", Seq("id_col"))
+
+  def initDataTable(fieldsSchema: String, partitionFieldSchema: String): Unit = {
+    spark.sql(s"create table ${indexSpec.dataTableName} ($fieldsSchema) partitioned by ($partitionFieldSchema)" +
+      s" row format delimited" +
+      s" fields terminated by '16'")
+  }
+
+  val testSamples = Seq(SampleTest("msg_100", Nil, "var_a_100", 607, 0, -1))
+}
\ No newline at end of file
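
For reference, a minimal sketch of driving the new `CsvIndexer` directly, distilled from `TestCsvIndexer` above. The object name, `main` wrapper, and the relative path to the sample file are illustrative assumptions; the `CsvIndexer`, `iteratorWithMetadata`, and `fetch(HdfsIndexerMetadata)` calls are exactly the ones exercised by the tests in this patch.

```scala
// Sketch (assumed paths): index a delimited file and fetch a row back by its saved offset.
import com.paypal.dione.hdfs.index.HdfsIndexerMetadata
import com.paypal.dione.hdfs.index.csv.CsvIndexer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object CsvIndexerUsageSketch {
  def main(args: Array[String]): Unit = {
    // The sample file added in this PR; adjust the path to your working directory.
    val file = new Path("dione-hadoop/src/test/resources/csv/file.csv")
    val conf = file.getFileSystem(new Configuration()).getConf

    // Full scan of the file, keeping each row's HdfsIndexerMetadata
    // (file path, byte offset, position within the block).
    val entries = CsvIndexer(file, 0, 1 << 30, conf, ',').iteratorWithMetadata.toList
    entries.foreach { case (row, meta) => println(s"$meta -> $row") }

    // Random access: re-open an indexer and fetch one row by its metadata,
    // the same way testSimpleFetch does above.
    val indexer = CsvIndexer(file, 0, 1 << 30, conf, ',')
    val firstRow: Seq[String] = indexer.fetch(HdfsIndexerMetadata(file.toString, 0, 0))
    println(firstRow) // first row of file.csv: a1,b1,c1,d1,11,12,13
  }
}
```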