Running multiple instances #52

Open
hansbogert opened this issue May 4, 2015 · 12 comments


@hansbogert
Contributor

One of Mesos' promises was being able to run multiple instances of one type of framework, such as Hadoop in this case. The original Mesos paper showed two concurrent Hadoop instances.

I've tried to set up an experiment with two JobTrackers, each using one and the same Mesos backend; however, if one JobTracker's queue is full enough, it takes all resources and keeps them indefinitely, starving the other JobTracker.

Correct me if I'm wrong, but shouldn't the aim of this hadoop-on-mesos backend be to cater for this scenario of multiple instances?

@DarinJ
Contributor

DarinJ commented May 4, 2015

I've done this; it's best to set up roles (and usually mapred.mesos.role.strict) if you're going to do this.
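
A minimal sketch of what that looks like, assuming the mapred.mesos.role companion property from the hadoop-on-mesos configuration (the role name and values here are illustrative, not a recommendation):

```xml
<!-- Sketch: register this JobTracker's framework under a dedicated Mesos role -->
<!-- and only accept offers for that role. "hadoop-a" is a placeholder role;   -->
<!-- check the hadoop-on-mesos configuration docs for the exact property set.  -->
<property>
  <name>mapred.mesos.role</name>
  <value>hadoop-a</value>
</property>
<property>
  <name>mapred.mesos.role.strict</name>
  <value>true</value>
</property>
```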

@hansbogert
Contributor Author

Thanks for the tip. However, now that I've looked into the role functionality, if I understand correctly this would just mimic a static partitioning of the cluster.
The intrinsic problem, I think, is that Mesos semantics are not honored: a framework should accept resource offers only for outstanding tasks, whereas hadoop-on-mesos accepts offers for an undetermined period as long as a JobTracker's queue is full.

@tarnfeld
Member

tarnfeld commented May 5, 2015

Interesting thread. Roles are one way to go, but as you say @hansbogert, it's basically static partitioning at the cluster level. We actually see the described issue not only across Hadoop instances, but also across different frameworks. Hadoop-on-mesos is currently still a little too greedy for my liking, especially on shared clusters, and the simple conclusion we came to is to find incremental changes that allow more frequent scheduling decisions. The more often resources are put back into the Mesos pool, the more often Mesos can make scheduling decisions, and the faster cluster fairness will balance out.

Doing this allows us to rely on the Mesos allocator to maintain fairness across the different frameworks in the cluster. This is something I'm currently working on, and there are a few new features on the way you might be interested in:

Launching task trackers (mesos executors) for Map and Reduce slots separately

Combined with the currently implemented logic that recycles some resources when slots become idle, this means that when we see an imbalance of map/reduce slots on the cluster, idle resources can be freed up. For example, if over time you end up with lots of task trackers, each with a few map and a few reduce slots, then even when the reducers all become idle, the map slots will still be in use and the task tracker will remain alive. Simply splitting into multiple, more specific executors (separating map and reduce) increases the chances Mesos has to make scheduling decisions.

I've got a branch of this implemented and should be opening a PR in the next day or two, once tested properly on our test clusters.

Job Pools

The FairScheduler (one of the built-in schedulers for Hadoop) supports a concept called Job Pools, which basically allows you to assign a job to a particular pool, and each pool can be configured with various SLA parameters (min/max slots, preemption, etc.). Currently the Hadoop-on-mesos framework will work with pools, but if you want to restrict Hadoop to only using a subset of your Mesos resources, things won't work as expected.
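
For context, a minimal sketch of a Hadoop 1.x FairScheduler pools file (referenced via mapred.fairscheduler.allocation.file); the pool names and limits are illustrative:

```xml
<?xml version="1.0"?>
<!-- Illustrative allocations file: each pool gets its own slot guarantees/limits. -->
<allocations>
  <pool name="etl">
    <minMaps>10</minMaps>
    <minReduces>5</minReduces>
    <maxMaps>40</maxMaps>
    <maxReduces>20</maxReduces>
    <weight>2.0</weight>
  </pool>
  <pool name="adhoc">
    <maxRunningJobs>5</maxRunningJobs>
    <weight>1.0</weight>
  </pool>
</allocations>
```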

Once the above patch lands, I'll be working on this. I had a brief design discussion with @brndnmtthws a few weeks ago and I think I've got a fairly solid plan for how to go about implementing the idea.

Per-job Task Trackers (#31)

Something else we've considered is launching TaskTracker instances that only process tasks for a specific job. This is another idea aimed at increasing the churn of Mesos tasks/executors to allow for more frequent scheduling decisions. There is a much larger overhead here, as launching TaskTrackers isn't that fast, and if you run lots of small jobs this could hurt performance.

Other things

There are a bunch of other ideas we've had for forcing Hadoop to give up resources more often and avoid hogging Mesos clusters:

  • A max number of tasks or jobs per task tracker before it gets decommissioned
  • Massively increasing the parallelism in jobs, which combined with the above could yield good results

@hansbogert
Contributor Author

@tarnfeld thanks for this thorough elaboration. I do have some questions/remarks though:

Launching task trackers (mesos executors) for Map and Reduce slots separately
Although this would increase the chances for Mesos to do its thing, if a first JobTracker starts out with a full enough queue it will still hoard resources, since chances are high that both mappers and reducers stay busy in all but the largest clusters.

Per-job Task Trackers
Basically this is what Spark does with its Coarse-grained Mesos back-end. The problem I see in Spark's case is that application duration (an application being somewhat comparable to a Hadoop job) can vary so much that you can only depend on statistical multiplexing and hope that applications that are small in size and duration get at least some resources before a big application comes along and hoards resources again. It's an inherent problem of thinking in these coarse-grained work units.

Spark's default back-end for Mesos, the fine-grained scheduler, allows Mesos to do resource scheduling continuously and works as expected, but, as you also fear for Hadoop's case, it incurs a LOT of overhead (for one specific application I see three times less throughput compared to coarse-grained). That's mainly because Spark's tasks are on the order of milliseconds, so the communication overhead of accepting Mesos offers makes up a significant portion of the total task runtime.
If we'd take a comparable approach to Spark's fine-grained mode in hadoop-on-mesos, i.e. accepting offers for every task (a map or reduce), and ignore overhead for now, would that be a viable solution? At least the churn would be high, allowing Mesos' scheduler to give every framework its fair share.
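
(For reference, the mode I'm comparing against is toggled by a single Spark property; a minimal sketch assuming a Spark 1.x spark-defaults.conf, shown only to illustrate the two modes being discussed:)

```
# spark-defaults.conf (Spark 1.x on Mesos) -- illustrative only
# fine-grained mode (one Mesos task per Spark task; the default at the time):
spark.mesos.coarse   false
# coarse-grained mode (hold executors/resources for the application lifetime):
# spark.mesos.coarse true
```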

A bit more off-topic: I think these shortcomings stem from Mesos' model, which is a bit naive in assuming that frameworks play along nicely or use the perfect granularity. It seems to always boil down to:

  • Use coarse-grained scheduling, so we have little overhead, but Mesos can't schedule resources fairly or make guarantees, due to job/application duration variance

or

  • Use fine-grained work units, but the overhead for small tasks is significant.

Thinking about this and ignoring any further implications this would have for the Mesos model, we'd want to accept offers once and use them until Mesos indicates this is no longer our fair share. This would reduce overhead but would give room to other contending frameworks. Note that this is still a very naive view from the perspective of Mesos.

@tarnfeld
Member

tarnfeld commented May 6, 2015

it will still hoard resources, since chances are high that both mappers and reducers stay busy in all but the largest clusters.

This is indeed the case, and you can kind of solve it with job pools (by restricting types of jobs to a maximum amount of resources), but ultimately you'll never end up with something that works well unless it's possible to preempt tasks. Without that we're basically solving the problem blind, because there are two schedulers at play that know nothing about each other.

Basically this is what Spark does with its Coarse-grained Mesos back-end.

Indeed, it's also the same model for YARN.

Thinking about this and ignoring any further implications this would have for the Mesos model, we'd want to accept offers once and use them until Mesos indicates this is no longer our fair share.

This approach is known as preemption in the Mesos world, and it's something being actively worked on. Essentially it would be a primitive that allows the cluster to revoke resources from you (as a framework) with some warning, giving you a chance to tear down, etc. The specifics of how this system would work are still being discussed within the community, but there are a couple of JIRA tickets lying around that might be worth a look.

If you have any ideas for how we can start solving some of these problems in the short term please do share them!

@hansbogert
Contributor Author

One last thing, in my previous post:

If we'd take a comparable approach to Spark's fine-grained mode in hadoop-on-mesos, i.e. accepting offers for every task (a map or reduce), and ignore overhead for now, would that be a viable solution? At least the churn would be high, allowing Mesos' scheduler to give every framework its fair share.

Can you share any thoughts on that?

@tarnfeld
Member

Launching a TaskTracker per task would cause things to slow to a snail's pace; the turnaround time for an individual task (depending on your workload, of course) is so low compared to the latency involved in spawning a tracker that jobs would take an age.

Spark actually keeps the executor process up and uses Mesos tasks (in fine-grained mode) to handle resource management, so no new processes are actually spawned. This relies on the executor being able to dynamically scale the number of tasks it is allocated, which is not possible (or at least not supported) with Hadoop.

I've just opened #57, which launches a Mesos task/executor pair for Map/Reduce slots in isolation; combined with #33, that should help in some situations.

@hansbogert
Contributor Author

What you described in your second paragraph is exactly what I meant and what the original Mesos paper describes in its "Hadoop Port" section:

We wrote a Hadoop scheduler that connects to Mesos, launches TaskTrackers as its executors, and maps each Hadoop task to a Mesos task.

Apparently the paper is a bit too 'rosy', though: in its experiments, a Hadoop instance with big jobs runs without greedily taking all of the resources.

Is this project a result/evolution of what @benh originally used?

@tarnfeld
Member

This project evolved from the original experiments, I believe, and was taken out of the Mesos code base a couple of years ago. I'm not sure of the details of the paper on this subject, but there are possibly other ways they could have achieved that. I think it's the general-purpose nature of what we're discussing that makes the current implementation more challenging.

@DarinJ
Contributor

DarinJ commented Jun 5, 2015

I've been contemplating better scale-up/scale-down policies based on the expected response times of tasks in the queue (this involves keeping some stats as jobs start, finish, and arrive on the queue) and the available resources on the cluster (REST calls to the master, or keeping stats on offers). If there's interest I'm willing to write something up and discuss. My use case means I'm going to start working on a prototype anyway.

@tarnfeld
Member

tarnfeld commented Jun 5, 2015

@DarinJ I'd be very much interested in that. We've got loads of individual job benchmarking data that we could plug into a system like that!

@DarinJ
Contributor

DarinJ commented Jun 14, 2015

@tarnfeld here's a short write-up of my idea. Sorry for the delay, I got swamped by other things. Any comments/suggestions would be appreciated. The idea is to keep the framework from eating up the cluster when things are already moving fast enough. I'm working through where I'll need to make changes in the code, hoping to spend a couple of days on it late next week or early the week after.

Currently the Hadoop Mesos framework greedily accepts offers from Mesos to fill map and reduce slots. To release resources, the framework waits for a number of idle periods (default 5) of a fixed length (default 5 seconds). For a standard MapReduce job each task should finish relatively quickly, but the number of tasks will often exceed the total resources of the cluster for a long time, or perhaps forever. This is an issue because the Hadoop framework is then very likely to starve other frameworks of resources.

To address the issue above, I propose modeling the tasks entering the Hadoop Mesos framework as a variation of an M/M/c queue. Here the time each task takes to run is modeled as an exponential random variable T_i with rate mu, and the arrival of new tasks is modeled as a marked Poisson process (W_i, Y_i), where W_i is a Poisson process with rate lambda and Y_i is a random variable TBD (guessing; may use another Poisson here) with parameters TBD. We can use an initial guess for the parameters and then update them via MAP estimation or sliding windows.

This allows us to calculate the expected time for a task in the queue to be completed (the response time). TODO: rederive the formula for the marked Poisson process, which shouldn't be bad (especially if assuming Y_i is Poisson), and list the equation. If the response time is low, there is no need to create new map/reduce slots, and maybe we can free some. If the response time is high, we should spin up new ones. We can further influence the decision based on available resources (REST calls? Stats on offers?).
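
As a baseline, and assuming a plain M/M/c queue with arrival rate lambda, per-slot service rate mu, and c running slots (the marked-process refinement above is still TBD), the standard response-time formula would be:

```latex
% Minimal sketch: standard M/M/c baseline, not the marked-Poisson variant.
% lambda = task arrival rate, mu = per-slot service rate, c = running slots,
% rho = lambda / (c mu), which must satisfy rho < 1 for a stable queue.

% Erlang C: probability that an arriving task has to wait for a slot.
P_{\text{wait}} =
  \frac{\dfrac{(c\rho)^{c}}{c!\,(1-\rho)}}
       {\displaystyle\sum_{k=0}^{c-1}\frac{(c\rho)^{k}}{k!}
        + \dfrac{(c\rho)^{c}}{c!\,(1-\rho)}}

% Expected response time = expected queueing delay + expected service time.
\mathbb{E}[R] = \frac{P_{\text{wait}}}{c\mu - \lambda} + \frac{1}{\mu}
```

Comparing E[R] against a target response time would then drive the decision to add or free slots.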

Caveats:

  • If your cluster is extremely oversubscribed with map/reduce users, this isn't going to help much.
  • It assumes mostly homogeneous jobs; Giraph jobs and the like would need a separate JobTracker.
