diff --git a/api/compose.go b/api/compose.go index b5bff2e..0cf590c 100644 --- a/api/compose.go +++ b/api/compose.go @@ -132,7 +132,9 @@ func (e *API) getGPUs(cmd *cfg.Command) []*mesosproto.Parameter { if e.Service.GPUs.Driver == "nvidia" && e.Service.GPUs.Device >= 0 { i := strconv.Itoa(e.Service.GPUs.Device) param = e.addDockerParameter(param, "gpus", "device="+i) - cmd.GPUs, _ = strconv.ParseFloat(i, 64) + if e.Config.EnableGPUAllocation { + cmd.GPUs, _ = strconv.ParseFloat(i, 64) + } } } diff --git a/changelog.md b/changelog.md index 6c067b0..da736ab 100644 --- a/changelog.md +++ b/changelog.md @@ -14,6 +14,11 @@ - FIX: [API] Restart service - DEL: [API] Remove useless restart task API - FIX: Force suppress after successfull framework subscription to prevent unwanted offers +- ADD: GPU Allocation Option in Mesos. Can still use GPUs on the host but will not be allocated in mesos. +- ADD/FIX: TASK_LOST Update causes TASKS to be killed +- FIX: Unwanted Decline Offer causing duplicate declines removed +- ADD: Host constraint: Option to only accept/request offers from particular hosts. +- FIX: Continuously accepting offers when no task is running. ## v1.1.3 diff --git a/init.go b/init.go index 837d5f4..2e304dd 100644 --- a/init.go +++ b/init.go @@ -58,6 +58,7 @@ func init() { config.VaultTimeout, _ = time.ParseDuration(util.Getenv("VAULT_TIMEOUT", "10s")) config.DiscoveryInfoNameDelimiter = util.Getenv("DISCOVERY_INFONAME_DELIMITER", ".") config.DiscoveryPortNameDelimiter = util.Getenv("DISCOVERY_PORTNAME_DELIMITER", "_") + config.TaskLostRemovesTask, _ = strconv.ParseBool(util.Getenv("TASK_LOST_REMOVE_TASK", "true")) // Enable Threads if strings.Compare(util.Getenv("THREAD_ENABLE", "false"), "false") == 0 { @@ -87,6 +88,13 @@ func init() { config.SkipSSL = false } + // Enable GPU Allocation in Mesos.. 
If false, GPU can still be used but allocation wont be impacted in mesos + if strings.Compare(util.Getenv("ENABLE_GPU_ALLOCATION", "true"), "true") == 0 { + config.EnableGPUAllocation = true + } else { + config.EnableGPUAllocation = false + } + listen := fmt.Sprintf(":%s", framework.FrameworkPort) failoverTimeout := 5000.0 @@ -101,6 +109,13 @@ func init() { webuiurl = os.Getenv("FRAMEWORK_WEBUIURL") } + // Set Hostname Constraints to not block offers of other hosts, optional + HostConstraintList := util.Getenv("HOST_CONSTRAINT_LIST", "") + if HostConstraintList != "" { + hostLists := strings.Split(HostConstraintList, ",") + config.HostConstraintsList = append(config.HostConstraintsList, hostLists...) + } + framework.CommandChan = make(chan cfg.Command, 100) config.Hostname = framework.FrameworkHostname config.Listen = listen diff --git a/mesos/mesos.go b/mesos/mesos.go index bff8e4b..ebd5019 100644 --- a/mesos/mesos.go +++ b/mesos/mesos.go @@ -54,6 +54,42 @@ func (e *Mesos) Subscribe() { }, } + if len(e.Config.HostConstraintsList) > 0 { + + offerConstraintGroups := []*mesosproto.OfferConstraints_RoleConstraints_Group{} + for _, hostname := range e.Config.HostConstraintsList { + offerConstraint := mesosproto.OfferConstraints_RoleConstraints_Group{ + AttributeConstraints: []*mesosproto.AttributeConstraint{ + { + Selector: &mesosproto.AttributeConstraint_Selector{ + Selector: &mesosproto.AttributeConstraint_Selector_PseudoattributeType_{ + PseudoattributeType: mesosproto.AttributeConstraint_Selector_HOSTNAME, + }, + }, + Predicate: &mesosproto.AttributeConstraint_Predicate{ + Predicate: &mesosproto.AttributeConstraint_Predicate_TextEquals_{ + TextEquals: &mesosproto.AttributeConstraint_Predicate_TextEquals{ + Value: &hostname, + }, + }, + }, + }, + }, + } + offerConstraintGroups = append(offerConstraintGroups, &offerConstraint) + } + + offerConstraints := &mesosproto.OfferConstraints{ + RoleConstraints: map[string]*mesosproto.OfferConstraints_RoleConstraints{ + 
e.Framework.FrameworkRole: { + Groups: offerConstraintGroups, + }, + }, + } + + subscribeCall.Subscribe.OfferConstraints = offerConstraints + } + logrus.WithField("func", "scheduler.Subscribe").Debug(subscribeCall) body, _ := marshaller.Marshal(subscribeCall) client := &http.Client{} diff --git a/scheduler/handle_offers.go b/scheduler/handle_offers.go index c47f833..50c3629 100644 --- a/scheduler/handle_offers.go +++ b/scheduler/handle_offers.go @@ -157,7 +157,6 @@ func (e *Scheduler) getOffer(offers *mesosproto.Event_Offers, cmd *cfg.Command) if offerret.GetHostname() != "" { offerIds = e.removeOffer(offerIds, offerret.GetId().GetValue()) } - e.Mesos.Call(e.Mesos.DeclineOffer(offerIds)) return offerret, offerIds } diff --git a/scheduler/handle_update.go b/scheduler/handle_update.go index 7f7e014..5d13a91 100644 --- a/scheduler/handle_update.go +++ b/scheduler/handle_update.go @@ -73,7 +73,6 @@ func (e *Scheduler) HandleUpdate(event *mesosproto.Event) { e.Redis.DelRedisKey(task.TaskName + ":" + task.TaskID) task.TaskID = e.API.IncreaseTaskCount(task.TaskID) task.State = "" - break case mesosproto.TaskState_TASK_LOST: if task.TaskID == "" { e.Mesos.Call(msg) @@ -81,9 +80,16 @@ func (e *Scheduler) HandleUpdate(event *mesosproto.Event) { } logrus.WithField("func", "scheduler.HandleUpdate").Warn("Task State: " + task.State + " " + task.TaskID + " (" + task.TaskName + ")") e.Redis.DelRedisKey(task.TaskName + ":" + task.TaskID) - e.Mesos.ForceSuppressFramework() - e.Mesos.Call(msg) - return + + // Remove task if lost as a default behaviour.. 
This can be overridden by setting ENV variable "TASK_LOST_REMOVE_TASK" to false, which will restart the task + if e.Config.TaskLostRemovesTask { + e.Mesos.ForceSuppressFramework() + e.Mesos.Call(msg) + } else { + task.TaskID = e.API.IncreaseTaskCount(task.TaskID) + task.State = "" + } + case mesosproto.TaskState_TASK_RUNNING: if !e.Mesos.IsSuppress { logrus.WithField("func", "scheduler.HandleUpdate").Info("Task State: " + task.State + " " + task.TaskID + " (" + task.TaskName + ")") @@ -92,8 +98,8 @@ func (e *Scheduler) HandleUpdate(event *mesosproto.Event) { task.MesosAgent = e.Mesos.GetAgentInfo(update.Status.GetAgentId().GetValue()) task.NetworkInfo = e.Mesos.GetNetworkInfo(task.TaskID) task.Agent = update.Status.GetAgentId().GetValue() - - e.Mesos.SuppressFramework() + default: + logrus.WithField("func", "scheduler.HandleUpdate").Warn("Task State: " + task.State + " " + task.TaskID + " (" + task.TaskName + "). State not handled, no action has been taken") } // save the new state diff --git a/types/types.go b/types/types.go index 7cadc41..2f1185c 100644 --- a/types/types.go +++ b/types/types.go @@ -42,6 +42,9 @@ type Config struct { Plugins map[string]*plugin.Plugin PluginsEnable bool ThreadEnable bool + EnableGPUAllocation bool + HostConstraintsList []string + TaskLostRemovesTask bool } // UserCredentials - The Username and Password to authenticate against this framework