Skip to content

Commit 3b57400

Browse files
committed
Merge pull request ddf-project#57 from Shiti/master
performance improvement for FiveNumSummary calculation
2 parents 071a825 + aaa5b13 commit 3b57400

File tree

2 files changed

+47
-29
lines changed

2 files changed

+47
-29
lines changed

flink/src/main/scala/io/ddf/flink/analytics/StatisticsHandler.scala

+46-28
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,33 @@ class StatisticsHandler(ddf: DDF) extends AStatisticsSupporter(ddf) {
6565
}
6666

6767
/**
 * Builds one FiveNumSummary per t-digest computed over the DDF's data.
 *
 * This span is a diff rendering: lines under a `NN-` gutter are the removed implementation
 * (one getVectorQuantiles call per requested column), lines under a `NN+` gutter are the
 * added implementation (a single pass that keeps one TDigest per position of each row array).
 *
 * NOTE(review): the added lines never reference `columnNames`; the result has one entry per
 * element of the row array rather than per requested name. Confirm callers always pass every
 * column, in row order.
 *
 * @param columnNames columns requested by the caller (only read by the removed lines)
 * @return an array of FiveNumSummary values built from the quantiles of each digest
 */
override def getFiveNumSummary(columnNames: util.List[String]): Array[FiveNumSummary] = {
68-
val percentiles: Array[java.lang.Double] = Array(0.0001, 0.25, 0.5001, 0.75, .9999)
69-
columnNames.map { columnName =>
70-
val quantiles = getVectorQuantiles(columnName, percentiles)
71-
// scalastyle:off magic.number
72-
new FiveNumSummary(quantiles(0), quantiles(1), quantiles(2), quantiles(3), quantiles(4))
73-
// scalastyle:on magic.number
74-
}.toArray
// The added percentile order is (near-min, near-max, 0.25, 0.5, 0.75); the removed order was
// ascending. The values are passed positionally to FiveNumSummary below.
// NOTE(review): presumably the constructor takes (min, max, q1, median, q3) - confirm.
68+
val percentiles: Array[java.lang.Double] = Array(0.00001, 0.99999, 0.25, 0.5, 0.75)
69+
val data = ddf.getRepresentationHandler.get(DATASET_ARR_OBJ_TYPE_SPECS: _*).asInstanceOf[DataSet[Array[Object]]]
// Map step: every cell of every row becomes its own TDigest(100) holding at most one value.
// NOTE(review): this allocates one digest per cell; a per-partition accumulator per column
// would avoid that - worth measuring.
70+
val tDigestDataset: DataSet[Array[TDigest]] = data.mapPartition {
71+
rows =>
72+
rows.map {
73+
row =>
74+
row.map { colValue =>
75+
val tDigest = new TDigest(100)
// Null cells contribute an empty digest. Non-null cells are parsed through their string form.
// NOTE(review): toString.toDouble throws NumberFormatException for non-numeric text -
// confirm every column reaching this point is numeric.
76+
if (!isNull(colValue)) {
77+
tDigest.add(colValue.toString.toDouble)
78+
}
79+
tDigest
80+
}
81+
}
// Reduce step: pair the two digest arrays position by position, fold the right digest into
// the left one with add, and keep the left one.
82+
}.reduce {
83+
(td1, td2) => (td1, td2).zipped.map {
84+
(x, y) =>
85+
x.add(y)
86+
x
87+
}
88+
}
// NOTE(review): .head assumes collect() yields at least one element - confirm behaviour
// for an empty DDF.
89+
val tdigests = tDigestDataset.first(1).collect().head
90+
tdigests.map {
91+
td =>
92+
val quantiles = percentiles.map(p => td.quantile(p))
93+
new FiveNumSummary(quantiles(0), quantiles(1), quantiles(2), quantiles(3), quantiles(4))
94+
}
7595
}
7696

