@@ -0,0 +1,76 @@
package com.paypal.dione.hdfs.index.csv

import com.paypal.dione.hdfs.index.{HdfsIndexer, HdfsIndexerMetadata}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.slf4j.LoggerFactory

/**
* based on org.apache.spark.sql.execution.datasources.HadoopFileLinesReader
*/

case class CsvIndexer(file: Path, start: Long, end: Long, conf: Configuration,
delimiter: Char)
extends HdfsIndexer[Seq[String]]() {

var reader: LineRecordReader = _
private var curPosition = -1L
private var lastPosition = -1L

override def closeCurrentFile(): Unit = {
reader.close()
}

/**
* Regular seek. Called once per offset (block).
*/
override def seek(offset: Long): Unit = {
// TODO: optimize so we do not re-initialize the reader on every seek
Collaborator:

So I guess this is only for random fetches, right? The performance will be impractical for batch...

Collaborator:

But then again, why not just keep the reader open? Am I missing anything?

Collaborator (Author):

Currently the requested use-case is only fetches, but sure, I would like to solve for both; I started with something that works.
In CSV we only have an offset; the sub-offset is almost always zero.
Say we want to read every other row: we would need to work out when it is cheaper to just skip a row versus re-initializing from a new offset.

Collaborator:

OK, I got it... In the Parquet implementation we basically have the same thing, right? I mean just one offset, when the row group is 128 MB or more.

(A caller-side sketch of a batch-friendly approach follows this file.)

initReader(offset, end)
}

/**
* Skip the next row - can avoid deserialization, etc.
*/
override def skip(): Unit = {
if (reader == null) {
initReader(start, end)
}
reader.nextKeyValue()
}

/**
* Read the next row
*/
override def readLine(): Seq[String] = {
if (reader == null) {
initReader(start, end)
}

lastPosition = curPosition
curPosition = Option(reader.getCurrentKey).map(_.get()).getOrElse(0)

val hasNext = reader.nextKeyValue()
if (hasNext) {
reader.getCurrentValue.toString.split(delimiter).toSeq
} else null
}

override def getCurMetadata(): HdfsIndexerMetadata = {
// some quirk of the first line read
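// (when the split starts at offset 0, the first two rows both report position 0, so the second
// one is distinguished by a sub-offset of 1 via "plusOne" below)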
val plusOne = if (lastPosition == curPosition) 1 else 0
val pos = if (curPosition == 0) start else curPosition
HdfsIndexerMetadata(file.toString, pos, plusOne)
}

val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)

private def initReader(start: Long, end: Long) = {
reader = new LineRecordReader()
reader.initialize(new FileSplit(file, start, end - start, Array.empty), hadoopAttemptContext)
}
}
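
The re-init-per-seek cost discussed in the review thread above mainly hurts batch access. One way a caller could sidestep it without changing the indexer is to stream the split once via iteratorWithMetadata and pick out the requested entries, rather than calling fetch per row. A rough sketch only, not part of this PR; the batchFetch helper and its signature are illustrative, and results come back in file order rather than request order:

import com.paypal.dione.hdfs.index.HdfsIndexerMetadata
import com.paypal.dione.hdfs.index.csv.CsvIndexer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object CsvBatchFetch {
  // Hypothetical helper: scan the file once and keep only the rows whose index
  // metadata was requested, avoiding a reader re-init per fetched row.
  def batchFetch(file: Path, conf: Configuration, delimiter: Char,
                 requests: Seq[HdfsIndexerMetadata]): Seq[Seq[String]] = {
    val wanted = requests.map(m => (m.position, m.numInBlock)).toSet
    CsvIndexer(file, 0, 1 << 30, conf, delimiter)
      .iteratorWithMetadata
      .collect { case (row, md) if wanted.contains((md.position, md.numInBlock)) => row }
      .toSeq
  }
}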
4 changes: 4 additions & 0 deletions dione-hadoop/src/test/resources/csv/file.csv
@@ -0,0 +1,4 @@
a1,b1,c1,d1,11,12,13
a2,b2,c2,d2,21,22,23
a3,b3,c3,d3,31,32,33
a4,b4,c4,d4,41,42,43
@@ -18,6 +18,8 @@ object TestAvroIndexer extends AvroExtensions {
val avroFile = new Path("TestData/hdfs_indexer/avro_file")
val fileSystem = avroFile.getFileSystem(new Configuration())

var entries: Seq[(GenericRecord, HdfsIndexerMetadata)] = _

@BeforeAll
def dataPrep(): Unit = {

@@ -53,7 +55,7 @@ class TestAvroIndexer extends AvroExtensions {
@Test
@Order(1)
def testSimpleCreateIndex(): Unit = {
- val entries = AvroIndexer(avroFile, 0, 1<<30, fileSystem.getConf).iteratorWithMetadata.toList
+ entries = AvroIndexer(avroFile, 0, 1<<30, fileSystem.getConf).iteratorWithMetadata.toList

val schema = SchemaBuilder.record("single_string").fields().requiredString("val1")
.requiredInt("val2").requiredString("val3").requiredInt("val4").endRecord()
@@ -102,9 +104,10 @@ class TestAvroIndexer extends AvroExtensions {
Assertions.assertEquals("8", gr.get("val3").asInstanceOf[Utf8].toString)
}

- (6 to 14).foreach(i => {
- val gr = avroIndexer.fetch(HdfsIndexerMetadata(avroFile.toString, 393, i))
- Assertions.assertEquals(((21 + i) * 2).toString, gr.get("val3").asInstanceOf[Utf8].toString)
+ entries.foreach(e => {
+ println("fetching: " + e._2)
+ val sq = avroIndexer.fetch(e._2)
+ Assertions.assertEquals(e._1, sq)
})
}
}
@@ -0,0 +1,95 @@
package com.paypal.dione.hdfs.index.csv

import com.paypal.dione.hdfs.index.HdfsIndexerMetadata
import com.paypal.dione.hdfs.index.csv.TestCsvIndexer.{entries, splitEntries}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation
import org.junit.jupiter.api._

object TestCsvIndexer {
var entries: Seq[(Seq[String], HdfsIndexerMetadata)] = _
var splitEntries: Seq[(Seq[String], HdfsIndexerMetadata)] = _
}

@TestMethodOrder(classOf[OrderAnnotation])
class TestCsvIndexer {

val bizlogFile = new Path("src/test/resources/csv/file.csv")
val fileSystem = bizlogFile.getFileSystem(new Configuration())

@Test
@Order(1)
def testSimpleCreateIndex(): Unit = {
entries = CsvIndexer(bizlogFile, 0, 1<<30, fileSystem.getConf, ',').iteratorWithMetadata.toList

Assertions.assertEquals(4, entries.size)

entries.take(10).foreach(println)
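// each row in file.csv is 21 bytes ("a1,b1,c1,d1,11,12,13" plus a newline), so the lines start at
// offsets 0, 21, 42 and 63; from the second row on, the recorded position is the start of the
// previous line, which is where fetch() re-reads from (LineRecordReader skips the first,
// possibly partial, line of a split that does not start at offset 0)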

Assertions.assertEquals(entries.head._2, HdfsIndexerMetadata(bizlogFile.toString, 0, 0, -1))
Assertions.assertEquals(entries.head._1.toList, List("a1", "b1", "c1", "d1", "11", "12", "13"))
Assertions.assertEquals(entries(2)._2, HdfsIndexerMetadata(bizlogFile.toString, 21, 0, -1))
}

@Test
@Order(1)
def testCreateIndexFromSplit(): Unit = {
splitEntries = CsvIndexer(bizlogFile, 30, 1000, fileSystem.getConf, ',').iteratorWithMetadata.toList

Assertions.assertEquals(2, splitEntries.size)

splitEntries.take(10).foreach(println)

Assertions.assertEquals(splitEntries.head._2, HdfsIndexerMetadata(bizlogFile.toString, 30, 0, -1))
Assertions.assertEquals(splitEntries.head._1.toList, List("a3", "b3", "c3", "d3", "31", "32", "33"))
Assertions.assertEquals(splitEntries.tail.head._2, HdfsIndexerMetadata(bizlogFile.toString, 42, 0, -1))
}

@Order(2)
@Test
def testSimpleFetch(): Unit = {

val csvIndexer = CsvIndexer(bizlogFile, 0, 1 << 30, fileSystem.getConf, ',')

{
val sq = csvIndexer.fetch(HdfsIndexerMetadata(bizlogFile.toString, 0, 0))
Assertions.assertEquals("c1", sq(2))
Assertions.assertEquals("d1", sq(3))
}

{
val sq = csvIndexer.fetch(HdfsIndexerMetadata(bizlogFile.toString, 21, 0))
Assertions.assertEquals("c3", sq(2))
Assertions.assertEquals("d3", sq(3))
}

{
val sq = csvIndexer.fetch(HdfsIndexerMetadata(bizlogFile.toString, 42, 0))
Assertions.assertEquals("b4", sq(1))
Assertions.assertEquals("41", sq(4))
}
}

@Order(2)
@Test
def testAllEntriesFetch(): Unit = {
testEntriesFetch(entries)
}

@Order(2)
@Test
def testSplitEntriesFetch(): Unit = {
testEntriesFetch(splitEntries)
}

def testEntriesFetch(entries: Seq[(Seq[String], HdfsIndexerMetadata)]): Unit = {
val csvIndexer = CsvIndexer(bizlogFile, 0, 1 << 30, fileSystem.getConf, ',')
entries.foreach(e => {
println("fetching: " + e._2)
val sq = csvIndexer.fetch(e._2)
Assertions.assertEquals(e._1, sq)
})
}

}
@@ -2,7 +2,7 @@ package com.paypal.dione.hdfs.index.parquet

import com.paypal.dione.avro.utils.AvroExtensions
import com.paypal.dione.hdfs.index.HdfsIndexerMetadata
- import com.paypal.dione.hdfs.index.parquet.TestParquetIndexer.{fileSystem, parquetFile}
+ import com.paypal.dione.hdfs.index.parquet.TestParquetIndexer.{entries, fileSystem, parquetFile}
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
@@ -15,6 +15,8 @@ object TestParquetIndexer extends AvroExtensions {
val parquetFile = new Path("TestData/hdfs_indexer/parquet_file")
private val fileSystem = parquetFile.getFileSystem(new Configuration())

var entries: Seq[(GenericRecord, HdfsIndexerMetadata)] = _

@BeforeAll
def dataPrep(): Unit = {
import org.apache.parquet.avro.AvroParquetWriter
@@ -53,20 +55,27 @@ class TestParquetIndexer {
@Test
@Order(1)
def testCreateIndex: Unit = {
- val parquetIndexList = ParquetIndexer(TestParquetIndexer.parquetFile, 0, 0, fileSystem.getConf, Some(projectedAvroSchema))
+ entries = ParquetIndexer(TestParquetIndexer.parquetFile, 0, 0, fileSystem.getConf, Some(projectedAvroSchema))
.iteratorWithMetadata.toList

- //println(parquetIndexList)
- Assertions.assertEquals(10, parquetIndexList.size)
- Assertions.assertEquals((0,0), (parquetIndexList.head._2.position, parquetIndexList.head._2.numInBlock))
+ entries.take(10).foreach(println)
+ Assertions.assertEquals(10, entries.size)
+ Assertions.assertEquals((0,0), (entries.head._2.position, entries.head._2.numInBlock))
}

@Order(2)
@Test
def testSimpleFetch(): Unit = {
val parquetIndexer = new ParquetIndexer(parquetFile, fileSystem.getConf)
Assertions.assertEquals("{\"val1\": \"5\", \"val2\": 5, \"val3\": \"10\", \"val4\": 10}",
- parquetIndexer.fetch(HdfsIndexerMetadata(parquetFile.toString, 0,4,0)).toString)
+ parquetIndexer.fetch(HdfsIndexerMetadata(parquetFile.toString, 0, 4)).toString)

entries.foreach(e => {
println("fetching: " + e._2)
val parquetIndexer = new ParquetIndexer(parquetFile, fileSystem.getConf)
val sq = parquetIndexer.fetch(e._2)
Assertions.assertEquals(e._1.get("val2"), sq.get("val2"))
})
}

@Order(3)
@@ -1,10 +1,10 @@
package com.paypal.dione.spark.index

import java.util.UUID

import com.paypal.dione.hdfs.index.HdfsIndexContants._
import com.paypal.dione.spark.index.IndexManager.PARTITION_DEF_COLUMN
import com.paypal.dione.spark.index.avro.AvroSparkIndexer
import com.paypal.dione.spark.index.csv.CsvSparkIndexer
import com.paypal.dione.spark.index.parquet.ParquetSparkIndexer
import com.paypal.dione.spark.index.sequence.SeqFileSparkIndexer
import org.apache.hadoop.conf.Configuration
@@ -240,7 +240,7 @@ object IndexManagerUtils {
(format, serde)
}

- val sparkIndexer = Seq(SeqFileSparkIndexer, AvroSparkIndexer, ParquetSparkIndexer)
+ val sparkIndexer = Seq(SeqFileSparkIndexer, AvroSparkIndexer, ParquetSparkIndexer, CsvSparkIndexer)
.find(_.canResolve(inputFormat, serde))
.getOrElse(throw new RuntimeException("could not find indexer for data type: " + storage))
.createSparkIndexer(spark, indexSpec)
@@ -0,0 +1,65 @@
package com.paypal.dione.spark.index.csv

import com.paypal.dione.hdfs.index.HdfsIndexer
import com.paypal.dione.hdfs.index.csv.CsvIndexer
import com.paypal.dione.spark.index._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.serde2.`lazy`.{LazySerDeParameters, LazyUtils}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object CsvSparkIndexer extends IndexManagerFactory {

override def canResolve(inputFormat: String, serde: String): Boolean =
inputFormat.contains("org.apache.hadoop.mapred.TextInputFormat")

override def createSparkIndexer(spark: SparkSession, indexSpec: IndexSpec): SparkIndexer =
CsvSparkIndexer(spark, indexSpec.dataTableName)
}

case class CsvSparkIndexer(@transient spark: SparkSession, dataTableName: String)
extends SparkIndexer {

override type T = Seq[String]

private val sparkCatalogTable = IndexManagerUtils.getSparkCatalogTable(spark, dataTableName)
private val schemaWithoutPartitionCols = spark.table(dataTableName).drop(sparkCatalogTable.partitionColumnNames: _*).schema
private var requestedFieldsSchema: Seq[(Int, StructField)] = _

def initHdfsIndexer(file: Path, conf: Configuration, start: Long, end: Long, fieldsSchema: StructType): HdfsIndexer[Seq[String]] = {
if (requestedFieldsSchema == null) {
val fieldsMap = schemaWithoutPartitionCols.map(_.name).zipWithIndex.toMap
requestedFieldsSchema = fieldsSchema.map(f => fieldsMap(f.name) -> f)
}

// logic from org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.collectSeparators()
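// "field.delim" takes precedence over "serialization.format"; LazyUtils.getByte parses a numeric
// value such as "16" as a byte code, otherwise falls back to the first character's byte, and
// defaults to Hive's Ctrl-A (\u0001) separator when neither property is set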
val tableProperties = sparkCatalogTable.storage.properties
val delimiter = LazyUtils.getByte(tableProperties.getOrElse("field.delim",
tableProperties.getOrElse("serialization.format", null)), LazySerDeParameters.DefaultSeparators(0))
CsvIndexer(file, start, end, conf, delimiter.toChar)
}

def convert(t: Seq[String]): Seq[Any] = {
val l = t.toList
requestedFieldsSchema.map(rf => convertAny(rf._2.dataType, l(rf._1)))
}

def convertMap(t: Seq[String]): Map[String, Any] = {
val l = t.toList
requestedFieldsSchema.map(rf => rf._2.name -> convertAny(rf._2.dataType, l(rf._1))).toMap
}

private def convertAny(dataType: DataType, strVal: String): Any =
dataType match {
case _: StringType => strVal
case _: LongType => strVal.toLong
case _: IntegerType => strVal.toInt
case _: BooleanType => strVal.toBoolean
case _: FloatType => strVal.toFloat
case _: DoubleType => strVal.toDouble
case _: CharType => strVal.charAt(0)
case other => throw new RuntimeException("type " + other + " is currently not supported by CsvSparkIndexer")
}

}
@@ -0,0 +1,20 @@
package com.paypal.dione.spark.index.csv

import com.paypal.dione.spark.index.{IndexSpec, TestIndexManagerBase}
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation
import org.junit.jupiter.api._


@TestMethodOrder(classOf[OrderAnnotation])
class TestCsvIndexManagerBase extends TestIndexManagerBase {

lazy val indexSpec: IndexSpec = IndexSpec("csv_data_tbl", "csv_data_tbl_idx", Seq("id_col"))

def initDataTable(fieldsSchema: String, partitionFieldSchema: String): Unit = {
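// note: Hive parses a numeric delimiter string as a byte code, so '16' below means the single
// byte 0x10, not the two characters "1" and "6"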
spark.sql(s"create table ${indexSpec.dataTableName} ($fieldsSchema) partitioned by ($partitionFieldSchema)" +
s" row format delimited" +
s" fields terminated by '16'")
}

val testSamples = Seq(SampleTest("msg_100", Nil, "var_a_100", 607, 0, -1))
}