Skip to content

Commit 071a825

Browse files
committed
updated loadTable method to improve performance
1 parent 903fa88 commit 071a825

File tree

1 file changed

+12
-11
lines changed

1 file changed

+12
-11
lines changed

flink/src/main/scala/io/ddf/flink/FlinkDDFManager.scala

Lines changed: 12 additions & 11 deletions
Original file line number · Diff line number · Diff line change
@@ -26,32 +26,33 @@ class FlinkDDFManager extends DDFManager {
2626
override def getNamespace: String = NAMESPACE
2727

2828
override def loadTable(fileURL: String, fieldSeparator: String): DDF = {
29-
val fileData: DataSet[Array[Object]] = flinkExecutionEnvironment
30-
.readTextFile(fileURL)
31-
.map(_.split(fieldSeparator).map(_.asInstanceOf[Object]))
29+
30+
val fileData: DataSet[String] = flinkExecutionEnvironment.readTextFile(fileURL)
3231

3332
// scalastyle:off magic.number
3433
val sampleSize = 5
3534
// scalastyle:on magic.number
3635

37-
val subset = fileData.first(sampleSize).map(_.map(_.toString)).collect()
38-
val columns: Array[Column] = getColumnInfo(subset)
36+
val subset = fileData.first(sampleSize).collect()
37+
val columns: Array[Column] = getColumnInfo(subset,fieldSeparator)
3938

40-
val typeSpecs: Array[Class[_]] = Array(classOf[DataSet[_]], classOf[Row])
39+
val typeSpecs: Array[Class[_]] = Array(classOf[DataSet[_]], classOf[Array[Object]])
4140
val rand: SecureRandom = new SecureRandom
4241
val tableName: String = "tbl" + String.valueOf(Math.abs(rand.nextLong))
4342

4443
val schema: Schema = new Schema(tableName, columns)
45-
val rowDS = RepresentationHandler.getRowDataSet(fileData, columns.toList, useDefaults = false)
44+
val data = fileData.map(_.split(fieldSeparator).map(_.asInstanceOf[Object]))
45+
// val rowDS = RepresentationHandler.getRowDataSet(fileData, columns.toList, useDefaults = false)
4646

47-
val ddf = this.newDDF(rowDS, typeSpecs, getEngine, getNamespace, tableName, schema)
47+
val ddf = this.newDDF(data, typeSpecs, getEngine, getNamespace, tableName, schema)
4848
this.addDDF(ddf)
4949
ddf
5050
}
5151

5252
def getExecutionEnvironment: ExecutionEnvironment = flinkExecutionEnvironment
5353

54-
def getColumnInfo(sampleData: Seq[Array[String]],
54+
def getColumnInfo(sampleData: Seq[String],
55+
fieldSeparator:String,
5556
hasHeader: Boolean = false,
5657
doPreferDouble: Boolean = true): Array[Schema.Column] = {
5758

@@ -63,7 +64,7 @@ class FlinkDDFManager extends DDFManager {
6364
// return null
6465
// }
6566

66-
val firstRow: Array[String] = sampleData.head
67+
val firstRow: Array[String] = sampleData.head.split(fieldSeparator)
6768

6869
val headers: Seq[String] = if (hasHeader) {
6970
firstRow.toSeq
@@ -74,7 +75,7 @@ class FlinkDDFManager extends DDFManager {
7475

7576
val sampleStrings = if (hasHeader) sampleData.tail else sampleData
7677

77-
val samples = sampleStrings.toArray.transpose
78+
val samples = sampleStrings.map(_.split(fieldSeparator)).toArray.transpose
7879

7980
samples.zipWithIndex.map {
8081
case (col, i) => new Schema.Column(headers(i), Utils.determineType(col, doPreferDouble, false))

0 commit comments

Comments
 (0)