[GH-5699] Scala 2.13 preparations - part 1 #5700

Merged: 1 commit, Dec 21, 2023
@@ -30,6 +30,6 @@ trait MetricResolver {
val (swFieldName, swMetricName) = MetricNameConverter.convertFromH2OToSW(field.getName)
Metric(swFieldName, swMetricName, field.getName, field.getType, field.getAnnotation(classOf[API]).help())
}
-    parameters
+    parameters.toSeq
}
}
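
This `.toSeq` is the recurring theme of the whole PR: in Scala 2.13, `scala.Seq` is an alias for `immutable.Seq`, so an `Array` result no longer satisfies a `Seq` return type through a silent implicit. A minimal sketch of the pattern, with `Metric` reduced to a stand-in case class:

```scala
object SeqMigration {
  final case class Metric(name: String)

  // The return type is scala.Seq: collection.Seq in 2.12, immutable.Seq in 2.13.
  def metrics(fields: Array[String]): Seq[Metric] = {
    val parameters = fields.map(Metric(_)) // Array[Metric]
    parameters.toSeq // explicit copy; 2.13 no longer converts the Array silently
  }

  def main(args: Array[String]): Unit =
    println(metrics(Array("rmse", "logloss")))
}
```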
core/src/main/scala/ai/h2o/sparkling/H2OColumnType.scala (1 addition, 1 deletion)
@@ -18,7 +18,7 @@
package ai.h2o.sparkling

object H2OColumnType extends Enumeration {
-  val enum, string, int, real, time, uuid = Value
+  val `enum`, string, int, real, time, uuid = Value

def fromString(dataType: String): Value = {
values.find(_.toString == dataType).getOrElse(throw new RuntimeException(s"Unknown H2O's Data type $dataType"))
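
The backticks are needed because `enum` is a hard keyword in Scala 3 and Scala 2.13 flags it when cross-building (for example under `-Xsource:3`); quoting the identifier keeps the source compiling while leaving the runtime name unchanged. A self-contained sketch:

```scala
object ColumnTypes extends Enumeration {
  // Backticks make `enum` a legal identifier again; toString still yields "enum".
  val `enum`, string, int, real, time, uuid = Value

  def fromString(dataType: String): Value =
    values.find(_.toString == dataType)
      .getOrElse(throw new RuntimeException(s"Unknown data type $dataType"))
}

object ColumnTypesDemo {
  def main(args: Array[String]): Unit =
    println(ColumnTypes.fromString("enum")) // resolves as before: enum
}
```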
core/src/main/scala/ai/h2o/sparkling/H2OContext.scala (5 additions, 5 deletions)
@@ -123,7 +123,7 @@ class H2OContext private[sparkling] (private val conf: H2OConf) extends H2OConte
cloudV3.compiled_on)
val h2oClusterInfo = H2OClusterInfo(
s"$flowIp:$flowPort",
-      visibleFlowURL,
+      visibleFlowURL(),
cloudV3.cloud_healthy,
cloudV3.internal_security_enabled,
nodes.map(_.ipPort()),
@@ -305,7 +305,7 @@ class H2OContext private[sparkling] (private val conf: H2OConf) extends H2OConte
| ${nodes.mkString("\n ")}
| ------------------------
|
-        | Open H2O Flow in browser: ${getFlowUIHint}
+        | Open H2O Flow in browser: ${getFlowUIHint()}
|
""".stripMargin
val sparkYarnAppId = if (sparkContext.master.toLowerCase.startsWith("yarn")) {
@@ -475,8 +475,8 @@ object H2OContext extends Logging {

private def logStartingInfo(conf: H2OConf): Unit = {
logInfo("Sparkling Water version: " + BuildInfo.SWVersion)
-    val unsupportedSuffix = if (getFirstUnsupportedSWVersion.isDefined) " (unsupported)" else ""
-    val deprecationSuffix = if (isSparkVersionDeprecated) " (deprecated)" else ""
+    val unsupportedSuffix = if (getFirstUnsupportedSWVersion().isDefined) " (unsupported)" else ""
+    val deprecationSuffix = if (isSparkVersionDeprecated()) " (deprecated)" else ""
logInfo("Spark version: " + SparkSessionUtils.active.version + unsupportedSuffix + deprecationSuffix)
logInfo("Integrated H2O version: " + BuildInfo.H2OVersion)
logInfo("The following Spark configuration is used: \n " + conf.getAll.mkString("\n "))
@@ -502,7 +502,7 @@
s"Apache Spark ${SparkSessionUtils.active.version} is unsupported" +
s"since the Sparkling Water version ${unsupportedSWVersion.get}.")
}
-    if (isSparkVersionDeprecated) {
+    if (isSparkVersionDeprecated()) {
logWarning(
s"Apache Spark ${SparkSessionUtils.active.version} is deprecated and " +
"the support will be removed in the Sparkling Water version 3.44.")
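
Several hunks in this file only add `()` at call sites. Scala 2.13 deprecates auto-application, i.e. calling a method that was declared with an empty parameter list as if it were parameterless, and Scala 3 rejects it for Scala-defined methods. A sketch with a hypothetical `visibleFlowURL` standing in for the real member:

```scala
object AutoApplication {
  // Declared with (): callers must now supply the parens as well.
  def visibleFlowURL(): String = "http://localhost:54321/flow/index.html" // illustrative value

  def main(args: Array[String]): Unit = {
    // 2.12 accepted `visibleFlowURL` here; 2.13 warns and Scala 3 errors,
    // hence the mechanical () additions throughout this PR.
    println(visibleFlowURL())
  }
}
```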
core/src/main/scala/ai/h2o/sparkling/H2OFrame.scala (1 addition, 1 deletion)
@@ -151,7 +151,7 @@ class H2OFrame private (
this
} else {
val endpoint = getClusterEndpoint(conf)
-      val colIndices = columns.map(columnNames.indexOf)
+      val colIndices = columns.map(col => columnNames.indexOf(col))
val newFrameId = s"${frameId}_subframe_${colIndices.mkString("_")}"
val params = Map(
"ast" -> MessageFormat.format(s"( assign {0} (cols {1} {2}))", newFrameId, frameId, stringifyArray(colIndices)))
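
Passing `columnNames.indexOf` as a function value relies on eta-expansion of an overloaded method (`indexOf(elem)` vs `indexOf(elem, from)`), which 2.13 resolves more strictly than 2.12; writing the lambda out sidesteps the ambiguity. A sketch:

```scala
object EtaExpansionFix {
  def main(args: Array[String]): Unit = {
    val columnNames = Vector("a", "b", "c")
    val columns = Array("c", "a")

    // The explicit lambda fixes the overload before eta-expansion happens,
    // so the code compiles identically on 2.12 and 2.13.
    val colIndices = columns.map(col => columnNames.indexOf(col))
    println(colIndices.mkString("_")) // 2_0
  }
}
```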
@@ -27,7 +27,7 @@ import org.apache.spark.SparkConf
object SparklingWaterDriver {

/** Entry point */
-  def main(args: Array[String]) {
+  def main(args: Array[String]): Unit = {
// Configure this application
val conf: SparkConf = H2OConf.checkSparkConf(
new SparkConf()
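
Procedure syntax, a `def` whose body follows without `=`, is deprecated in 2.13 and removed in Scala 3; only the `: Unit =` form compiles everywhere:

```scala
object ProcedureSyntaxFix {
  // 2.12 also accepted: def main(args: Array[String]) { ... }
  // 2.13 deprecates that form, so the result type and `=` are spelled out.
  def main(args: Array[String]): Unit = {
    println("driver entry point")
  }
}
```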
@@ -51,7 +51,7 @@ private[backend] class H2ODataFrame(val frame: H2OFrame, val requiredColumns: Ar
if (requiredColumns == null) {
colNames.indices.toArray
} else {
-      requiredColumns.map(colNames.indexOf)
+      requiredColumns.map(col => colNames.indexOf(col))
}
}

@@ -92,7 +92,7 @@ private[backend] class H2ORDD[A <: Product: TypeTag: ClassTag](val frame: H2OFra
private def columnReaders(rcc: Reader) = {
val readerMapByName = (rcc.OptionReaders ++ rcc.SimpleReaders).map {
case (supportedType, reader) => supportedType.name -> reader
-    }
+    }.toMap
productType.memberTypeNames.map(name => readerMapByName(name))
}

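
The added `.toMap` matters when the static type of the combined reader tables has widened to an `Iterable` of pairs: `map` on an `Iterable` returns an `Iterable` in 2.13, while the lookup by name on the next line needs `Map` semantics. A sketch with stand-in tables (the real `OptionReaders`/`SimpleReaders` hold reflection-based readers):

```scala
object ReaderMapFix {
  def main(args: Array[String]): Unit = {
    // Only the static type matters here: it has widened to Iterable[(String, Int)].
    val optionReaders: Iterable[(String, Int)] = Map("option.int" -> 1)
    val simpleReaders: Iterable[(String, Int)] = Map("int" -> 2)

    // Without .toMap this is an Iterable of pairs and cannot be indexed by key.
    val readerMapByName = (optionReaders ++ simpleReaders).map {
      case (name, reader) => name -> reader
    }.toMap

    println(readerMapByName("int")) // 2
  }
}
```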
@@ -162,7 +162,7 @@ private[this] object SupportedDataset {
override def toH2OFrame(hc: H2OContext, frameKeyName: Option[String]): H2OFrame = {
val spark = SparkSessionUtils.active
import spark.implicits._
-      SparkDataFrameConverter.toH2OFrame(hc, dataset.map(v => Tuple1(v)).toDF, frameKeyName)
+      SparkDataFrameConverter.toH2OFrame(hc, dataset.map(v => Tuple1(v)).toDF(), frameKeyName)
}
}

@@ -171,7 +171,7 @@ private[this] object SupportedDataset {
override def toH2OFrame(hc: H2OContext, frameKeyName: Option[String]): H2OFrame = {
val spark = SparkSessionUtils.active
import spark.implicits._
-      SparkDataFrameConverter.toH2OFrame(hc, dataset.map(v => Tuple1(v)).toDF, frameKeyName)
+      SparkDataFrameConverter.toH2OFrame(hc, dataset.map(v => Tuple1(v)).toDF(), frameKeyName)
}
}
}
@@ -191,15 +191,15 @@ private[this] object SupportedRDD {
override def toH2OFrame(hc: H2OContext, frameKeyName: Option[String]): H2OFrame = {
val spark = SparkSessionUtils.active
import spark.implicits._
-      SparkDataFrameConverter.toH2OFrame(hc, rdd.map(v => Tuple1(v)).toDF, frameKeyName)
+      SparkDataFrameConverter.toH2OFrame(hc, rdd.map(v => Tuple1(v)).toDF(), frameKeyName)
}
}

implicit def toH2OFrameFromRDDMLlibVector(rdd: RDD[mllib.linalg.Vector]): SupportedRDD = new SupportedRDD {
override def toH2OFrame(hc: H2OContext, frameKeyName: Option[String]): H2OFrame = {
val spark = SparkSessionUtils.active
import spark.implicits._
-      SparkDataFrameConverter.toH2OFrame(hc, rdd.map(v => Tuple1(v)).toDF, frameKeyName)
+      SparkDataFrameConverter.toH2OFrame(hc, rdd.map(v => Tuple1(v)).toDF(), frameKeyName)
}
}
}
@@ -188,7 +188,7 @@ trait ExternalBackendConf extends SharedBackendConf with Logging with ExternalBa

def setExternalExtraJars(commaSeparatedPaths: String): H2OConf = set(PROP_EXTERNAL_EXTRA_JARS._1, commaSeparatedPaths)

-  def setExternalExtraJars(paths: java.util.ArrayList[String]): H2OConf = setExternalExtraJars(paths.asScala)
+  def setExternalExtraJars(paths: java.util.ArrayList[String]): H2OConf = setExternalExtraJars(paths.asScala.toList)

def setExternalExtraJars(paths: Seq[String]): H2OConf = setExternalExtraJars(paths.mkString(","))

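
`asScala` on a `java.util.ArrayList` yields a mutable `Buffer`; since `scala.Seq` now means `immutable.Seq`, the `Buffer` no longer conforms to the `Seq[String]` overload and has to be copied, here with `.toList`. A sketch with the `H2OConf` plumbing replaced by a plain string result:

```scala
import scala.collection.JavaConverters._ // 2.13 prefers scala.jdk.CollectionConverters, but this import cross-compiles

object ExtraJarsFix {
  // Stand-in for the Seq[String] overload of setExternalExtraJars.
  def setExternalExtraJars(paths: Seq[String]): String = paths.mkString(",")

  def main(args: Array[String]): Unit = {
    val javaPaths = new java.util.ArrayList[String]()
    javaPaths.add("/opt/ext/a.jar")
    javaPaths.add("/opt/ext/b.jar")

    // .toList copies the mutable Buffer into an immutable Seq.
    println(setExternalExtraJars(javaPaths.asScala.toList))
  }
}
```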
@@ -23,6 +23,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import water.fvec.Vec

+import scala.reflect.runtime.universe
import scala.reflect.runtime.universe._

/**
@@ -46,7 +47,7 @@ private[sparkling] object SupportedTypes extends Enumeration {
def name: NameOfType = toString
}

-  implicit val mirror = runtimeMirror(getClass.getClassLoader)
+  implicit val mirror: universe.Mirror = runtimeMirror(getClass.getClassLoader)

def typeForClass[T](clazz: Class[T])(implicit runtimeMirror: Mirror) = runtimeMirror.classSymbol(clazz).toType

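
Annotating `mirror` follows another migration rule: implicit definitions should carry explicit result types (Scala 3 makes this mandatory), which also keeps implicit search stable if the right-hand side ever changes. A sketch, assuming `scala-reflect` is on the classpath:

```scala
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe._

object MirrorFix {
  // The explicit `universe.Mirror` type is the 2.13/Scala 3-friendly part.
  implicit val mirror: universe.Mirror = runtimeMirror(getClass.getClassLoader)

  def typeForClass[T](clazz: Class[T])(implicit m: Mirror): Type =
    m.classSymbol(clazz).toType

  def main(args: Array[String]): Unit =
    println(typeForClass(classOf[String])) // String
}
```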
@@ -24,7 +24,7 @@ import ai.h2o.sparkling.utils.SparkSessionUtils
import ai.h2o.sparkling.{H2OConf, H2OContext}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.expose.Utils
-import org.apache.spark.h2o.backends.internal.InternalH2OBackend._
+import org.apache.spark.h2o.backends.internal.InternalH2OBackend.{distributeFlatFile, getLeaderNode, lockCloud, registerEndpoints, registerNewExecutorListener, startH2OWorkers, startSingleH2OWorker, tearDownEndpoints, waitForClusterSize}
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
@@ -114,7 +114,7 @@ private[spark] class SpreadRDDBuilder(@transient private val hc: H2OContext, num
val sb = sc.schedulerBackend
sb match {
case _: LocalSchedulerBackend => 1
-      case b: CoarseGrainedSchedulerBackend => b.getExecutorIds.length
+      case b: CoarseGrainedSchedulerBackend => b.getExecutorIds().length
case _ => SparkEnv.get.blockManager.master.getStorageStatus.length - 1
}
}
gradle/scala.gradle (3 additions, 1 deletion)
@@ -27,7 +27,9 @@ configurations
}

dependencies {
-    scalaCompilerPlugin "org.scalamacros:paradise_${scalaVersion}:2.1.1"
+    if (scalaVersion < '2.13') {
+        scalaCompilerPlugin "org.scalamacros:paradise_${scalaVersion}:2.1.1"
+    }
}

Review thread on this hunk:

Collaborator: Don't we need it for scala 2.13 as well?

Author: In Scala 2.13, the plugin's functionality has been included in the compiler directly under the -Ymacro-annotations flag.

// Activate Zinc compiler and configure scalac
@@ -54,7 +54,7 @@ private[sparkling] class H2OModel private (val modelId: String) extends RestComm

private[sparkling] def tryDelete(): Unit =
try {
-      getCrossValidationModels.foreach(_.foreach(_.tryDelete()))
+      getCrossValidationModels().foreach(_.foreach(_.tryDelete()))
delete()
} catch {
case e: Throwable => logWarning(s"Unsuccessful try to delete model '${this.modelId}'", e)
@@ -67,7 +67,7 @@ class H2OTargetEncoderModel(override val uid: String, targetEncoderModel: Option
val hc = H2OContext.ensure(
"H2OContext needs to be created in order to use target encoding. Please create one as H2OContext.getOrCreate().")
val temporaryColumn = getClass.getSimpleName + "_temporary_id"
-    val withIdDF = dataset.withColumn(temporaryColumn, monotonically_increasing_id)
+    val withIdDF = dataset.withColumn(temporaryColumn, monotonically_increasing_id())
val flatDF = SchemaUtils.flattenDataFrame(withIdDF)
val distinctInputCols = getInputCols().flatten.distinct
val relevantColumns = distinctInputCols ++ Array(getLabelCol(), getFoldCol(), temporaryColumn).flatMap(Option(_))
@@ -29,7 +29,7 @@ class DictionaryParam(parent: Params, name: String, doc: String, isValid: java.u
def this(parent: Params, name: String, doc: String) =
this(parent, name, doc, _ => true)

-  implicit def formats = DefaultFormats
+  implicit def formats: DefaultFormats.type = DefaultFormats

override def jsonEncode(dictionary: java.util.Map[String, Double]): String = write(dictionary.asScala)

@@ -37,7 +37,7 @@ trait HasLossByColNames extends H2OAlgoParamsBase {
null
} else {
val frameColumns = trainingFrame.columnNames
-      val indices = names.map(frameColumns.indexOf)
+      val indices = names.map(col => frameColumns.indexOf(col))
indices
}

@@ -35,7 +35,7 @@ trait HasRandomCols extends H2OAlgoParamsBase with H2OAlgoCommonUtils {
null
} else {
val frameColumns = trainingFrame.columnNames
-      val indices = randomColumnNames.map(frameColumns.indexOf)
+      val indices = randomColumnNames.map(col => frameColumns.indexOf(col))
indices
}

@@ -69,9 +69,9 @@ class HyperParamsParam(
case JNull =>
null
case JArray(values) =>
-        val bytes = values.map {
+        val bytes: Array[Byte] = values.map {
case JInt(x) =>
-            x.byteValue()
+            x.byteValue
case _ =>
throw new IllegalArgumentException(s"Cannot decode $json to Byte.")
}.toArray
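
`byteValue` goes the other way: in 2.13, `BigInt#byteValue` is declared without a parameter list, so the old `byteValue()` call would try to apply the returned `Byte` and no longer type-checks. Minimal demonstration:

```scala
object ByteValueFix {
  def main(args: Array[String]): Unit = {
    val x = BigInt(300)
    // Parameterless in 2.13, so the parens must go.
    val b: Byte = x.byteValue
    println(b) // 44 (300 truncated to a byte)
  }
}
```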
@@ -56,9 +56,9 @@ class NullableStringPairArrayParam(
case JNull =>
null
case JArray(values) =>
-        val bytes = values.map {
+        val bytes: Array[Byte] = values.map {
case JInt(x) =>
-            x.byteValue()
+            x.byteValue
case _ =>
throw new IllegalArgumentException(s"Cannot decode $json to Byte.")
}.toArray
@@ -78,6 +78,6 @@ trait H2OMOJOFlattenedInput {
val relevantColumnNames = getRelevantColumnNames(flatDataFrame, inputs)
val args = relevantColumnNames.map(c => flatDataFrame(s"`$c`"))
val udf = udfConstructor(relevantColumnNames)
-    flatDataFrame.withColumn(outputColumnName, udf(struct(args: _*)))
+    flatDataFrame.withColumn(outputColumnName, udf(struct(args.toIndexedSeq: _*)))
}
}
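
Spark's `struct` takes `Column*`, and varargs in 2.13 are `immutable.Seq`; expanding an `Array` with `: _*` then goes through an implicit, copying conversion, and `.toIndexedSeq` simply makes that copy explicit. A sketch with a hypothetical string-based `struct` standing in for `org.apache.spark.sql.functions.struct`:

```scala
object VarargsFix {
  // Stand-in: the real code passes Column values to functions.struct.
  def struct(columns: String*): String = columns.mkString("struct(", ", ", ")")

  def main(args: Array[String]): Unit = {
    val cols: Array[String] = Array("`a`", "`b`")
    // .toIndexedSeq copies the Array into an immutable IndexedSeq before the
    // varargs expansion, instead of relying on the implicit conversion.
    println(struct(cols.toIndexedSeq: _*))
  }
}
```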
@@ -282,9 +282,9 @@ abstract class H2OMOJOModel
// Propagation of offset to EasyPredictModelWrapper was introduced with H2OSupervisedMOJOModel.
// `lit(0.0)` represents a column with zero values (offset disabled) to ensure backward-compatibility of
// MOJO models.
-        flatDataFrame.withColumn(outputColumnName, udf(struct(args: _*), lit(0.0)))
+        flatDataFrame.withColumn(outputColumnName, udf(struct(args.toIndexedSeq: _*), lit(0.0)))
case _ =>
-        flatDataFrame.withColumn(outputColumnName, udf(struct(args: _*)))
+        flatDataFrame.withColumn(outputColumnName, udf(struct(args.toIndexedSeq: _*)))
}
}

@@ -57,14 +57,16 @@ class H2OSupervisedMOJOModel(override val uid: String) extends H2OAlgorithmMOJOM
if (!flatDataFrame.columns.contains(offsetColumn)) {
throw new RuntimeException("Offset column must be present within the dataset!")
}
-          flatDataFrame.withColumn(outputColumnName, udf(struct(args: _*), col(getOffsetCol()).cast(DoubleType)))
+          flatDataFrame.withColumn(
+            outputColumnName,
+            udf(struct(args.toIndexedSeq: _*), col(getOffsetCol()).cast(DoubleType)))
} else {
// Methods of EasyPredictModelWrapper for given prediction categories take offset as parameter.
// `lit(0.0)` represents a column with zero values (offset disabled).
-          flatDataFrame.withColumn(outputColumnName, udf(struct(args: _*), lit(0.0)))
+          flatDataFrame.withColumn(outputColumnName, udf(struct(args.toIndexedSeq: _*), lit(0.0)))
}
case _ =>
-        flatDataFrame.withColumn(outputColumnName, udf(struct(args: _*)))
+        flatDataFrame.withColumn(outputColumnName, udf(struct(args.toIndexedSeq: _*)))
}
}
}
@@ -74,6 +74,6 @@ trait H2OTargetEncoderBase extends PipelineStage with H2OTargetEncoderMOJOParams

private[sparkling] def createVectorColumn(df: DataFrame, name: String, sourceColumns: Array[String]): DataFrame = {
val assembler = new VectorAssembler().setInputCols(sourceColumns).setOutputCol(name)
-    assembler.transform(df).drop(sourceColumns: _*)
+    assembler.transform(df).drop(sourceColumns.toIndexedSeq: _*)
}
}
@@ -98,7 +98,8 @@ case class H2OTargetEncoderMOJOUdfWrapper(
new EasyPredictModelWrapper(config)
}

-  @transient private lazy val positions: Seq[Int] = mojoModel._inoutMapping.asScala.map(_.to.length).scanLeft(0)(_ + _)
+  @transient private lazy val positions: Seq[Int] =
+    mojoModel._inoutMapping.asScala.map(_.to.length).scanLeft(0)(_ + _).toSeq

val mojoUdf: UserDefinedFunction =
udf[Array[Option[Vector]], Row] { r: Row =>
@@ -53,9 +53,9 @@ class NullableStringArrayArrayParam(parent: Params, name: String, doc: String, i
case JNull =>
null
case JArray(values) =>
-        val bytes = values.map {
+        val bytes: Array[Byte] = values.map {
case JInt(x) =>
-            x.byteValue()
+            x.byteValue
case _ =>
throw new IllegalArgumentException(s"Cannot decode $json to Byte.")
}.toArray
@@ -57,9 +57,9 @@ class NullableStringPairArrayParam(
case JNull =>
null
case JArray(values) =>
-        val bytes = values.map {
+        val bytes: Array[Byte] = values.map {
case JInt(x) =>
-            x.byteValue()
+            x.byteValue
case _ =>
throw new IllegalArgumentException(s"Cannot decode $json to Byte.")
}.toArray
@@ -143,7 +143,7 @@ object SchemaUtils {

def flattenSchema(df: DataFrame): StructType = {
val rowSchemas = rowsToRowSchemas(df)
-    val mergedSchema = mergeRowSchemas(rowSchemas)
+    val mergedSchema = mergeRowSchemas(rowSchemas).toList
StructType(mergedSchema.map(_.field))
}

@@ -221,13 +221,13 @@ object SchemaUtils {
if (data != null) {
dataType match {
case BinaryType =>
-          flattenBinaryType(qualifiedName, ByteType, nullable, metadata, data, path)
+          flattenBinaryType(qualifiedName, ByteType, nullable, metadata, data, path).toList
case MapType(_, valueType, containsNull) =>
-          flattenMapType(qualifiedName, valueType, containsNull || nullable, metadata, data, path)
+          flattenMapType(qualifiedName, valueType, containsNull || nullable, metadata, data, path).toList
case ArrayType(elementType, containsNull) =>
-          flattenArrayType(qualifiedName, elementType, containsNull || nullable, metadata, data, path)
+          flattenArrayType(qualifiedName, elementType, containsNull || nullable, metadata, data, path).toList
case StructType(fields) =>
-          flattenStructType(qualifiedName, nullable, metadata, fields, data, path)
+          flattenStructType(qualifiedName, nullable, metadata, fields, data, path).toList
case dt =>
FieldWithOrder(StructField(qualifiedName, dt, nullable, metadata), path.reverse) :: Nil
}
@@ -317,7 +317,7 @@ object SchemaUtils {
targetColPrefix: Option[String] = None,
nullable: Boolean = false): Seq[(StructField, String)] = {

-    val flattened = schema.fields.flatMap { f =>
+    val flattened = schema.fields.toList.flatMap { f =>
val escaped = if (f.name.contains(".")) "`" + f.name + "`" else f.name
val colName = if (sourceColPrefix.isDefined) sourceColPrefix.get + "." + escaped else escaped
val newName = if (targetColPrefix.isDefined) targetColPrefix.get + "." + f.name else f.name
@@ -402,7 +402,7 @@ object SchemaUtils {
getCollectionSize(row, idx)
}
}
-    if (sizes.isEmpty) {
+    if (sizes.isEmpty()) {
Array(0)
} else {
sizes.reduce((a, b) => a.indices.map(i => if (a(i) > b(i)) a(i) else b(i))).toArray
@@ -35,7 +35,7 @@ object SparkSessionUtils extends Logging {
}

def createSparkSession(conf: SparkConf, forceHive: Boolean = false): SparkSession = {
-    val builder = SparkSession.builder.config(conf)
+    val builder = SparkSession.builder().config(conf)
if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive" || forceHive) {
if (ExposeUtils.hiveClassesArePresent || forceHive) {
builder.enableHiveSupport()