7797
override def getVectorVariance(columnName: String): Array[lang.Double] = {
@@ -107,29 +127,25 @@ class StatisticsHandler(ddf: DDF) extends AStatisticsSupporter(ddf) {
107127
}
108128

109129
/**
 * Builds a single TDigest summarising the values of one column.
 *
 * This span is a diff rendering: lines under a `NNN-` gutter are the removed implementation
 * (based on getDoubleColumn, yielding null through orNull when no data set was available),
 * lines under a `NNN+` gutter are the added implementation (selects the column from the Row
 * representation, filters nulls, and merges one digest per row).
 *
 * NOTE(review): the added version has no null-returning path and ends in collect().head -
 * confirm callers no longer rely on a null result and that an empty or all-null column
 * cannot reach this method.
 *
 * @param columnName name of the column to summarise
 * @return the merged TDigest for that column
 */
private def getTDigest(columnName: String): TDigest = {
110-
val maybeDataSet: Option[DataSet[Double]] = getDoubleColumn(columnName)
111-
112-
maybeDataSet.map {
113-
ds =>
114-
val digests = ds.map {
115-
x =>
116-
val rs = new TDigest(100)
117-
rs.add(x)
118-
rs
119-
}.reduce {
120-
(x, y) => x.add(y)
121-
x
122-
}
123-
val reducedList: util.List[TDigest] = digests.collect()
124-
val finalDigest = reducedList.reduce {
125-
(x, y) =>
126-
x.add(y)
127-
x
130+
val table = ddf.getRepresentationHandler.get(DATASET_ROW_TYPE_SPECS: _*).asInstanceOf[DataSet[Row]]
// Keep only the requested column and drop rows where it is null.
131+
val columnData: DataSet[Row] = table.select(columnName).where(s"$columnName.isNotNull")
132+
val result = columnData.map {
133+
row =>
// After the select, the row carries a single field, read here at index 0.
134+
val columnValue = row.productElement(0)
135+
val rs = new TDigest(100)
// NOTE(review): this null check looks redundant given the isNotNull filter above - confirm
// whether isNull also covers other "missing" markers before removing it.
136+
if(!isNull(columnValue)){
137+
rs.add(columnValue.toString.toDouble)
128138
}
129-
finalDigest
130-
}.orNull
139+
rs
// Merge the per-row digests pairwise, folding the right digest into the left one.
140+
}.reduce {
141+
(x, y) =>
142+
x.add(y)
143+
x
// NOTE(review): .head assumes the reduced data set is non-empty - confirm.
144+
}.collect().head
145+
result
131146
}
132147

148+
133149
override def getVectorQuantiles(columnName: String, percentiles: Array[lang.Double]): Array[lang.Double] = {
134150
val column = ddf.getColumn(columnName)
135151
if (ColumnType.isNumeric(column.getType)) {
@@ -180,7 +196,9 @@ class StatisticsHandler(ddf: DDF) extends AStatisticsSupporter(ddf) {
180196

181197
private def getMinAndMaxValue(data: DataSet[Array[Object]], columnIndex: Int): (Double, Double) = {
182198
val defaultValue = (Double.NaN, Double.NaN)
183-
val notNullValues: DataSet[Array[Object]] = data.filter { d => !isNull(d(columnIndex)) }
199+
val notNullValues: DataSet[Array[Object]] = data.filter {
200+
d => !isNull(d(columnIndex))
201+
}
184202
val tupleDataset = notNullValues.map {
185203
d =>
186204
val numericValue = d(columnIndex).toString.toDouble

flink/src/test/scala/io/ddf/flink/etl/MissingDataHandlerSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class MissingDataHandlerSpec extends BaseSpec {
7575
val dict: Map[String, String] = Map("V1" -> "2000", "V28" -> "0", "V29" -> "1")
7676
val filledDDF = ddf1.getMissingDataHandler.fillNA(null, null, 0, null, dict, null)
7777
val annualDelay = filledDDF.aggregate("V1, sum(V29)").get("2008")(0)
78-
annualDelay should be(302.0 +- 0.1)
78+
annualDelay should be(282.0 +- 0.1)
7979
}
8080

8181
it should "fill by aggregate function" in {

0 commit comments

Comments
 (0)