diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.scala new file mode 100644 index 0000000000..ff61cf90d2 --- /dev/null +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.h2o.sparkling.ml.metrics + +import hex.ModelMetricsBinomial.IndependentMetricBuilderBinomial +import hex.genmodel.utils.DistributionFamily +import org.apache.spark.sql.DataFrame + +object H2OBinomialMetrics extends MetricCalculation { + + def calculate( + dataFrame: DataFrame, + domain: Array[String], + predictionProbabilitiesCol: String = "detailed_prediction.probabilities", + labelCol: String = "label", + weightColOption: Option[String] = None, + offsetColOption: Option[String] = None, + distributionFamily: String = "AUTO"): H2OBinomialMetrics = { + val domainFamilyEnum = DistributionFamily.valueOf(distributionFamily) + val getMetricBuilder = () => new IndependentMetricBuilderBinomial[_](domain, domainFamilyEnum) + + val gson = getMetricGson( + getMetricBuilder, + dataFrame, + predictionProbabilitiesCol, + labelCol, + offsetColOption, + weightColOption, + domain) + val result = new H2OBinomialMetrics() + result.setMetrics(gson, "H2OBinomialMetrics.calculate") + result + } + + def calculate( + dataFrame: DataFrame, + domain: Array[String], + predictionProbabilitiesCol: String, + labelCol: String, + weightCol: String, + offsetCol: String, + distributionFamily: String): Unit = { + calculate( + dataFrame, + domain, + predictionProbabilitiesCol, + labelCol, + Option(weightCol), + Option(offsetCol), + distributionFamily) + } +} diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.scala new file mode 100644 index 0000000000..0d7da7d268 --- /dev/null +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.h2o.sparkling.ml.metrics + +import ai.h2o.sparkling.ml.metrics.H2OBinomialMetrics.getMetricGson +import hex.ModelMetricsMultinomial.IndependentMetricBuilderMultinomial +import hex.MultinomialAucType +import org.apache.spark.sql.DataFrame + +object H2OMultinomialMetrics { + def calculate( + dataFrame: DataFrame, + domain: Array[String], + predictionProbabilitiesCol: String = "detailed_prediction.probabilities", + labelCol: String = "label", + weightColOption: Option[String] = None, + offsetColOption: Option[String] = None, + priorDistributionOption: Option[Array[Double]] = None, + aucType: String = "AUTO"): H2OMultinomialMetrics = { + + val aucTypeEnum = MultinomialAucType.valueOf(aucType) + val nclasses = domain.length + val priorDistribution = priorDistributionOption match { + case Some(x) => x + case None => null + } + val getMetricBuilder = + () => new IndependentMetricBuilderMultinomial[_](nclasses, domain, aucTypeEnum, priorDistribution) + + val gson = getMetricGson( + getMetricBuilder, + dataFrame, + predictionProbabilitiesCol, + labelCol, + offsetColOption, + weightColOption, + domain) + val result = new H2OMultinomialMetrics() + result.setMetrics(gson, "H2OMultinomialMetrics.calculate") + result + } + + def calculate( + dataFrame: DataFrame, + domain: Array[String], + predictionProbabilitiesCol: String, + labelCol: String, + weightCol: String, + offsetCol: String, + priorDistribution: Array[Double], + aucType: String): H2OMultinomialMetrics = { + calculate( + dataFrame, + domain, + predictionProbabilitiesCol, + labelCol, + Option(weightCol), + Option(offsetCol), + Option(priorDistribution), + aucType) + } +} diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.scala new file mode 100644 index 0000000000..98552725a6 --- /dev/null +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.h2o.sparkling.ml.metrics + +import ai.h2o.sparkling.ml.metrics.H2OBinomialMetrics.getMetricGson +import hex.DistributionFactory +import hex.ModelMetricsRegression.IndependentMetricBuilderRegression +import hex.genmodel.utils.DistributionFamily +import org.apache.spark.sql.DataFrame + +object H2ORegressionMetrics { + + def calculate( + dataFrame: DataFrame, + predictionCol: String = "prediction", + labelCol: String = "label", + weightColOption: Option[String] = None, + offsetColOption: Option[String] = None, + distributionFamily: String = "AUTO"): H2ORegressionMetrics = { + val domainFamilyEnum = DistributionFamily.valueOf(distributionFamily) + val distribution= DistributionFactory.getDistribution(domainFamilyEnum) + val getMetricBuilder = () => new IndependentMetricBuilderRegression[_](distribution) + + val gson = getMetricGson( + getMetricBuilder, + dataFrame, + predictionCol, + labelCol, + offsetColOption, + weightColOption, + null) + val result = new H2ORegressionMetrics() + result.setMetrics(gson, "H2ORegressionMetrics.calculate") + result + } + + def calculate( + dataFrame: DataFrame, + predictionCol: String, + labelCol: String, + weightCol: String, + offsetCol: String, + distributionFamily: String): H2ORegressionMetrics = { + calculate( + dataFrame, + predictionCol, + labelCol, + Option(weightCol), + Option(offsetCol), + distributionFamily) + } +} diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala index 79157d8209..ec02bdb4d3 100644 --- a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala @@ -17,127 +17,54 @@ package ai.h2o.sparkling.ml.metrics -import java.io.File - -import ai.h2o.sparkling.ml.internals.H2OModelCategory -import ai.h2o.sparkling.ml.models.{H2OMOJOModel, RowConverter} +import ai.h2o.sparkling.ml.models.RowConverter import ai.h2o.sparkling.ml.utils.{DatasetShape, SchemaUtils} import com.google.gson.{GsonBuilder, JsonObject} import hex._ import hex.ModelMetrics.IndependentMetricBuilder -import hex.ModelMetricsBinomialGLM.{ModelMetricsMultinomialGLM, ModelMetricsOrdinalGLM} -import hex.genmodel.MojoModel -import hex.genmodel.easy.{EasyPredictModelWrapper, RowData} -import org.apache.spark.SparkFiles +import org.apache.spark.{ExposeUtils, ml, mllib} import org.apache.spark.sql.DataFrame import water.api.{Schema, SchemaServer} import water.api.schemas3._ -import org.apache.spark.sql.functions.col -import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, StringType} trait MetricCalculation { - self: H2OMOJOModel => - - /** - * Returns an object holding all metrics of the Double type and also more complex performance information - * calculated on a data frame passed as a parameter. - */ - def getMetricsObject(dataFrame: DataFrame): H2OMetrics = { - val gson = getMetricGson(dataFrame) - - val h2oMojo = unwrapMojoModel() - val modelCategory = H2OModelCategory.fromString(getModelCategory()) - - H2OMetrics.loadMetrics(gson, "realtime_metrics", h2oMojo._algoName, modelCategory, getDataFrameSerializer) - } - - /** - * Returns a map of all metrics of the Double type calculated on a data frame passed as a parameter. - */ - def getMetrics(dataFrame: DataFrame): Map[String, Double] = { - val gson = getMetricGson(dataFrame) - val conversionInput = new JsonObject() - conversionInput.add("realtime_metrics", gson) - - extractMetrics(conversionInput, "realtime_metrics") - } - - private[sparkling] def getActualValuesExtractor(): (RowData, EasyPredictModelWrapper) => Array[Double] = { - (rowData: RowData, wrapper: EasyPredictModelWrapper) => - { - val rawData = new Array[Double](wrapper.m.nfeatures()) - wrapper.fillRawData(rowData, rawData) - } - } - - private[sparkling] def getPredictionGetter(): (EasyPredictModelWrapper, RowData, Double) => Array[Double] = { - (wrapper: EasyPredictModelWrapper, rowData: RowData, offset: Double) => - { - wrapper.preamble(wrapper.m.getModelCategory, rowData, offset) - } - } - - private[sparkling] def getMetricGson(dataFrame: DataFrame): JsonObject = { - val (preparedDF, offsetColOption, weightColOption) = validateAndPrepareDataFrameForMetricCalculation(dataFrame) - val configInitializers = getEasyPredictModelWrapperConfigurationInitializers() - MetricCalculationClosure.getMetricGson( - uid, - mojoFileName, - preparedDF, - offsetColOption, - weightColOption, - configInitializers, - getActualValuesExtractor(), - getPredictionGetter()) - } - private[sparkling] def validateAndPrepareDataFrameForMetricCalculation( - dataFrame: DataFrame): (DataFrame, Option[String], Option[String]) = { + private[sparkling] def getFlattenDataFrame(dataFrame: DataFrame): DataFrame = { val flatDataFrame = DatasetShape.getDatasetShape(dataFrame.schema) match { case DatasetShape.Flat => dataFrame case DatasetShape.StructsOnly | DatasetShape.Nested => SchemaUtils.appendFlattenedStructsToDataFrame(dataFrame, RowConverter.temporaryColumnPrefix) } + flatDataFrame + } + + private[sparkling] def validateDataFrameForMetricCalculation( + flatDataFrame: DataFrame, + labelCol: String, + offsetColOption: Option[String], + weightColOption: Option[String]): Unit = { + + if (labelCol != null && !flatDataFrame.columns.contains(labelCol)) { + throw new IllegalArgumentException( + s"DataFrame passed as a parameter does not contain label column '$labelCol'.") + } - if (hasParam("labelCol")) { - val labelCol = getOrDefault(getParam("labelCol")).toString - if (labelCol != null && !flatDataFrame.columns.contains(labelCol)) { + if (offsetColOption.isDefined) { + val offsetCol = offsetColOption.get + if (!flatDataFrame.columns.contains(offsetCol)) { throw new IllegalArgumentException( - s"DataFrame passed as a parameter does not contain label column '$labelCol'.") + s"DataFrame passed as a parameter does not contain offset column '$offsetCol'.") } } - val (offsetColCastedDF, offsetColOption) = - if (hasParam("offsetCol") && getOrDefault(getParam("offsetCol")) != null) { - val offsetCol = getOrDefault(getParam("offsetCol")).toString - if (!flatDataFrame.columns.contains(offsetCol)) { - throw new IllegalArgumentException( - s"DataFrame passed as a parameter does not contain offset column '$offsetCol'.") - } - (flatDataFrame.withColumn(offsetCol, col(offsetCol).cast(DoubleType)), Some(offsetCol)) - } else { - (flatDataFrame, None) - } - val weightColTuple = if (hasParam("weightCol") && getOrDefault(getParam("weightCol")) != null) { - val weightCol = getOrDefault(getParam("weightCol")).toString + if (weightColOption.isDefined) { + val weightCol = weightColOption.get if (!flatDataFrame.columns.contains(weightCol)) { throw new IllegalArgumentException( s"DataFrame passed as a parameter does not contain weight column '$weightCol'.") } - (offsetColCastedDF.withColumn(weightCol, col(weightCol).cast(DoubleType)), offsetColOption, Some(weightCol)) - } else { - (offsetColCastedDF, offsetColOption, None) } - weightColTuple - } - -} - -object MetricCalculationClosure { - - private[sparkling] def makeMetricBuilder(mojoModel: MojoModel, mojoFileName: String): IndependentMetricBuilder[_] = { - val mojoFile = new File(SparkFiles.get(mojoFileName)) - MojoModel.loadMetricBuilder(mojoModel, mojoFile).asInstanceOf[IndependentMetricBuilder[_]] } private[sparkling] def metricsToSchema(metrics: ModelMetrics): Schema[_, _] = { @@ -147,42 +74,34 @@ object MetricCalculationClosure { schemas.foreach(SchemaServer.register) val schema = SchemaServer.schema(3, metrics) schema match { - case s: ModelMetricsBinomialGLMV3 => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsBinomialGLM]) case s: ModelMetricsBinomialV3[ModelMetricsBinomial, _] => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsBinomial]) - case s: ModelMetricsMultinomialGLMV3 => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsMultinomialGLM]) case s: ModelMetricsMultinomialV3[ModelMetricsMultinomial, _] => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsMultinomial]) - case s: ModelMetricsOrdinalGLMV3 => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsOrdinalGLM]) - case s: ModelMetricsOrdinalV3[ModelMetricsOrdinal, _] => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsOrdinal]) - case s: ModelMetricsRegressionCoxPHV3 => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsRegressionCoxPH]) - case s: ModelMetricsRegressionGLMV3 => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsRegressionGLM]) case s: ModelMetricsRegressionV3[ModelMetricsRegression, _] => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsRegression]) - case s: ModelMetricsClusteringV3 => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsClustering]) - case s: ModelMetricsHGLMV3[ModelMetricsHGLM, _] => s.fillFromImpl(metrics.asInstanceOf[ModelMetricsHGLM]) - case s: ModelMetricsAutoEncoderV3 => s.fillFromImpl(metrics) - case s: ModelMetricsBaseV3[_, _] => s.fillFromImpl(metrics) } schema } private[sparkling] def getMetricGson( - uid: String, - mojoFileName: String, - preparedDF: DataFrame, + createMetricBuilder: () => IndependentMetricBuilder[_], + dataFrame: DataFrame, + predictionCol: String, + labelCol: String, offsetColOption: Option[String], weightColOption: Option[String], - configInitializers: Seq[H2OMOJOModel.EasyPredictModelWrapperConfigurationInitializer], - extractActualValues: (RowData, EasyPredictModelWrapper) => Array[Double], - getPrediction: (EasyPredictModelWrapper, RowData, Double) => Array[Double]): JsonObject = { - val filledMetricsBuilder = preparedDF.rdd + domain: Array[String]): JsonObject = { + val flatDF = getFlattenDataFrame(dataFrame) + val predictionType = flatDF.schema.fields.find(f => f.name == predictionCol).get.dataType + val predictionColIndex = flatDF.schema.indexOf(predictionCol) + val actualType = flatDF.schema.fields.find(f => f.name == labelCol).get.dataType + val actualColIndex = flatDF.schema.indexOf(labelCol) + val filledMetricsBuilder = flatDF.rdd .mapPartitions[IndependentMetricBuilder[_]] { rows => - val wrapper = H2OMOJOModel.loadEasyPredictModelWrapper(uid, mojoFileName, configInitializers) - val metricBuilder = makeMetricBuilder(wrapper.getModel.asInstanceOf[MojoModel], mojoFileName) + val metricBuilder = createMetricBuilder() while (rows.hasNext) { val row = rows.next() - val rowData = RowConverter.toH2ORowData(row) val offset = offsetColOption match { case Some(offsetCol) => row.getDouble(row.fieldIndex(offsetCol)) case None => 0.0d @@ -191,9 +110,26 @@ object MetricCalculationClosure { case Some(weightCol) => row.getDouble(row.fieldIndex(weightCol)) case None => 1.0d } - val prediction = getPrediction(wrapper, rowData, offset) - val actualValues = extractActualValues(rowData, wrapper) - metricBuilder.perRow(prediction, actualValues, weight, offset) + val prediction = predictionType match { + case ArrayType(DoubleType, _) => row.getSeq[Double](predictionColIndex).toArray + case ArrayType(FloatType, _) => row.getSeq[Float](predictionColIndex).map(_.toDouble).toArray + case DoubleType => Array(row.getDouble(predictionColIndex)) + case FloatType => Array(row.getFloat(predictionColIndex).toDouble) + case v if ExposeUtils.isMLVectorUDT(v) => + val vector = row.getAs[ml.linalg.Vector](predictionColIndex) + vector.toDense.values + case _: mllib.linalg.VectorUDT => + val vector = row.getAs[mllib.linalg.Vector](predictionColIndex) + vector.toDense.values + } + val actualValue = actualType match { + case StringType => + val label = row.getString(actualColIndex) + domain.indexOf(label).toDouble + case DoubleType => row.getDouble(actualColIndex) + case FloatType => row.getFloat(actualColIndex) + } + metricBuilder.perRow(prediction, Array(actualValue), weight, offset) } Iterator.single(metricBuilder) } diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/models/H2OMOJOModel.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/models/H2OMOJOModel.scala index fbd148cc88..b6bad23eee 100644 --- a/scoring/src/main/scala/ai/h2o/sparkling/ml/models/H2OMOJOModel.scala +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/models/H2OMOJOModel.scala @@ -56,7 +56,6 @@ abstract class H2OMOJOModel with SpecificMOJOParameters with H2OBaseMOJOParams with HasFeatureTypesOnMOJO - with MetricCalculation with Logging { H2OMOJOCache.startCleanupThread()