Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#764 Add GPU support via --enable-features #810

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>org.apache.mesos</groupId>
<artifactId>chronos</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.0.3-SNAPSHOT</version>
<inceptionYear>2012</inceptionYear>

<prerequisites>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.mesos.chronos.scheduler.config

import javax.ws.rs.core.{MediaType => CoreMediaType}

object Features {

//enable GPUs
lazy val GPU_RESOURCES = "gpu_resources"

lazy val availableFeatures = Map(
GPU_RESOURCES -> "Enable support for GPU in Chronos (experimental)"
)

def description: String = {
availableFeatures.map { case (name, description) => s"$name - $description" }.mkString(", ")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ trait SchedulerConfiguration extends ScallopConf {
lazy val mesosTaskDisk = opt[Double]("mesos_task_disk",
descr = "Amount of disk capacity to request from Mesos for each task (MB)",
default = Some(256.0))
lazy val mesosTaskGpu = opt[Int]("mesos_task_gpu",
descr = "Number of GPUs to request from Mesos for each task",
default = Some(0))
lazy val mesosCheckpoint = opt[Boolean]("mesos_checkpoint",
descr = "Enable checkpointing in Mesos",
default = Some(true))
Expand Down Expand Up @@ -132,7 +135,13 @@ trait SchedulerConfiguration extends ScallopConf {
lazy val minReviveOffersInterval = opt[Long]("min_revive_offers_interval",
descr = "Do not ask for all offers (also already seen ones) more often than this interval (ms). (Default: 5000)",
default = Some(5000))

lazy val features = opt[String]("enable_features",
descr = s"A comma-separated list of features. Available features are: ${Features.description}",
required = false,
default = None,
noshort = true,
validate = validateFeatures
)

def zooKeeperHostAddresses: Seq[InetSocketAddress] =
for (s <- zookeeperServers().split(",")) yield {
Expand All @@ -151,4 +160,23 @@ trait SchedulerConfiguration extends ScallopConf {
def zooKeeperStatePath = "%s/state".format(zooKeeperPath())

def zooKeeperCandidatePath = "%s/candidate".format(zooKeeperPath())

lazy val availableFeatures: Set[String] = features.get.map(parseFeatures).getOrElse(Set.empty)

private[this] def parseFeatures(str: String): Set[String] =
str.split(',').map(_.trim).filter(_.nonEmpty).toSet

private[this] def validateFeatures(str: String): Boolean = {
val parsed = parseFeatures(str)
// throw exceptions for better error messages
val unknownFeatures = parsed.filter(!Features.availableFeatures.contains(_))
lazy val unknownFeaturesString = unknownFeatures.mkString(", ")
require(
unknownFeatures.isEmpty,
s"Unknown features specified: $unknownFeaturesString. Available features are: ${Features.description}"
)
true
}

def isFeatureSet(name: String): Boolean = availableFeatures.contains(name)
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ trait BaseJob {

def mem: Double = 0

def gpus: Int = 0

def disabled: Boolean = false

def errorsSinceLastSuccess: Long = 0L
Expand Down Expand Up @@ -104,6 +106,7 @@ case class ScheduleBasedJob(
@JsonProperty override val cpus: Double = 0,
@JsonProperty override val disk: Double = 0,
@JsonProperty override val mem: Double = 0,
@JsonProperty override val gpus: Int = 0,
@JsonProperty override val disabled: Boolean = false,
@JsonProperty override val errorsSinceLastSuccess: Long = 0L,
@Deprecated @JsonProperty override val uris: Seq[String] = List(),
Expand Down Expand Up @@ -141,6 +144,7 @@ case class DependencyBasedJob(
@JsonProperty override val cpus: Double = 0,
@JsonProperty override val disk: Double = 0,
@JsonProperty override val mem: Double = 0,
@JsonProperty override val gpus: Int = 0,
@JsonProperty override val disabled: Boolean = false,
@JsonProperty override val errorsSinceLastSuccess: Long = 0L,
@Deprecated @JsonProperty override val uris: Seq[String] = List(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,24 @@ class MesosJobFramework @Inject()(
log.warning("Disconnected")
}

def getReservedResources(offer: Offer): (Double, Double) = {
def getReservedResources(offer: Offer): (Double, Double, Int) = {
val resources = offer.getResourcesList.asScala
val reservedResources = resources.filter({ x => x.hasRole && x.getRole != "*" })
(
getScalarValueOrElse(reservedResources.find(x => x.getName == "cpus"), 0),
getScalarValueOrElse(reservedResources.find(x => x.getName == "mem"), 0)
getScalarValueOrElse(reservedResources.find(x => x.getName == "mem"), 0),
getScalarValueOrElse(reservedResources.find(x => x.getName == "gpus"), 0)
)
}

def getScalarValueOrElse(opt: Option[Resource], value: Double): Double = {
opt.map(x => x.getScalar.getValue).getOrElse(value)
}

def getScalarValueOrElse(opt: Option[Resource], value: Int): Int = {
opt.map(x => x.getScalar.getValue.intValue()).getOrElse(value)
}

//TODO(FL): Persist the UPDATED task or job into ZK such that on failover / reload, we don't have to step through the
// entire task stream.
@Override
Expand Down Expand Up @@ -281,30 +286,34 @@ class MesosJobFramework @Inject()(
class Resources(
var cpus: Double,
var mem: Double,
var disk: Double
var disk: Double,
var gpus: Int
) {
def this(job: BaseJob) {
this(
if (job.cpus > 0) job.cpus else config.mesosTaskCpu(),
if (job.mem > 0) job.mem else config.mesosTaskMem(),
if (job.disk > 0) job.disk else config.mesosTaskDisk()
if (job.disk > 0) job.disk else config.mesosTaskDisk(),
if (job.gpus > 0) job.gpus else config.mesosTaskGpu()
)
}

def canSatisfy(needed: Resources): Boolean = {
(this.cpus >= needed.cpus) &&
(this.mem >= needed.mem) &&
(this.disk >= needed.disk)
(this.disk >= needed.disk)&&
(this.gpus >= needed.gpus)
}

def -=(that: Resources) {
this.cpus -= that.cpus
this.mem -= that.mem
this.disk -= that.disk
this.gpus -= that.gpus
}

override def toString: String = {
"cpus: " + this.cpus + " mem: " + this.mem + " disk: " + this.disk
"cpus: " + this.cpus + " mem: " + this.mem + " disk: " + this.disk + " gpus: " + this.gpus
}
}

Expand All @@ -314,7 +323,8 @@ class MesosJobFramework @Inject()(
new Resources(
getScalarValueOrElse(resources.find(_.getName == "cpus"), 0),
getScalarValueOrElse(resources.find(_.getName == "mem"), 0),
getScalarValueOrElse(resources.find(_.getName == "disk"), 0)
getScalarValueOrElse(resources.find(_.getName == "disk"), 0),
getScalarValueOrElse(resources.find(_.getName == "gpus"), 0)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
final val cpusResourceName = "cpus"
final val memResourceName = "mem"
final val diskResourceName = "disk"
final val gpusResourceName = "gpus"
val taskNameTemplate = "ChronosTask:%s"
//args|command.
// e.g. args: -av (async job), verbose mode
Expand Down Expand Up @@ -124,10 +125,12 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
val mem = if (job.mem > 0) job.mem else conf.mesosTaskMem()
val cpus = if (job.cpus > 0) job.cpus else conf.mesosTaskCpu()
val disk = if (job.disk > 0) job.disk else conf.mesosTaskDisk()
val gpus : Int = if (job.gpus > 0) job.gpus else conf.mesosTaskGpu()
taskInfo
.addResources(scalarResource(cpusResourceName, cpus, offer))
.addResources(scalarResource(memResourceName, mem, offer))
.addResources(scalarResource(diskResourceName, disk, offer))
.addResources(scalarResource(gpusResourceName, gpus, offer))

taskInfo
}
Expand All @@ -145,6 +148,7 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
"CHRONOS_RESOURCE_MEM" -> job.mem.toString,
"CHRONOS_RESOURCE_CPU" -> job.cpus.toString,
"CHRONOS_RESOURCE_DISK" -> job.disk.toString,
"CHRONOS_RESOURCE_GPU" -> job.gpus.toString,
"CHRONOS_JOB_RUN_TIME" -> start.toString,
"CHRONOS_JOB_RUN_ATTEMPT" -> attempt.toString
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import java.util.logging.Logger
import com.google.protobuf.ByteString
import mesosphere.chaos.http.HttpConf
import org.apache.mesos.Protos.{Credential, FrameworkID, FrameworkInfo}
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.config.{Features, SchedulerConfiguration}
import org.apache.mesos.{
MesosSchedulerDriver,
Protos,
Scheduler,
SchedulerDriver
}
import FrameworkInfo.Capability

import scala.collection.JavaConverters.asScalaSetConverter

Expand Down Expand Up @@ -82,6 +83,11 @@ class SchedulerDriverBuilder {
config.mesosAuthenticationPrincipal.get
.foreach(frameworkInfoBuilder.setPrincipal)

if (config.isFeatureSet(Features.GPU_RESOURCES)) {
frameworkInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
log.info("GPU_RESOURCES feature enabled.")
}

frameworkInfoBuilder.build()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
JobDeserializer.config.mesosTaskMem()
else 0

val gpus =
if (node.has("gpus") && node.get("gpus") != null && node
.get("gpus")
.asInt != 0) node.get("gpus").asInt
else if (JobDeserializer.config != null)
JobDeserializer.config.mesosTaskGpu()
else 0

val errorsSinceLastSuccess =
if (node.has("errorsSinceLastSuccess") && node.get(
"errorsSinceLastSuccess") != null)
Expand Down Expand Up @@ -396,6 +404,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
cpus = cpus,
disk = disk,
mem = mem,
gpus = gpus,
disabled = disabled,
concurrent = concurrent,
errorsSinceLastSuccess = errorsSinceLastSuccess,
Expand Down Expand Up @@ -433,6 +442,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
cpus = cpus,
disk = disk,
mem = mem,
gpus = gpus,
disabled = disabled,
concurrent = concurrent,
errorsSinceLastSuccess = errorsSinceLastSuccess,
Expand Down Expand Up @@ -469,6 +479,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
cpus = cpus,
disk = disk,
mem = mem,
gpus = gpus,
disabled = disabled,
errorsSinceLastSuccess = errorsSinceLastSuccess,
fetch = fetch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class JobSerializer extends JsonSerializer[BaseJob] {
json.writeFieldName("mem")
json.writeNumber(baseJob.mem)

json.writeFieldName("gpus")
json.writeNumber(baseJob.gpus)

json.writeFieldName("disabled")
json.writeBoolean(baseJob.disabled)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MesosTaskBuilderSpec extends SpecificationWithJUnit with Mockito {

ScheduleBasedJob("FOO/BAR/BAM", "AJob", "noop", 10L, 20L,
"fooexec", "fooflags", "none", 7, "[email protected]", "Foo", "Test schedule based job", "TODAY",
"YESTERDAY", cpus = 2, disk = 3, mem = 5, container = container, environmentVariables = Seq(),
"YESTERDAY", cpus = 2, disk = 3, mem = 5, gpus = 0, container = container, environmentVariables = Seq(),
shell = true, arguments = Seq(), softError = true, constraints = constraints)
}

Expand All @@ -58,6 +58,7 @@ class MesosTaskBuilderSpec extends SpecificationWithJUnit with Mockito {
"CHRONOS_RESOURCE_MEM" -> job.mem.toString,
"CHRONOS_RESOURCE_CPU" -> job.cpus.toString,
"CHRONOS_RESOURCE_DISK" -> job.disk.toString,
"CHRONOS_RESOURCE_GPU" -> job.gpus.toString,
"CHRONOS_JOB_RUN_TIME" -> start.toString,
"CHRONOS_JOB_RUN_ATTEMPT" -> attempt.toString
)
Expand Down