Skip to content
1 change: 1 addition & 0 deletions api/V0ComposeKillTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (e *API) V0ComposeKillTask(w http.ResponseWriter, r *http.Request) {
logrus.WithField("func", "api.V0ComposeKillTask").Infof("Kill Task (%s)", taskID)

task.State = "__KILL"
task.ExpectedState = "__KILL"
task.Restart = "no"
task.Instances = 0
e.Redis.SaveTaskRedis(task)
Expand Down
4 changes: 3 additions & 1 deletion api/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func (e *API) getGPUs(cmd *cfg.Command) []*mesosproto.Parameter {
if e.Service.GPUs.Driver == "nvidia" && e.Service.GPUs.Device >= 0 {
i := strconv.Itoa(e.Service.GPUs.Device)
param = e.addDockerParameter(param, "gpus", "device="+i)
cmd.GPUs, _ = strconv.ParseFloat(i, 64)
if e.Config.EnableGPUAllocation {
cmd.GPUs, _ = strconv.ParseFloat(i, 64)
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
- FIX: [API] Restart service
- DEL: [API] Remove useless restart task API
- FIX: Force suppress after successful framework subscription to prevent unwanted offers
- ADD: GPU allocation option in Mesos. GPUs can still be used on the host but will not be allocated in Mesos.
- ADD/FIX: TASK_LOST update causes tasks to be killed
- FIX: Removed an unwanted offer decline that caused duplicate declines
- ADD: Host constraint: Option to only accept/request offers from particular hosts.
- FIX: Continuously accepting offers when no task is running.

## v1.1.3

Expand Down
14 changes: 14 additions & 0 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ func init() {
config.SkipSSL = false
}

// Enable GPU allocation in Mesos. If false, GPUs can still be used, but allocation won't be impacted in Mesos.
if strings.Compare(util.Getenv("ENABLE_GPU_ALLOCATION", "true"), "true") == 0 {
config.EnableGPUAllocation = true
} else {
config.EnableGPUAllocation = false
}

listen := fmt.Sprintf(":%s", framework.FrameworkPort)

failoverTimeout := 5000.0
Expand All @@ -101,6 +108,13 @@ func init() {
webuiurl = os.Getenv("FRAMEWORK_WEBUIURL")
}

// Set hostname constraints so that offers from other hosts are not blocked; optional
HostConstraintList := util.Getenv("HOST_CONSTRAINT_LIST", "")
if HostConstraintList != "" {
hostLists := strings.Split(HostConstraintList, ",")
config.HostConstraintsList = append(config.HostConstraintsList, hostLists...)
}

framework.CommandChan = make(chan cfg.Command, 100)
config.Hostname = framework.FrameworkHostname
config.Listen = listen
Expand Down
36 changes: 36 additions & 0 deletions mesos/mesos.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,42 @@ func (e *Mesos) Subscribe() {
},
}

if len(e.Config.HostConstraintsList) > 0 {

offerConstraintGroups := []*mesosproto.OfferConstraints_RoleConstraints_Group{}
for _, hostname := range e.Config.HostConstraintsList {
offerConstraint := mesosproto.OfferConstraints_RoleConstraints_Group{
AttributeConstraints: []*mesosproto.AttributeConstraint{
{
Selector: &mesosproto.AttributeConstraint_Selector{
Selector: &mesosproto.AttributeConstraint_Selector_PseudoattributeType_{
PseudoattributeType: mesosproto.AttributeConstraint_Selector_HOSTNAME,
},
},
Predicate: &mesosproto.AttributeConstraint_Predicate{
Predicate: &mesosproto.AttributeConstraint_Predicate_TextEquals_{
TextEquals: &mesosproto.AttributeConstraint_Predicate_TextEquals{
Value: &hostname,
},
},
},
},
},
}
offerConstraintGroups = append(offerConstraintGroups, &offerConstraint)
}

offerConstraints := &mesosproto.OfferConstraints{
RoleConstraints: map[string]*mesosproto.OfferConstraints_RoleConstraints{
e.Framework.FrameworkRole: {
Groups: offerConstraintGroups,
},
},
}

subscribeCall.Subscribe.OfferConstraints = offerConstraints
}

logrus.WithField("func", "scheduler.Subscribe").Debug(subscribeCall)
body, _ := marshaller.Marshal(subscribeCall)
client := &http.Client{}
Expand Down
1 change: 0 additions & 1 deletion scheduler/handle_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func (e *Scheduler) getOffer(offers *mesosproto.Event_Offers, cmd *cfg.Command)
if offerret.GetHostname() != "" {
offerIds = e.removeOffer(offerIds, offerret.GetId().GetValue())
}
e.Mesos.Call(e.Mesos.DeclineOffer(offerIds))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should it be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unwanted offers were being declined twice .. Once in here and the other when these offerIds were returned to the HandleOffers function at the end to decline the unneeded offers ..This caused errors in mesos saying offer already declined

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but there are two cases where the outside decline (line 81) does not catch. Both of them are errors. So maybe it makes more sense to keep decline (line 160) and remove the one in line 81? What do you think?

return offerret, offerIds
}

Expand Down
17 changes: 11 additions & 6 deletions scheduler/handle_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,22 @@ func (e *Scheduler) HandleUpdate(event *mesosproto.Event) {
e.Redis.DelRedisKey(task.TaskName + ":" + task.TaskID)
task.TaskID = e.API.IncreaseTaskCount(task.TaskID)
task.State = ""
break
case mesosproto.TaskState_TASK_LOST:
if task.TaskID == "" {
e.Mesos.Call(msg)
return
}
logrus.WithField("func", "scheduler.HandleUpdate").Warn("Task State: " + task.State + " " + task.TaskID + " (" + task.TaskName + ")")
e.Redis.DelRedisKey(task.TaskName + ":" + task.TaskID)
e.Mesos.ForceSuppressFramework()
e.Mesos.Call(msg)
return

if task.ExpectedState != "__KILL" {
task.TaskID = e.API.IncreaseTaskCount(task.TaskID)
task.State = ""
} else {
e.Mesos.ForceSuppressFramework()
e.Mesos.Call(msg)
}

case mesosproto.TaskState_TASK_RUNNING:
if !e.Mesos.IsSuppress {
logrus.WithField("func", "scheduler.HandleUpdate").Info("Task State: " + task.State + " " + task.TaskID + " (" + task.TaskName + ")")
Expand All @@ -92,8 +97,8 @@ func (e *Scheduler) HandleUpdate(event *mesosproto.Event) {
task.MesosAgent = e.Mesos.GetAgentInfo(update.Status.GetAgentId().GetValue())
task.NetworkInfo = e.Mesos.GetNetworkInfo(task.TaskID)
task.Agent = update.Status.GetAgentId().GetValue()

e.Mesos.SuppressFramework()
default:
logrus.WithField("func", "scheduler.HandleUpdate").Warn("Task State: " + task.State + " " + task.TaskID + " (" + task.TaskName + "). State not handled, no action has been taken")
}

// save the new state
Expand Down
3 changes: 3 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Config struct {
Plugins map[string]*plugin.Plugin
PluginsEnable bool
ThreadEnable bool
EnableGPUAllocation bool
HostConstraintsList []string
}

// UserCredentials - The Username and Password to authenticate against this framework
Expand Down Expand Up @@ -215,6 +217,7 @@ type Command struct {
Agent string
Labels []*mesosproto.Label
State string
ExpectedState string
StateTime time.Time
Instances int
LinuxInfo *mesosproto.LinuxInfo `protobuf:"bytes,11,opt,name=linux_info,json=linuxInfo" json:"linux_info,omitempty"`
Expand Down