diff --git a/README.md b/README.md index a9a4390..cd5ab0f 100644 --- a/README.md +++ b/README.md @@ -135,7 +135,7 @@ operation](http://hadoop.apache.org/docs/stable/single_node_setup.html#PseudoDis ``` -[More details on configuration propertios can be found here.](configuration.md) +[More details on configuration properties can be found here.](configuration.md) #### Start #### @@ -145,13 +145,13 @@ to the Mesos native library. On Linux: ``` -$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.so hadoop jobtracker +$ MESOS_NATIVE_JAVA_LIBRARY=/path/to/libmesos.so hadoop jobtracker ``` And on OS X: ``` -$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.dylib hadoop jobtracker +$ MESOS_NATIVE_JAVA_LIBRARY=/path/to/libmesos.dylib hadoop jobtracker ``` > **NOTE: You do not need to worry about distributing your Hadoop diff --git a/configuration.md b/configuration.md index 8990d58..ba6ae37 100644 --- a/configuration.md +++ b/configuration.md @@ -141,6 +141,36 @@ default values. role configured in "mapred.mesos.role". + <property> + <name>mapred.mesos.framework.name</name> + <value>hadoop</value> + <description> + This is the Mesos framework name. Defaults to Hadoop plus port information. + </description> + </property> + <property> + <name>mapred.mesos.framework.principal</name> + <value>hadoop</value> + <description> + This is the Mesos framework principal. It is used for framework authentication. + Consult the Mesos documentation for details. + </description> + </property> + <property> + <name>mapred.mesos.framework.secretfile</name> + <value>/location/secretfile</value> + <description> + Location of the file holding the Mesos framework secret. It is used for framework authentication. + Consult the Mesos documentation for details. Caution: avoid newline characters; some editors place these before the end of the file. + </description> + </property> + <property> + <name>mapred.mesos.framework.user</name> + <value>hadoop</value> + <description> + This is the user the Mesos framework runs as. If left unset, it defaults to the user running the scheduler. + </description> + </property> diff --git a/pom.xml b/pom.xml index f9b0832..5f11e32 100644 --- a/pom.xml +++ b/pom.xml @@ -1,10 +1,18 @@ - + + + oss-parent + org.sonatype.oss + 9 + + 4.0.0 - org.apache.mesos - hadoop-mesos - 0.1.0 + com.github.mesos + mesos-hadoop-mr1 + 0.1.1-SNAPSHOT + + https://github.com/mesos/hadoop UTF-8 1.1.3 3.1 - 2.5.0-mr1-cdh5.2.0 - 0.21.0 + 2.6.0-mr1-cdh5.4.0 + 0.21.1 2.5.0 3.1.0 1.0.5 @@ -26,18 +34,6 @@ 1.9.5 - - - - cloudera - https://repository.cloudera.com/artifactory/cloudera-repos/ - - - clojars.org - http://clojars.org/repo - - - commons-logging @@ -108,23 +104,23 @@ - org.xerial.snappy - snappy-java - ${snappy-java.version} + org.xerial.snappy + snappy-java + ${snappy-java.version} - junit + junit junit ${junit.version} test + - org.apache.maven.plugins maven-compiler-plugin 3.1 true - - org.apache.maven.plugins maven-shade-plugin 2.2 @@ -160,7 +154,30 @@ + + org.apache.maven.plugins + maven-release-plugin + 2.5.1 + + + + + + cloudera + https://repository.cloudera.com/artifactory/cloudera-repos/ + + + clojars.org + http://clojars.org/repo + + + + + scm:git:git@github.com:mesos/hadoop.git + scm:git@github.com:mesos/hadoop.git + scm:git:git@github.com:mesos/hadoop.git + diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 8ab2a4f..bd9b95b 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -2,6 +2,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.mesos.Executor; import org.apache.mesos.ExecutorDriver; import org.apache.mesos.MesosExecutorDriver; @@
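
To make the new authentication settings above concrete, here is a minimal sketch of the corresponding `mapred-site.xml` entries. The property names come from the configuration.md additions in this patch; the principal name and secret path are illustrative values only:

```xml
<!-- Illustrative values; adjust the principal and secret path for your cluster. -->
<property>
  <name>mapred.mesos.framework.principal</name>
  <value>hadoop</value>
</property>
<property>
  <name>mapred.mesos.framework.secretfile</name>
  <value>/etc/mesos/hadoop.secret</value>
</property>
```

As the description warns, the secret file must not contain a trailing newline; writing it with `printf '%s'` rather than `echo` avoids the newline most editors append at the end of a file.
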
-11,8 +12,13 @@ import java.io.*; +import java.util.Map; + import java.lang.reflect.Field; -import java.lang.ReflectiveOperationException; +import java.lang.reflect.Method; + +import java.lang.IllegalAccessException; +import java.lang.NoSuchFieldException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; @@ -21,49 +27,22 @@ public class MesosExecutor implements Executor { public static final Log LOG = LogFactory.getLog(MesosExecutor.class); - private SlaveInfo slaveInfo; - private TaskTracker taskTracker; protected final ScheduledExecutorService timerScheduler = - Executors.newScheduledThreadPool(1); + Executors.newScheduledThreadPool(1); + + private boolean suicideTimerScheduled = false; + private SlaveInfo slaveInfo; + private TaskTracker taskTracker; + private TaskID mapTaskId; + private TaskID reduceTaskId; public static void main(String[] args) { MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor()); System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1); } - private JobConf configure(final TaskInfo task) { - JobConf conf = new JobConf(false); - try { - byte[] bytes = task.getData().toByteArray(); - conf.readFields(new DataInputStream(new ByteArrayInputStream(bytes))); - } catch (IOException e) { - LOG.warn("Failed to deserialize configuration.", e); - System.exit(1); - } - - // Output the configuration as XML for easy debugging. - try { - StringWriter writer = new StringWriter(); - conf.writeXml(writer); - writer.flush(); - String xml = writer.getBuffer().toString(); - LOG.info("XML Configuration received:\n" + - org.apache.mesos.hadoop.Utils.formatXml(xml)); - } catch (Exception e) { - LOG.warn("Failed to output configuration as XML.", e); - } - - // Get hostname from Mesos to make sure we match what it reports - // to the JobTracker. - conf.set("slave.host.name", slaveInfo.getHostname()); - - // Set the mapred.local directory inside the executor sandbox, so that - // different TaskTrackers on the same host do not step on each other. - conf.set("mapred.local.dir", System.getenv("MESOS_DIRECTORY") + "/mapred"); - - return conf; - } +// --------------------- Interface Executor --------------------- @Override public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, @@ -72,59 +51,93 @@ public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, this.slaveInfo = slaveInfo; } + @Override + public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) { + LOG.info("Executor reregistered with the slave"); + } + + @Override + public void disconnected(ExecutorDriver driver) { + LOG.info("Executor disconnected from the slave"); + } + @Override public void launchTask(final ExecutorDriver driver, final TaskInfo task) { LOG.info("Launching task : " + task.getTaskId().getValue()); - // Get configuration from task data (prepared by the JobTracker). - JobConf conf = configure(task); + synchronized(this) { - // NOTE: We need to manually set the context class loader here because, - // the TaskTracker is unable to find LoginModule class otherwise. 
- Thread.currentThread().setContextClassLoader( - TaskTracker.class.getClassLoader()); - - try { - taskTracker = new TaskTracker(conf); - } catch (IOException e) { - LOG.fatal("Failed to start TaskTracker", e); - System.exit(1); - } catch (InterruptedException e) { - LOG.fatal("Failed to start TaskTracker", e); - System.exit(1); - } + // Keep track of all the IDs we've been asked to monitor + if (task.getTaskId().getValue().endsWith("_Map")) { + mapTaskId = task.getTaskId(); + } else if (task.getTaskId().getValue().endsWith("_Reduce")) { + reduceTaskId = task.getTaskId(); + } else { + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(task.getTaskId()) + .setState(TaskState.TASK_LOST).build()); - // Spin up a TaskTracker in a new thread. - new Thread("TaskTracker Run Thread") { - @Override - public void run() { - try { - taskTracker.run(); + return; + } - // Send a TASK_FINISHED status update. - // We do this here because we want to send it in a separate thread - // than was used to call killTask(). - driver.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(task.getTaskId()) - .setState(TaskState.TASK_FINISHED) - .build()); + if (taskTracker == null) { + // Get configuration from task data (prepared by the JobTracker). + JobConf conf = configure(task); - // Give some time for the update to reach the slave. - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - LOG.error("Failed to sleep TaskTracker thread", e); - } + // NOTE: We need to manually set the context class loader here because + // the TaskTracker is unable to find the LoginModule class otherwise. + Thread.currentThread().setContextClassLoader( + TaskTracker.class.getClassLoader()); - // Stop the executor. - driver.stop(); - } catch (Throwable t) { - LOG.error("Caught exception, committing suicide.", t); - driver.stop(); + try { + taskTracker = new TaskTracker(conf); + } catch (IOException | InterruptedException e) { + LOG.fatal("Failed to start TaskTracker", e); System.exit(1); } + + // Spin up a TaskTracker in a new thread. + new Thread("TaskTracker Run Thread") { + @Override + public void run() { + try { + taskTracker.run(); + + // Send a TASK_FINISHED status update. + // We do this here because we want to send it in a separate thread + // from the one used to call killTask(). + if (mapTaskId != null) { + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(mapTaskId) + .setState(TaskState.TASK_FINISHED) + .build()); + } + + if (reduceTaskId != null) { + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(reduceTaskId) + .setState(TaskState.TASK_FINISHED) + .build()); + } + + // Give some time for the update to reach the slave. + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + LOG.error("Failed to sleep TaskTracker thread", e); + } + + // Stop the executor.
+ driver.stop(); + } catch (Throwable t) { + LOG.fatal("Caught exception, committing suicide.", t); + driver.stop(); + System.exit(1); + } + } + }.start(); } - }.start(); + } driver.sendStatusUpdate(TaskStatus.newBuilder() .setTaskId(task.getTaskId()) @@ -134,31 +147,53 @@ public void run() { @Override public void killTask(final ExecutorDriver driver, final TaskID taskId) { LOG.info("Killing task : " + taskId.getValue()); - if (taskTracker != null) { - LOG.info("Revoking task tracker map/reduce slots"); - revokeSlots(); - - // Send the TASK_FINISHED status - new Thread("TaskFinishedUpdate") { - @Override - public void run() { - driver.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(taskId) - .setState(TaskState.TASK_FINISHED) - .build()); + + new Thread("TaskTrackerKillThread") { + @Override + public void run() { + + // Commit suicide when no jobs are running + if (!suicideTimerScheduled) { + scheduleSuicideTimer(); + suicideTimerScheduled = true; } - }.start(); - } - } - @Override - public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) { - LOG.info("Executor reregistered with the slave"); - } + if (mapTaskId != null && taskId.equals(mapTaskId)) { + LOG.info("Revoking task tracker MAP slots"); - @Override - public void disconnected(ExecutorDriver driver) { - LOG.info("Executor disconnected from the slave"); + // Revoke the slots from the task tracker + try { + revokeSlots(taskTracker, TaskType.MAP); + } catch (NoSuchFieldException | IllegalAccessException e) { + LOG.error("Caught exception revoking MAP slots: ", e); + } + + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(taskId) + .setState(TaskState.TASK_FINISHED) + .build()); + + mapTaskId = null; + + } else if (reduceTaskId != null && taskId.equals(reduceTaskId)) { + LOG.info("Revoking task tracker REDUCE slots"); + + // Revoke the slots from the task tracker + try { + revokeSlots(taskTracker, TaskType.REDUCE); + } catch (NoSuchFieldException | IllegalAccessException e) { + LOG.error("Caught exception revoking REDUCE slots: ", e); + } + + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(taskId) + .setState(TaskState.TASK_FINISHED) + .build()); + + reduceTaskId = null; + } + } + }.start(); } @Override @@ -168,63 +203,83 @@ public void frameworkMessage(ExecutorDriver d, byte[] msg) { } @Override - public void error(ExecutorDriver d, String message) { - LOG.error("MesosExecutor.error: " + message); + public void shutdown(ExecutorDriver d) { + LOG.info("Executor asked to shutdown"); } @Override - public void shutdown(ExecutorDriver d) { - LOG.info("Executor asked to shutdown"); + public void error(ExecutorDriver d, String message) { + LOG.error("MesosExecutor.error: " + message); } - public void revokeSlots() { - if (taskTracker == null) { - LOG.error("Task tracker is not initialized"); - return; + private JobConf configure(final TaskInfo task) { + JobConf conf = new JobConf(false); + try { + byte[] bytes = task.getData().toByteArray(); + conf.readFields(new DataInputStream(new ByteArrayInputStream(bytes))); + } catch (IOException e) { + LOG.warn("Failed to deserialize configuration.", e); + System.exit(1); } - int maxMapSlots = 0; - int maxReduceSlots = 0; + // Output the configuration as XML for easy debugging. 
+ try { + StringWriter writer = new StringWriter(); + conf.writeXml(writer); + writer.flush(); + String xml = writer.getBuffer().toString(); + LOG.info("XML Configuration received:\n" + + org.apache.mesos.hadoop.Utils.formatXml(xml)); + } catch (Exception e) { + LOG.warn("Failed to output configuration as XML.", e); + } - // TODO(tarnfeld): Sanity check that it's safe for us to change the slots. - // Be sure there's nothing running and nothing in the launcher queue. + // Get hostname from Mesos to make sure we match what it reports + // to the JobTracker. + conf.set("slave.host.name", slaveInfo.getHostname()); - // If we expect to have no slots, let's go ahead and terminate the task launchers - if (maxMapSlots == 0) { - try { - Field launcherField = taskTracker.getClass().getDeclaredField("mapLauncher"); - launcherField.setAccessible(true); + // Set the mapred.local directory inside the executor sandbox, so that + // different TaskTrackers on the same host do not step on each other. + conf.set("mapred.local.dir", System.getenv("MESOS_DIRECTORY") + "/mapred"); - // Kill the current map task launcher - TaskTracker.TaskLauncher launcher = ((TaskTracker.TaskLauncher) launcherField.get(taskTracker)); - launcher.notifySlots(); - launcher.interrupt(); - } catch (ReflectiveOperationException e) { - LOG.fatal("Failed updating map slots due to error with reflection", e); - } - } + return conf; + } - if (maxReduceSlots == 0) { - try { - Field launcherField = taskTracker.getClass().getDeclaredField("reduceLauncher"); - launcherField.setAccessible(true); - - // Kill the current reduce task launcher - TaskTracker.TaskLauncher launcher = ((TaskTracker.TaskLauncher) launcherField.get(taskTracker)); - launcher.notifySlots(); - launcher.interrupt(); - } catch (ReflectiveOperationException e) { - LOG.fatal("Failed updating reduce slots due to error with reflection", e); + /** + * revokeSlots will take away all slots of the given task type from + * the running task tracker and as a precaution, fail any tasks that are + * running in those slots. + */ + private void revokeSlots(TaskTracker tracker, TaskType type) throws NoSuchFieldException, IllegalAccessException { + synchronized(tracker) { + + String launcherProp = ""; + int maxSlots = 0; + + if (type == TaskType.MAP) { + taskTracker.setMaxMapSlots(0); + launcherProp = "mapLauncher"; + maxSlots = tracker.getMaxCurrentMapTasks(); + } else if (type == TaskType.REDUCE) { + taskTracker.setMaxReduceSlots(0); + launcherProp = "reduceLauncher"; + maxSlots = tracker.getMaxCurrentReduceTasks(); } - } - // Configure the new slot counts on the task tracker - taskTracker.setMaxMapSlots(maxMapSlots); - taskTracker.setMaxReduceSlots(maxReduceSlots); + // Nasty horrible hacks to get inside the task tracker and take over some + // of the state handling. Even if we were to subclass the task tracker + // these methods are all private so we wouldn't be able to use them. + Field f = tracker.getClass().getDeclaredField(launcherProp); + f.setAccessible(true); + + TaskTracker.TaskLauncher launcher = + (TaskTracker.TaskLauncher) f.get(tracker); - // If we have zero slots left, commit suicide when no jobs are running - if ((maxMapSlots + maxReduceSlots) == 0) { - scheduleSuicideTimer(); + // Here we add a negative amount of slots (bringing the launcher to zero) + // which causes the launcher to clean up any tasks in the launch queue + // and then we kill the thread to stop it doing anything else. 
+ launcher.addFreeSlots(-maxSlots); + launcher.interrupt(); } } @@ -233,6 +288,7 @@ protected void scheduleSuicideTimer() { @Override public void run() { if (taskTracker == null) { + suicideTimerScheduled = false; return; } @@ -245,20 +301,17 @@ public void run() { try { taskTracker.shutdown(); - } catch (IOException e) { - LOG.error("Failed to shutdown TaskTracker", e); - } catch (InterruptedException e) { + } catch (IOException | InterruptedException e) { LOG.error("Failed to shutdown TaskTracker", e); } - } - else { + } else { try { Field field = taskTracker.getClass().getDeclaredField("tasksToCleanup"); field.setAccessible(true); BlockingQueue tasksToCleanup = ((BlockingQueue) field.get(taskTracker)); LOG.info("TaskTracker has " + taskTracker.tasks.size() + - " running tasks and " + tasksToCleanup + - " tasks to clean up."); + " running tasks and " + tasksToCleanup + + " tasks to clean up."); } catch (ReflectiveOperationException e) { LOG.fatal("Failed to get task counts from TaskTracker", e); } diff --git a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java index 3f1e63f..2cf9b6a 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java @@ -1,19 +1,23 @@ package org.apache.hadoop.mapred; import com.codahale.metrics.Meter; +import com.google.protobuf.ByteString; import org.apache.commons.httpclient.HttpHost; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.mesos.MesosSchedulerDriver; import org.apache.mesos.Protos; import org.apache.mesos.Protos.*; +import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Scheduler; import org.apache.mesos.SchedulerDriver; import org.apache.mesos.hadoop.Metrics; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -36,42 +40,44 @@ public class MesosScheduler extends TaskScheduler implements Scheduler { public static final double SLOT_CPUS_DEFAULT = 1; // 1 cores. public static final int SLOT_DISK_DEFAULT = 1024; // 1 GB. public static final int SLOT_JVM_HEAP_DEFAULT = 1024; // 1024MB. + public static final double TASKTRACKER_CPUS_DEFAULT = 1.0; // 1 core. public static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB. public static final int TASKTRACKER_DISK_DEFAULT = 1024; // 1 GB. + // The default behavior in Hadoop is to use 4 slots per TaskTracker: public static final int MAP_SLOTS_DEFAULT = 2; public static final int REDUCE_SLOTS_DEFAULT = 2; - // The amount of time to wait for task trackers to launch before - // giving up. + + // The amount of time to wait for task trackers to launch before giving up. public static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes public static final long PERIODIC_MS = 300000; // 5 minutes public static final long DEFAULT_IDLE_CHECK_INTERVAL = 5; // 5 seconds + // Destroy task trackers after being idle for N idle checks public static final long DEFAULT_IDLE_REVOCATION_CHECKS = 5; - private SchedulerDriver driver; + public Metrics metrics; protected TaskScheduler taskScheduler; protected JobTracker jobTracker; protected Configuration conf; protected File stateFile; + // Count of the launched trackers for TaskID generation. 
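
An aside on the executor change above: scheduleSuicideTimer() is a self-rescheduling timer. It periodically re-checks the TaskTracker, only shuts the executor down once nothing is running, and the new suicideTimerScheduled flag ensures killTask() starts at most one such timer chain. Below is a minimal standalone sketch of that pattern, with simplified names and state; it is not the actual executor code, and the running-task counter is a stand-in for taskTracker.tasks:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SuicideTimerSketch {
  private final ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
  // Stand-in for the executor's view of running tasks (taskTracker.tasks).
  private final AtomicInteger runningTasks = new AtomicInteger(3);

  void scheduleSuicideTimer() {
    timer.schedule(new Runnable() {
      @Override
      public void run() {
        if (runningTasks.get() > 0) {
          // Still busy: report and re-check later, like the real executor does.
          System.out.println("Tasks still running: " + runningTasks.get());
          scheduleSuicideTimer();
          return;
        }
        // Idle: shut down. The real code calls taskTracker.shutdown() and driver.stop().
        System.out.println("No tasks left; shutting down");
        timer.shutdown();
      }
    }, 1, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    SuicideTimerSketch s = new SuicideTimerSketch();
    s.scheduleSuicideTimer();
    // Simulate tasks draining over time.
    while (s.runningTasks.get() > 0) {
      Thread.sleep(1500);
      s.runningTasks.decrementAndGet();
    }
    Thread.sleep(2000);
  }
}
```
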
protected long launchedTrackers = 0; + // Use a fixed slot allocation policy? protected boolean policyIsFixed = false; protected ResourcePolicy policy; protected boolean enableMetrics = false; - public Metrics metrics; // Maintains a mapping from {tracker host:port -> MesosTracker}. // Used for tracking the slots of each TaskTracker and the corresponding // Mesos TaskID. - protected Map mesosTrackers = - new ConcurrentHashMap(); + protected Map mesosTrackers = new ConcurrentHashMap<>(); - protected final ScheduledExecutorService timerScheduler = - Executors.newScheduledThreadPool(1); + protected final ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(1); protected JobInProgressListener jobListener = new JobInProgressListener() { @Override @@ -107,8 +113,7 @@ public void jobUpdated(JobChangeEvent event) { for (String hostname : flakyTrackers) { for (MesosTracker mesosTracker : mesosTrackers.values()) { if (mesosTracker.host.getHostName().startsWith(hostname)) { - LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host " - + mesosTracker.host + " because it has been marked as flaky"); + LOG.info("Killing tracker on host " + mesosTracker.host + " because it has been marked as flaky"); if (metrics != null) { metrics.flakyTrackerKilledMeter.mark(); } @@ -131,16 +136,14 @@ public void jobUpdated(JobChangeEvent event) { LOG.info("Completed job : " + job.getJobID()); // Remove the task from the map. - final Set trackers = new HashSet(mesosTrackers.keySet()); + final Set trackers = new HashSet<>(mesosTrackers.keySet()); for (HttpHost tracker : trackers) { MesosTracker mesosTracker = mesosTrackers.get(tracker); mesosTracker.jobs.remove(job.getJobID()); - // If the TaskTracker doesn't have any running job tasks assigned, - // kill it. + // If the TaskTracker doesn't have any running job tasks assigned, kill it. if (mesosTracker.jobs.isEmpty() && mesosTracker.active) { - LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host " - + mesosTracker.host + " because it is no longer needed"); + LOG.info("Killing tracker on host " + mesosTracker.host + " because it is no longer needed"); killTracker(mesosTracker); } @@ -148,202 +151,162 @@ public void jobUpdated(JobChangeEvent event) { } } }; + private SchedulerDriver driver; - // TaskScheduler methods. - @Override - public synchronized void start() throws IOException { - conf = getConf(); - String taskTrackerClass = conf.get("mapred.mesos.taskScheduler", - "org.apache.hadoop.mapred.JobQueueTaskScheduler"); + // --------------------- Interface Scheduler --------------------- - try { - taskScheduler = - (TaskScheduler) Class.forName(taskTrackerClass).newInstance(); - taskScheduler.setConf(conf); - taskScheduler.setTaskTrackerManager(taskTrackerManager); - } catch (ClassNotFoundException e) { - LOG.fatal("Failed to initialize the TaskScheduler", e); - System.exit(1); - } catch (InstantiationException e) { - LOG.fatal("Failed to initialize the TaskScheduler", e); - System.exit(1); - } catch (IllegalAccessException e) { - LOG.fatal("Failed to initialize the TaskScheduler", e); - System.exit(1); - } + // These are synchronized, where possible. Some of these methods need to access the + // JobTracker, which can lead to a deadlock: + // See: https://issues.apache.org/jira/browse/MESOS-429 + // The workaround employed here is to unsynchronize those methods needing access to + // the JobTracker state and use explicit synchronization instead as appropriate. + // TODO(bmahler): Provide a cleaner solution to this issue. 
One solution is to + // split up the Scheduler and TaskScheduler implementations in order to break the + // locking cycle. This would require a synchronized class to store the shared + // state across our Scheduler and TaskScheduler implementations, and provide + // atomic operations as needed. + @Override + public synchronized void registered(SchedulerDriver schedulerDriver, FrameworkID frameworkID, MasterInfo masterInfo) { + LOG.info("Registered as " + frameworkID.getValue() + " with master " + masterInfo); + } - // Add the job listener to get job related updates. - taskTrackerManager.addJobInProgressListener(jobListener); + @Override + public synchronized void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) { + LOG.info("Re-registered with master " + masterInfo); + } - LOG.info("Starting MesosScheduler"); - jobTracker = (JobTracker) super.taskTrackerManager; + // This method uses explicit synchronization in order to avoid deadlocks when + // accessing the JobTracker. + @Override + public void resourceOffers(SchedulerDriver schedulerDriver, List offers) { + policy.resourceOffers(schedulerDriver, offers); + } - String master = conf.get("mapred.mesos.master", "local"); + @Override + public synchronized void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerID) { + LOG.warn("Rescinded offer: " + offerID.getValue()); + } - try { - FrameworkInfo frameworkInfo = FrameworkInfo - .newBuilder() - .setUser("") // Let Mesos fill in the user. - .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false)) - .setRole(conf.get("mapred.mesos.role", "*")) - .setName("Hadoop: (RPC port: " + jobTracker.port + "," - + " WebUI port: " + jobTracker.infoPort + ")").build(); - - driver = new MesosSchedulerDriver(this, frameworkInfo, master); - driver.start(); - } catch (Exception e) { - // If the MesosScheduler can't be loaded, the JobTracker won't be useful - // at all, so crash it now so that the user notices. - LOG.fatal("Failed to start MesosScheduler", e); - System.exit(1); - } + @Override + public synchronized void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) { + LOG.info("Status update of " + taskStatus.getTaskId().getValue() + " to " + taskStatus.getState().name() + " with message " + taskStatus.getMessage()); - String file = conf.get("mapred.mesos.state.file", ""); - if (!file.equals("")) { - this.stateFile = new File(file); + // Remove the TaskTracker if the corresponding Mesos task has reached a + // terminal state. 
+ switch (taskStatus.getState()) { + case TASK_FINISHED: + case TASK_FAILED: + case TASK_KILLED: + case TASK_LOST: + case TASK_STAGING: + case TASK_STARTING: + case TASK_RUNNING: + break; + default: + LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name()); + break; } - policyIsFixed = conf.getBoolean("mapred.mesos.scheduler.policy.fixed", - policyIsFixed); - - if (policyIsFixed) { - policy = new ResourcePolicyFixed(this); - } else { - policy = new ResourcePolicyVariable(this); + if (metrics != null) { + Meter meter = metrics.taskStateMeter.get(taskStatus.getState()); + if (meter != null) { + meter.mark(); + } } + } - enableMetrics = conf.getBoolean("mapred.mesos.metrics.enabled", - enableMetrics); + @Override + public synchronized void frameworkMessage(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID, byte[] bytes) { + LOG.info("Framework Message of " + bytes.length + " bytes" + " from executor " + executorID.getValue() + " on slave " + slaveID.getValue()); + } - if (enableMetrics) { - metrics = new Metrics(conf); - } + @Override + public synchronized void disconnected(SchedulerDriver schedulerDriver) { + LOG.warn("Disconnected from Mesos master."); + } - taskScheduler.start(); + @Override + public synchronized void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveID) { + LOG.warn("Slave lost: " + slaveID.getValue()); } @Override - public synchronized void terminate() throws IOException { - try { - LOG.info("Stopping MesosScheduler"); - driver.stop(); - } catch (Exception e) { - LOG.error("Failed to stop Mesos scheduler", e); - } + public synchronized void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID, int status) { + LOG.warn("Executor " + executorID.getValue() + " lost with status " + status + " on slave " + slaveID); - taskScheduler.terminate(); + // TODO(tarnfeld): If the executor is lost what do we do? } @Override - public void checkJobSubmission(JobInProgress job) throws IOException { - taskScheduler.checkJobSubmission(job); + public synchronized void error(SchedulerDriver schedulerDriver, String s) { + LOG.error("Error from scheduler driver: " + s); } @Override - public List assignTasks(TaskTracker taskTracker) - throws IOException { - HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(), - taskTracker.getStatus().getHttpPort()); - - if (!mesosTrackers.containsKey(tracker)) { - LOG.info("Unknown/exited TaskTracker: " + tracker + ". "); - return null; - } - - MesosTracker mesosTracker = mesosTrackers.get(tracker); - - // Make sure we're not asked to assign tasks to any task trackers that have - // been stopped. This could happen while the task tracker has not been - // removed from the cluster e.g still in the heartbeat timeout period. - synchronized (this) { - if (mesosTracker.stopped) { - LOG.info("Asked to assign tasks to stopped tracker " + tracker + "."); - return null; - } - } + public List assignTasks(TaskTracker taskTracker) throws IOException { // Let the underlying task scheduler do the actual task scheduling. List tasks = taskScheduler.assignTasks(taskTracker); // The Hadoop Fair Scheduler is known to return null. 
- if (tasks == null) { - return null; - } + if (tasks != null) { + HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(), taskTracker.getStatus().getHttpPort()); + MesosTracker mesosTracker = mesosTrackers.get(tracker); + if (mesosTracker != null) { + synchronized (this) { + for (Iterator iterator = tasks.iterator(); iterator.hasNext();) { + + // Throw away any task types that don't match up with the current known + // slot allocation of the tracker. We do this here because when we change + // the slot allocation of a running Task Tracker it can take time for + // this information to propagate around the system and we can preemptively + // avoid scheduling tasks to task trackers we know not to have capacity. + Task task = iterator.next(); + if (task instanceof MapTask && mesosTracker.mapSlots == 0) { + LOG.debug("Removed map task from TT assignment due to mismatching slots"); + iterator.remove(); + continue; + } else if (task instanceof ReduceTask && mesosTracker.reduceSlots == 0) { + LOG.debug("Removed reduce task from TT assignment due to mismatching slots"); + iterator.remove(); + continue; + } - // Keep track of which TaskTracker contains which tasks. - for (Task task : tasks) { - mesosTracker.jobs.add(task.getJobID()); + // Keep track of which TaskTracker contains which tasks. + mesosTracker.jobs.add(task.getJobID()); + } + } + } } return tasks; } @Override - public synchronized Collection getJobs(String queueName) { - return taskScheduler.getJobs(queueName); - } - - @Override - public synchronized void refresh() throws IOException { - taskScheduler.refresh(); - } - - // Mesos Scheduler methods. - // These are synchronized, where possible. Some of these methods need to access the - // JobTracker, which can lead to a deadlock: - // See: https://issues.apache.org/jira/browse/MESOS-429 - // The workaround employed here is to unsynchronize those methods needing access to - // the JobTracker state and use explicit synchronization instead as appropriate. - // TODO(bmahler): Provide a cleaner solution to this issue. One solution is to - // split up the Scheduler and TaskScheduler implementations in order to break the - // locking cycle. This would require a synchronized class to store the shared - // state across our Scheduler and TaskScheduler implementations, and provide - // atomic operations as needed. - @Override - public synchronized void registered(SchedulerDriver schedulerDriver, - FrameworkID frameworkID, MasterInfo masterInfo) { - LOG.info("Registered as " + frameworkID.getValue() - + " with master " + masterInfo); + public void checkJobSubmission(JobInProgress job) throws IOException { + taskScheduler.checkJobSubmission(job); } @Override - public synchronized void reregistered(SchedulerDriver schedulerDriver, - MasterInfo masterInfo) { - LOG.info("Re-registered with master " + masterInfo); - } - - public void killTracker(MesosTracker tracker) { - if (metrics != null) { - metrics.killMeter.mark(); - } - synchronized (this) { - driver.killTask(tracker.taskId); - } - tracker.stop(); - if (mesosTrackers.get(tracker.host) == tracker) { - mesosTrackers.remove(tracker.host); - } - } - - public synchronized void scheduleTimer(Runnable command, - long delay, - TimeUnit unit) { - timerScheduler.schedule(command, delay, unit); + public synchronized Collection getJobs(String queueName) { + return taskScheduler.getJobs(queueName); } - // For some reason, pendingMaps() and pendingReduces() doesn't return the - // values we expect. 
We observed negative values, which may be related to - https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the - algorithm that is used to calculate the pending tasks within the Hadoop - JobTracker sources (see 'printTaskSummary' in - src/org/apache/hadoop/mapred/jobdetails_jsp.java). + /** + * For some reason, pendingMaps() and pendingReduces() don't return the values we expect. We observed negative + * values, which may be related to https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the algorithm + * that is used to calculate the pending tasks within the Hadoop JobTracker sources (see 'printTaskSummary' in + * src/org/apache/hadoop/mapred/jobdetails_jsp.java). + * + * @param tasks the job's tasks to inspect + * @return the number of pending tasks + */ public int getPendingTasks(TaskInProgress[] tasks) { int totalTasks = tasks.length; int runningTasks = 0; int finishedTasks = 0; int killedTasks = 0; - for (int i = 0; i < totalTasks; ++i) { - TaskInProgress task = tasks[i]; + for (TaskInProgress task : tasks) { if (task == null) { continue; } @@ -355,95 +318,134 @@ public int getPendingTasks(TaskInProgress[] tasks) { killedTasks += 1; } } - int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks; - return pendingTasks; + return totalTasks - runningTasks - killedTasks - finishedTasks; } - // This method uses explicit synchronization in order to avoid deadlocks when - // accessing the JobTracker. - @Override - public void resourceOffers(SchedulerDriver schedulerDriver, - List offers) { - policy.resourceOffers(schedulerDriver, offers); + public void killTracker(MesosTracker tracker) { + killTrackerSlots(tracker, TaskType.MAP); + killTrackerSlots(tracker, TaskType.REDUCE); + } + + // killTrackerSlots will ask the given MesosTracker to revoke + // the allocated task slots for the given type of slot (MAP/REDUCE). + public void killTrackerSlots(MesosTracker tracker, TaskType type) { + if (metrics != null) { + metrics.killMeter.mark(); + } + + TaskID taskId = tracker.getTaskId(type); + if (taskId != null) { + driver.killTask(taskId); + + if (type == TaskType.MAP) { + tracker.mapSlots = 0; + } else if (type == TaskType.REDUCE) { + tracker.reduceSlots = 0; + } + } } @Override - public synchronized void offerRescinded(SchedulerDriver schedulerDriver, - OfferID offerID) { - LOG.warn("Rescinded offer: " + offerID.getValue()); + public synchronized void refresh() throws IOException { + taskScheduler.refresh(); + } + + public synchronized void scheduleTimer(Runnable command, long delay, TimeUnit unit) { + timerScheduler.schedule(command, delay, unit); } + // TaskScheduler methods. @Override - public synchronized void statusUpdate(SchedulerDriver schedulerDriver, - Protos.TaskStatus taskStatus) { - LOG.info("Status update of " + taskStatus.getTaskId().getValue() - + " to " + taskStatus.getState().name() - + " with message " + taskStatus.getMessage()); + public synchronized void start() throws IOException { + conf = getConf(); + String taskTrackerClass = conf.get("mapred.mesos.taskScheduler", "org.apache.hadoop.mapred.JobQueueTaskScheduler"); - // Remove the TaskTracker if the corresponding Mesos task has reached a - // terminal state. - switch (taskStatus.getState()) { - case TASK_FINISHED: - case TASK_FAILED: - case TASK_KILLED: - case TASK_LOST: - // Make a copy to iterate over keys and delete values.
- Set trackers = new HashSet(mesosTrackers.keySet()); + try { + taskScheduler = (TaskScheduler) Class.forName(taskTrackerClass).newInstance(); + taskScheduler.setConf(conf); + taskScheduler.setTaskTrackerManager(taskTrackerManager); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + LOG.fatal("Failed to initialize the TaskScheduler", e); + System.exit(1); + } - // Remove the task from the map. - for (HttpHost tracker : trackers) { - if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) { - LOG.info("Removing terminated TaskTracker: " + tracker); - mesosTrackers.get(tracker).stop(); - mesosTrackers.remove(tracker); - } + // Add the job listener to get job related updates. + taskTrackerManager.addJobInProgressListener(jobListener); + + LOG.info("Starting MesosScheduler"); + jobTracker = (JobTracker) super.taskTrackerManager; + + String master = conf.get("mapred.mesos.master", "local"); + + try { + FrameworkInfo frameworkInfo; + FrameworkInfo.Builder frameworkInfoBuilder = FrameworkInfo.newBuilder() + .setUser(conf.get("mapred.mesos.framework.user", "")) // Let Mesos fill in the user. + .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false)) + .setRole(conf.get("mapred.mesos.role", "*")) + .setName(conf.get("mapred.mesos.framework.name", "Hadoop: (RPC port: " + jobTracker.port + "," + + " WebUI port: " + jobTracker.infoPort + ")")); + + Credential credential = null; + + String frameworkPrincipal = conf.get("mapred.mesos.framework.principal"); + if (frameworkPrincipal != null) { + frameworkInfoBuilder.setPrincipal(frameworkPrincipal); + String secretFile = conf.get("mapred.mesos.framework.secretfile"); + if (secretFile != null) { + credential = Credential.newBuilder() + .setSecret(ByteString.readFrom(new FileInputStream(secretFile))) + .setPrincipal(frameworkPrincipal) + .build(); } - break; - case TASK_STAGING: - case TASK_STARTING: - case TASK_RUNNING: - break; - default: - LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name()); - break; + } + if (credential == null) { + LOG.info("Creating Scheduler Driver"); + driver = new MesosSchedulerDriver(this, frameworkInfoBuilder.build(), master); + } else { + LOG.info("Creating Scheduler Driver, attempting to authenticate with Principal: " + credential.getPrincipal() + + ", secret:" + credential.getSecret()); + driver = new MesosSchedulerDriver(this, frameworkInfoBuilder.build(), master, credential); + } + driver.start(); + } catch (Exception e) { + // If the MesosScheduler can't be loaded, the JobTracker won't be useful + // at all, so crash it now so that the user notices.
+ LOG.fatal("Failed to start MesosScheduler", e); + System.exit(1); } - if (metrics != null) { - Meter meter = metrics.taskStateMeter.get(taskStatus.getState()); - if (meter != null) { - meter.mark(); - } + String file = conf.get("mapred.mesos.state.file", ""); + if (!file.equals("")) { + this.stateFile = new File(file); } - } - @Override - public synchronized void frameworkMessage(SchedulerDriver schedulerDriver, - ExecutorID executorID, SlaveID slaveID, byte[] bytes) { - LOG.info("Framework Message of " + bytes.length + " bytes" - + " from executor " + executorID.getValue() - + " on slave " + slaveID.getValue()); - } + policyIsFixed = conf.getBoolean("mapred.mesos.scheduler.policy.fixed", policyIsFixed); - @Override - public synchronized void disconnected(SchedulerDriver schedulerDriver) { - LOG.warn("Disconnected from Mesos master."); - } + if (policyIsFixed) { + policy = new ResourcePolicyFixed(this); + } else { + policy = new ResourcePolicyVariable(this); + } - @Override - public synchronized void slaveLost(SchedulerDriver schedulerDriver, - SlaveID slaveID) { - LOG.warn("Slave lost: " + slaveID.getValue()); - } + enableMetrics = conf.getBoolean("mapred.mesos.metrics.enabled", enableMetrics); - @Override - public synchronized void executorLost(SchedulerDriver schedulerDriver, - ExecutorID executorID, SlaveID slaveID, int status) { - LOG.warn("Executor " + executorID.getValue() - + " lost with status " + status + " on slave " + slaveID); + if (enableMetrics) { + metrics = new Metrics(conf); + } + + taskScheduler.start(); } @Override - public synchronized void error(SchedulerDriver schedulerDriver, String s) { - LOG.error("Error from scheduler driver: " + s); + public synchronized void terminate() throws IOException { + try { + LOG.info("Stopping MesosScheduler"); + driver.stop(); + } catch (Exception e) { + LOG.error("Failed to stop Mesos scheduler", e); + } + + taskScheduler.terminate(); } } diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index 021e0ea..c215729 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -3,6 +3,7 @@ import org.apache.commons.httpclient.HttpHost; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.mesos.Protos.TaskID; import java.util.Collection; @@ -16,26 +17,29 @@ * Used to track the our launched TaskTrackers. */ public class MesosTracker { - public static final Log LOG = LogFactory.getLog(MesosScheduler.class); public volatile HttpHost host; - public TaskID taskId; + public TaskID mapTaskId; + public TaskID reduceTaskId; public long mapSlots; public long reduceSlots; - public volatile long idleCounter = 0; + // Number of idle check cycles all map slots are idle + public volatile long idleMapCounter = 0; + // Number of idle check cycles all reduce slots are idle + public volatile long idleReduceCounter = 0; public volatile long idleCheckInterval = 0; public volatile long idleCheckMax = 0; public volatile boolean active = false; // Set once tracked by the JobTracker. - public volatile boolean stopped = false; public volatile MesosScheduler scheduler; // Tracks Hadoop jobs running on the tracker. 
public Set jobs = Collections.newSetFromMap(new ConcurrentHashMap()); public com.codahale.metrics.Timer.Context context; - public MesosTracker(HttpHost host, TaskID taskId, long mapSlots, - long reduceSlots, MesosScheduler scheduler) { + public MesosTracker(HttpHost host, TaskID mapTaskId, TaskID reduceTaskId, + long mapSlots, long reduceSlots, MesosScheduler scheduler) { this.host = host; - this.taskId = taskId; + this.mapTaskId = mapTaskId; + this.reduceTaskId = reduceTaskId; this.mapSlots = mapSlots; this.reduceSlots = reduceSlots; this.scheduler = scheduler; @@ -55,6 +59,16 @@ public MesosTracker(HttpHost host, TaskID taskId, long mapSlots, } } + public TaskID getTaskId(TaskType type) { + if (type == TaskType.MAP) { + return mapTaskId; + } else if (type == TaskType.REDUCE) { + return reduceTaskId; + } + + return null; + } + protected void scheduleStartupTimer() { scheduler.scheduleTimer(new Runnable() { @Override @@ -84,72 +98,19 @@ public void run() { if (MesosTracker.this.scheduler.metrics != null) { MesosTracker.this.scheduler.metrics.launchTimeout.mark(); } + LOG.warn("Tracker " + MesosTracker.this.host + " failed to launch within " + MesosScheduler.LAUNCH_TIMEOUT_MS / 1000 + " seconds, killing it"); + + // Kill the MAP and REDUCE slot tasks. This doesn't directly kill the + // task tracker but it will result in the task tracker receiving no + // tasks and ultimately lead to its death. Best case the task is broken + // and it will never come up on Mesos. MesosTracker.this.scheduler.killTracker(MesosTracker.this); } }, MesosScheduler.LAUNCH_TIMEOUT_MS, TimeUnit.MILLISECONDS); } - protected void scheduleIdleCheck() { - scheduler.scheduleTimer(new Runnable() { - @Override - public void run() { - // We're not interested if the task tracker has been stopped. - if (MesosTracker.this.stopped) { - return; - } - - // If the task tracker isn't active, wait until it is active. - // TODO(tarnfeld): Do this based on some kind of lock/wait? - if (!MesosTracker.this.active) { - scheduleIdleCheck(); - return; - } - - boolean trackerIsIdle = false; - - // We're only interested in TaskTrackers which have jobs assigned to them - // but are completely idle. The MesosScheduler is in charge of destroying - // task trackers that are not handling any jobs, so we can leave those alone.
- if (MesosTracker.this.idleCounter >= MesosTracker.this.idleCheckMax) { - LOG.info("Killing idle tasktracker: " + MesosTracker.this.host); - MesosTracker.this.scheduler.killTracker(MesosTracker.this); - scheduleIdleCheck(); - return; - } - - long idleMapSlots = 0; - long idleReduceSlots = 0; - - Collection taskTrackers = scheduler.jobTracker.taskTrackers(); - for (TaskTrackerStatus status : taskTrackers) { - HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); - if (host.toString().equals(MesosTracker.this.host.toString())) { - idleMapSlots += status.getAvailableMapSlots(); - idleReduceSlots += status.getAvailableReduceSlots(); - break; - } - } - - trackerIsIdle = idleMapSlots == MesosTracker.this.mapSlots && - idleReduceSlots == MesosTracker.this.reduceSlots; - - if (trackerIsIdle) { - LOG.info("TaskTracker appears idle right now: " + MesosTracker.this.host); - MesosTracker.this.idleCounter += 1; - } else { - if (MesosTracker.this.idleCounter > 0) { - LOG.info("TaskTracker is no longer idle: " + MesosTracker.this.host); - } - MesosTracker.this.idleCounter = 0; - } - - scheduleIdleCheck(); - } - }, MesosTracker.this.idleCheckInterval, TimeUnit.SECONDS); - } - protected void schedulePeriodic() { scheduler.scheduleTimer(new Runnable() { @Override @@ -158,7 +119,7 @@ public void run() { MesosTracker.this == MesosTracker.this.scheduler.mesosTrackers.get(host)) { // Periodically check if the jobs assigned to this TaskTracker are // still running (lazy GC). - final Set jobsCopy = new HashSet(MesosTracker.this.jobs); + final Set jobsCopy = new HashSet<>(MesosTracker.this.jobs); for (JobID id : jobsCopy) { JobStatus jobStatus = MesosTracker.this.scheduler.jobTracker.getJobStatus(id); if (jobStatus == null || jobStatus.isJobComplete()) { @@ -174,11 +135,84 @@ public void run() { }, MesosScheduler.PERIODIC_MS, TimeUnit.MILLISECONDS); } - public void stop() { - active = false; - stopped = true; - if (context != null) { - context.stop(); + protected void scheduleIdleCheck() { + scheduler.scheduleTimer(new Runnable() { + @Override + public void run() { + // If the task tracker isn't active, wait until it is active. + // If the task tracker has no jobs assigned to it, ignore it. We're + // only interested in a tracker that has jobs but isn't using any of + // the slots. + if (!MesosTracker.this.active || MesosTracker.this.jobs.isEmpty()) { + scheduleIdleCheck(); + return; + } + + // Perform the idle checks for map and reduce slots + if (MesosTracker.this.mapSlots > 0) { + idleMapCheck(); + } + + if (MesosTracker.this.reduceSlots > 0) { + idleReduceCheck(); + } + + scheduleIdleCheck(); + } + }, MesosTracker.this.idleCheckInterval, TimeUnit.SECONDS); + } + + protected void idleMapCheck() { + + // If the map slots have been idle for too long, kill them.
+ if (this.idleMapCounter >= MesosTracker.this.idleCheckMax) { + LOG.info("Killing MAP slots on idle Task Tracker " + MesosTracker.this.host); + MesosTracker.this.scheduler.killTrackerSlots(MesosTracker.this, TaskType.MAP); + return; + } + + long occupiedMapSlots = 0; + Collection taskTrackers = scheduler.jobTracker.taskTrackers(); + for (TaskTrackerStatus status : taskTrackers) { + HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); + if (host.toString().equals(MesosTracker.this.host.toString())) { + occupiedMapSlots += status.countOccupiedMapSlots(); + break; + } + } + + if (occupiedMapSlots == 0) { + LOG.info("TaskTracker MAP slots appear idle right now: " + MesosTracker.this.host); + MesosTracker.this.idleMapCounter += 1; + } else { + MesosTracker.this.idleMapCounter = 0; + } + } + + protected void idleReduceCheck() { + + // If the reduce slots have been idle for too long, kill them. + if (this.idleReduceCounter >= MesosTracker.this.idleCheckMax) { + LOG.info("Killing REDUCE slots on idle Task Tracker " + MesosTracker.this.host); + MesosTracker.this.scheduler.killTrackerSlots(MesosTracker.this, TaskType.REDUCE); + return; + } + + long occupiedReduceSlots = 0; + Collection taskTrackers = scheduler.jobTracker.taskTrackers(); + for (TaskTrackerStatus status : taskTrackers) { + HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); + if (host.toString().equals(MesosTracker.this.host.toString())) { + occupiedReduceSlots += status.countOccupiedReduceSlots(); + break; + } + } + + if (occupiedReduceSlots == 0) { + LOG.info("TaskTracker REDUCE slots appear idle right now: " + MesosTracker.this.host); + MesosTracker.this.idleReduceCounter += 1; + } else { + MesosTracker.this.idleReduceCounter = 0; } } } diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java index 3a52888..95b4b37 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java @@ -17,7 +17,7 @@ import static org.apache.hadoop.util.StringUtils.join; -public class ResourcePolicy { +public abstract class ResourcePolicy { public static final Log LOG = LogFactory.getLog(ResourcePolicy.class); public volatile MesosScheduler scheduler; public int neededMapSlots; @@ -71,142 +71,13 @@ public ResourcePolicy(MesosScheduler scheduler) { containerMem = tasktrackerMem; } - public void computeNeededSlots(List jobsInProgress, - Collection taskTrackers) { - // Compute the number of pending maps and reduces. - int pendingMaps = 0; - int pendingReduces = 0; - int runningMaps = 0; - int runningReduces = 0; - - for (JobInProgress progress : jobsInProgress) { - // JobStatus.pendingMaps/Reduces may return the wrong value on - // occasion. This seems to be safer. - pendingMaps += scheduler.getPendingTasks(progress.getTasks(TaskType.MAP)); - pendingReduces += scheduler.getPendingTasks(progress.getTasks(TaskType.REDUCE)); - runningMaps += progress.runningMaps(); - runningReduces += progress.runningReduces(); - - // If the task is waiting to launch the cleanup task, let us make sure we have - // capacity to run the task. - if (!progress.isCleanupLaunched()) { - pendingMaps += scheduler.getPendingTasks(progress.getTasks(TaskType.JOB_CLEANUP)); - } - } - - // Mark active (heartbeated) TaskTrackers and compute idle slots.
- int idleMapSlots = 0; - int idleReduceSlots = 0; - int unhealthyTrackers = 0; - - for (TaskTrackerStatus status : taskTrackers) { - if (!status.getHealthStatus().isNodeHealthy()) { - // Skip this node if it's unhealthy. - ++unhealthyTrackers; - continue; - } - - HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); - if (scheduler.mesosTrackers.containsKey(host)) { - scheduler.mesosTrackers.get(host).active = true; - idleMapSlots += status.getAvailableMapSlots(); - idleReduceSlots += status.getAvailableReduceSlots(); - } - } - - // Consider the TaskTrackers that have yet to become active as being idle, - // otherwise we will launch excessive TaskTrackers. - int inactiveMapSlots = 0; - int inactiveReduceSlots = 0; - for (MesosTracker tracker : scheduler.mesosTrackers.values()) { - if (!tracker.active) { - inactiveMapSlots += tracker.mapSlots; - inactiveReduceSlots += tracker.reduceSlots; - } - } - - // To ensure Hadoop jobs begin promptly, we can specify a minimum number - // of 'hot slots' to be available for use. This addresses the - // TaskTracker spin up delay that exists with Hadoop on Mesos. This can - // be a nuisance with lower latency applications, such as ad-hoc Hive - // queries. - int minimumMapSlots = scheduler.conf.getInt("mapred.mesos.total.map.slots.minimum", 0); - int minimumReduceSlots = - scheduler.conf.getInt("mapred.mesos.total.reduce.slots.minimum", 0); - - // Compute how many slots we need to allocate. - neededMapSlots = Math.max( - minimumMapSlots - (idleMapSlots + inactiveMapSlots), - pendingMaps - (idleMapSlots + inactiveMapSlots)); - neededReduceSlots = Math.max( - minimumReduceSlots - (idleReduceSlots + inactiveReduceSlots), - pendingReduces - (idleReduceSlots + inactiveReduceSlots)); - - LOG.info(join("\n", Arrays.asList( - "JobTracker Status", - " Pending Map Tasks: " + pendingMaps, - " Pending Reduce Tasks: " + pendingReduces, - " Running Map Tasks: " + runningMaps, - " Running Reduce Tasks: " + runningReduces, - " Idle Map Slots: " + idleMapSlots, - " Idle Reduce Slots: " + idleReduceSlots, - " Inactive Map Slots: " + inactiveMapSlots - + " (launched but no hearbeat yet)", - " Inactive Reduce Slots: " + inactiveReduceSlots - + " (launched but no hearbeat yet)", - " Needed Map Slots: " + neededMapSlots, - " Needed Reduce Slots: " + neededReduceSlots, - " Unhealthy Trackers: " + unhealthyTrackers))); - - if (scheduler.stateFile != null) { - // Update state file - synchronized (this) { - Set hosts = new HashSet(); - for (MesosTracker tracker : scheduler.mesosTrackers.values()) { - hosts.add(tracker.host.getHostName()); - } - try { - File tmp = new File(scheduler.stateFile.getAbsoluteFile() + ".tmp"); - FileWriter fstream = new FileWriter(tmp); - fstream.write(join("\n", Arrays.asList( - "time=" + System.currentTimeMillis(), - "pendingMaps=" + pendingMaps, - "pendingReduces=" + pendingReduces, - "runningMaps=" + runningMaps, - "runningReduces=" + runningReduces, - "idleMapSlots=" + idleMapSlots, - "idleReduceSlots=" + idleReduceSlots, - "inactiveMapSlots=" + inactiveMapSlots, - "inactiveReduceSlots=" + inactiveReduceSlots, - "neededMapSlots=" + neededMapSlots, - "neededReduceSlots=" + neededReduceSlots, - "unhealthyTrackers=" + unhealthyTrackers, - "hosts=" + join(",", hosts), - ""))); - fstream.close(); - tmp.renameTo(scheduler.stateFile); - } catch (Exception e) { - LOG.error("Can't write state file: " + e.getMessage()); - } - } - } - } - - // This method computes the number of slots to launch for this offer, and - // returns true if the offer 
is sufficient. - // Must be overridden. - public boolean computeSlots() { - return false; - } - - public void resourceOffers(SchedulerDriver schedulerDriver, - List offers) { - final HttpHost jobTrackerAddress = - new HttpHost(scheduler.jobTracker.getHostname(), scheduler.jobTracker.getTrackerPort()); + public void resourceOffers(SchedulerDriver schedulerDriver, List offers) { +// final HttpHost jobTrackerAddress = +// new HttpHost(scheduler.jobTracker.getHostname(), scheduler.jobTracker.getTrackerPort()); final Collection taskTrackers = scheduler.jobTracker.taskTrackers(); - final List jobsInProgress = new ArrayList(); + final List jobsInProgress = new ArrayList<>(); for (JobStatus status : scheduler.jobTracker.jobsToComplete()) { jobsInProgress.add(scheduler.jobTracker.getJob(status.getJobID())); } @@ -229,7 +100,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, mem = -1.0; disk = -1.0; Set ports = new HashSet(); - String cpuRole = new String("*"); + String cpuRole = "*"; String memRole = cpuRole; String diskRole = cpuRole; String portsRole = cpuRole; @@ -385,7 +256,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, if (master == null) { throw new RuntimeException( "Expecting configuration property 'mapred.mesos.master'"); - } else if (master == "local") { + } else if (Objects.equals(master, "local")) { throw new RuntimeException( "Can not use 'local' for 'mapred.mesos.executor'"); } @@ -435,8 +306,8 @@ public void resourceOffers(SchedulerDriver schedulerDriver, } if (containerOptions != null) { - for (int i = 0; i < containerOptions.length; i++) { - containerInfo.addOptions(containerOptions[i]); + for (String containerOption : containerOptions) { + containerInfo.addOptions(containerOption); } } @@ -460,7 +331,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, ExecutorInfo executor = ExecutorInfo .newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue( - "executor_" + taskId.getValue())) + "Executor_" + taskId.getValue())) .setName("Hadoop TaskTracker") .setSource(taskId.getValue()) .addResources( @@ -499,51 +370,34 @@ public void resourceOffers(SchedulerDriver schedulerDriver, continue; } - // Create the TaskTracker TaskInfo - TaskInfo trackerTaskInfo = TaskInfo - .newBuilder() - .setName(taskId.getValue()) - .setTaskId(taskId) - .setSlaveId(offer.getSlaveId()) - .addResources( - Resource - .newBuilder() - .setName("ports") - .setType(Value.Type.RANGES) - .setRole(portsRole) - .setRanges( - Value.Ranges - .newBuilder() - .addRange(Value.Range.newBuilder() - .setBegin(httpAddress.getPort()) - .setEnd(httpAddress.getPort())) - .addRange(Value.Range.newBuilder() - .setBegin(reportAddress.getPort()) - .setEnd(reportAddress.getPort())))) - .addResources( - Resource - .newBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setRole(cpuRole) - .setScalar(Value.Scalar.newBuilder().setValue(taskCpus - containerCpus))) - .addResources( - Resource - .newBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setRole(memRole) - .setScalar(Value.Scalar.newBuilder().setValue(taskMem - containerCpus))) - .setData(taskData) - .setExecutor(executor) - .build(); + List tasks = new ArrayList(); + TaskID mapTaskId = null; + TaskID reduceTaskId = null; + + if (mapSlots > 0) { + TaskInfo mapTask = buildTaskInfo(executor, taskId.getValue() + "_Map", offer, + httpAddress.getPort(), reportAddress.getPort(), mapSlots, taskData, + portsRole, cpuRole, memRole); + + mapTaskId = mapTask.getTaskId(); + tasks.add(mapTask); + } + + if (reduceSlots > 0) { + 
+        TaskInfo reduceTask = buildTaskInfo(executor, taskId.getValue() + "_Reduce", offer,
+            httpAddress.getPort(), reportAddress.getPort(), reduceSlots, taskData,
+            portsRole, cpuRole, memRole);
+
+        reduceTaskId = reduceTask.getTaskId();
+        tasks.add(reduceTask);
+      }
 
       // Add this tracker to Mesos tasks.
-      scheduler.mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
-          mapSlots, reduceSlots, scheduler));
+      scheduler.mesosTrackers.put(httpAddress, new MesosTracker(httpAddress,
+          mapTaskId, reduceTaskId, mapSlots, reduceSlots, scheduler));
 
       // Launch the task
-      schedulerDriver.launchTasks(Arrays.asList(offer.getId()), Arrays.asList(trackerTaskInfo));
+      schedulerDriver.launchTasks(Arrays.asList(offer.getId()), tasks);
 
       neededMapSlots -= mapSlots;
       neededReduceSlots -= reduceSlots;
@@ -559,4 +413,178 @@ public void resourceOffers(SchedulerDriver schedulerDriver,
       }
     }
   }
+
+  protected TaskInfo buildTaskInfo(ExecutorInfo executor, String taskId, Offer offer,
+      Integer httpPort, Integer reportPort, long slots, ByteString taskData,
+      String portsRole, String cpuRole, String memRole) {
+
+    TaskInfo taskInfo = TaskInfo
+        .newBuilder()
+        .setName(taskId)
+        .setTaskId(TaskID.newBuilder().setValue(taskId))
+        .setSlaveId(offer.getSlaveId())
+        .addResources(
+            Resource
+                .newBuilder()
+                .setName("ports")
+                .setType(Value.Type.RANGES)
+                .setRole(portsRole)
+                .setRanges(
+                    Value.Ranges
+                        .newBuilder()
+                        .addRange(Value.Range.newBuilder()
+                            .setBegin(httpPort)
+                            .setEnd(httpPort))
+                        .addRange(Value.Range.newBuilder()
+                            .setBegin(reportPort)
+                            .setEnd(reportPort))))
+        .addResources(
+            Resource
+                .newBuilder()
+                .setName("cpus")
+                .setType(Value.Type.SCALAR)
+                .setRole(cpuRole)
+                .setScalar(Value.Scalar.newBuilder().setValue(slotCpus * slots)))
+        .addResources(
+            Resource
+                .newBuilder()
+                .setName("mem")
+                .setType(Value.Type.SCALAR)
+                .setRole(memRole)
+                .setScalar(Value.Scalar.newBuilder().setValue(slotMem * slots)))
+        .setData(taskData)
+        .setExecutor(executor)
+        .build();
+
+    return taskInfo;
+  }
+
+  public void computeNeededSlots(List<JobInProgress> jobsInProgress,
+      Collection<TaskTrackerStatus> taskTrackers) {
+    // Compute the number of pending maps and reduces.
+    int pendingMaps = 0;
+    int pendingReduces = 0;
+    int runningMaps = 0;
+    int runningReduces = 0;
+
+    for (JobInProgress progress : jobsInProgress) {
+      // JobStatus.pendingMaps/Reduces may return the wrong value on
+      // occasion. This seems to be safer.
+      pendingMaps += scheduler.getPendingTasks(progress.getTasks(TaskType.MAP));
+      pendingReduces += scheduler.getPendingTasks(progress.getTasks(TaskType.REDUCE));
+      runningMaps += progress.runningMaps();
+      runningReduces += progress.runningReduces();
+
+      // If the job is waiting to launch its cleanup task, make sure we have
+      // the capacity to run it.
+      if (!progress.isCleanupLaunched()) {
+        pendingMaps += scheduler.getPendingTasks(progress.getTasks(TaskType.JOB_CLEANUP));
+      }
+    }
+
+    // Mark active (heartbeated) TaskTrackers and compute idle slots.
+    int idleMapSlots = 0;
+    int idleReduceSlots = 0;
+    int unhealthyTrackers = 0;
+
+    for (TaskTrackerStatus status : taskTrackers) {
+      if (!status.getHealthStatus().isNodeHealthy()) {
+        // Skip this node if it's unhealthy.
+        ++unhealthyTrackers;
+        continue;
+      }
+
+      HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
+      if (scheduler.mesosTrackers.containsKey(host)) {
+        scheduler.mesosTrackers.get(host).active = true;
+        idleMapSlots += status.getAvailableMapSlots();
+        idleReduceSlots += status.getAvailableReduceSlots();
+      }
+    }
+
+    // Consider the TaskTrackers that have yet to become active as being idle,
+    // otherwise we will launch excessive TaskTrackers.
+    int inactiveMapSlots = 0;
+    int inactiveReduceSlots = 0;
+    for (MesosTracker tracker : scheduler.mesosTrackers.values()) {
+      if (!tracker.active) {
+        inactiveMapSlots += tracker.mapSlots;
+        inactiveReduceSlots += tracker.reduceSlots;
+      }
+    }
+
+    // To ensure Hadoop jobs begin promptly, we can specify a minimum number
+    // of 'hot slots' to be available for use. This addresses the
+    // TaskTracker spin up delay that exists with Hadoop on Mesos. This can
+    // be a nuisance with lower latency applications, such as ad-hoc Hive
+    // queries.
+    int minimumMapSlots = scheduler.conf.getInt("mapred.mesos.total.map.slots.minimum", 0);
+    int minimumReduceSlots =
+        scheduler.conf.getInt("mapred.mesos.total.reduce.slots.minimum", 0);
+
+    // Compute how many slots we need to allocate.
+    neededMapSlots = Math.max(
+        minimumMapSlots - (idleMapSlots + inactiveMapSlots),
+        pendingMaps - (idleMapSlots + inactiveMapSlots));
+    neededReduceSlots = Math.max(
+        minimumReduceSlots - (idleReduceSlots + inactiveReduceSlots),
+        pendingReduces - (idleReduceSlots + inactiveReduceSlots));
+
+    LOG.info(join("\n", Arrays.asList(
+        "JobTracker Status",
+        "      Pending Map Tasks: " + pendingMaps,
+        "   Pending Reduce Tasks: " + pendingReduces,
+        "      Running Map Tasks: " + runningMaps,
+        "   Running Reduce Tasks: " + runningReduces,
+        "         Idle Map Slots: " + idleMapSlots,
+        "      Idle Reduce Slots: " + idleReduceSlots,
+        "     Inactive Map Slots: " + inactiveMapSlots
+            + " (launched but no heartbeat yet)",
+        "  Inactive Reduce Slots: " + inactiveReduceSlots
+            + " (launched but no heartbeat yet)",
+        "       Needed Map Slots: " + neededMapSlots,
+        "    Needed Reduce Slots: " + neededReduceSlots,
+        "     Unhealthy Trackers: " + unhealthyTrackers)));
+
+    File stateFile = scheduler.stateFile;
+    if (stateFile != null) {
+      // Update state file
+      synchronized (this) {
+        Set<String> hosts = new HashSet<>();
+        for (MesosTracker tracker : scheduler.mesosTrackers.values()) {
+          hosts.add(tracker.host.getHostName());
+        }
+        try {
+          File tmp = new File(stateFile.getAbsoluteFile() + ".tmp");
+          FileWriter fstream = new FileWriter(tmp);
+          fstream.write(join("\n", Arrays.asList(
+              "time=" + System.currentTimeMillis(),
+              "pendingMaps=" + pendingMaps,
+              "pendingReduces=" + pendingReduces,
+              "runningMaps=" + runningMaps,
+              "runningReduces=" + runningReduces,
+              "idleMapSlots=" + idleMapSlots,
+              "idleReduceSlots=" + idleReduceSlots,
+              "inactiveMapSlots=" + inactiveMapSlots,
+              "inactiveReduceSlots=" + inactiveReduceSlots,
+              "neededMapSlots=" + neededMapSlots,
+              "neededReduceSlots=" + neededReduceSlots,
+              "unhealthyTrackers=" + unhealthyTrackers,
+              "hosts=" + join(",", hosts),
+              "")));
+          fstream.close();
+          if (!tmp.renameTo(stateFile)) {
+            LOG.error("Can't overwrite state " + stateFile.getAbsolutePath());
+          }
+        } catch (Exception e) {
+          LOG.error("Can't write state file: " + e.getMessage());
+        }
+      }
+    }
+  }
+
+  // This method computes the number of slots to launch for this offer, and
+  // returns true if the offer is sufficient.
+  // Must be overridden.
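+  // Implemented by ResourcePolicyFixed and ResourcePolicyVariable below.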
+  public abstract boolean computeSlots();
 }
diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicyFixed.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicyFixed.java
index 2898c4f..a567c76 100644
--- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicyFixed.java
+++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicyFixed.java
@@ -1,18 +1,18 @@
 package org.apache.hadoop.mapred;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
+/**
+ * @todo What is the difference between variable and fixed resource policy?
+ */
 public class ResourcePolicyFixed extends ResourcePolicy {
-
-  public static final Log LOG = LogFactory.getLog(ResourcePolicyFixed.class);
-
   public ResourcePolicyFixed(MesosScheduler scheduler) {
     super(scheduler);
   }
 
-  // This method computes the number of slots to launch for this offer, and
-  // returns true if the offer is sufficient.
+  /**
+   * Computes the number of slots to launch for this offer.
+   *
+   * @return true if the offer is sufficient
+   */
   @Override
   public boolean computeSlots() {
     mapSlots = mapSlotsMax;
@@ -24,9 +24,6 @@ public boolean computeSlots() {
     slots = (int) Math.min(slots, (disk - containerDisk) / slotDisk);
 
     // Is this offer too small for even the minimum slots?
-    if (slots < mapSlots + reduceSlots || slots < 1) {
-      return false;
-    }
-    return true;
+    return slots >= 1 && slots >= mapSlots + reduceSlots;
   }
 }
diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicyVariable.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicyVariable.java
index 7d9888a..c7c9c10 100644
--- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicyVariable.java
+++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicyVariable.java
@@ -1,61 +1,62 @@
 package org.apache.hadoop.mapred;
 
+/**
+ * @todo What is the difference between variable and fixed resource policy?
+ */
 public class ResourcePolicyVariable extends ResourcePolicy {
-  public ResourcePolicyVariable(MesosScheduler scheduler) {
-    super(scheduler);
-  }
+  public ResourcePolicyVariable(MesosScheduler scheduler) {
+    super(scheduler);
+  }
 
-  // This method computes the number of slots to launch for this offer, and
-  // returns true if the offer is sufficient.
-  @Override
-  public boolean computeSlots() {
-    // What's the minimum number of map and reduce slots we should try to
-    // launch?
-    mapSlots = 0;
-    reduceSlots = 0;
+  /**
+   * Computes the number of slots to launch for this offer.
+   *
+   * @return true if the offer is sufficient
+   */
+  @Override
+  public boolean computeSlots() {
+    // What's the minimum number of map and reduce slots we should try to
+    // launch?
+    mapSlots = 0;
+    reduceSlots = 0;
 
-    // Determine how many slots we can allocate.
-    int slots = mapSlotsMax + reduceSlotsMax;
-    slots = (int) Math.min(slots, (cpus - containerCpus) / slotCpus);
-    slots = (int) Math.min(slots, (mem - containerMem) / slotMem);
-    slots = (int) Math.min(slots, (disk - containerDisk) / slotDisk);
+    // Determine how many slots we can allocate.
+    int slots = mapSlotsMax + reduceSlotsMax;
+    slots = (int) Math.min(slots, (cpus - containerCpus) / slotCpus);
+    slots = (int) Math.min(slots, (mem - containerMem) / slotMem);
+    slots = (int) Math.min(slots, (disk - containerDisk) / slotDisk);
 
-    // Is this offer too small for even the minimum slots?
-    if (slots < 1) {
-      return false;
-    }
+    // Is this offer too small for even the minimum slots?
+    if (slots < 1) {
+      return false;
+    }
 
-    // Is the number of slots we need sufficiently small? If so, we can
-    // allocate exactly the number we need.
-    if (slots >= neededMapSlots + neededReduceSlots && neededMapSlots <
-        mapSlotsMax && neededReduceSlots < reduceSlotsMax) {
-      mapSlots = neededMapSlots;
-      reduceSlots = neededReduceSlots;
-    } else {
-      // Allocate slots fairly for this resource offer.
-      double mapFactor = (double) neededMapSlots / (neededMapSlots + neededReduceSlots);
-      double reduceFactor = (double) neededReduceSlots / (neededMapSlots + neededReduceSlots);
-      // To avoid map/reduce slot starvation, don't allow more than 50%
-      // spread between map/reduce slots when we need both mappers and
-      // reducers.
-      if (neededMapSlots > 0 && neededReduceSlots > 0) {
-        if (mapFactor < 0.25) {
-          mapFactor = 0.25;
-        } else if (mapFactor > 0.75) {
-          mapFactor = 0.75;
-        }
-        if (reduceFactor < 0.25) {
-          reduceFactor = 0.25;
-        } else if (reduceFactor > 0.75) {
-          reduceFactor = 0.75;
-        }
+    // Is the number of slots we need sufficiently small? If so, we can
+    // allocate exactly the number we need.
+    if (slots >= neededMapSlots + neededReduceSlots
+        && neededMapSlots < mapSlotsMax
+        && neededReduceSlots < reduceSlotsMax) {
+      mapSlots = neededMapSlots;
+      reduceSlots = neededReduceSlots;
+    } else {
+      // Allocate slots fairly for this resource offer.
+      double mapFactor = (double) neededMapSlots / (neededMapSlots + neededReduceSlots);
+      // To avoid map/reduce slot starvation, don't allow more than 50%
+      // spread between map/reduce slots when we need both mappers and
+      // reducers.
+      if (neededMapSlots > 0 && neededReduceSlots > 0) {
+        if (mapFactor < 0.25) {
+          mapFactor = 0.25;
+        } else if (mapFactor > 0.75) {
+          mapFactor = 0.75;
         }
-        mapSlots = Math.min(Math.min((long)Math.max(Math.round(mapFactor * slots), 1), mapSlotsMax), neededMapSlots);
-
-        // The remaining slots are allocated for reduces.
-        slots -= mapSlots;
-        reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots);
       }
-      return true;
+      mapSlots = Math.min(Math.min(Math.max(Math.round(mapFactor * slots), 1), mapSlotsMax), neededMapSlots);
+
+      // The remaining slots are allocated for reduces.
+      slots -= mapSlots;
+      reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots);
     }
+    return true;
   }
+}
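The fair-share branch of `ResourcePolicyVariable.computeSlots()` can be hard to read in diff form, so here is a small, self-contained sketch of just that arithmetic. It is not part of the patch: the class name and the hard-coded inputs are illustrative, the clamp is written with `Math.max`/`Math.min` instead of the patch's `if`/`else` chain, and in the real class `slots`, `neededMapSlots`, and the caps are fields derived from the offer and the JobTracker state, with the exact-fit branch checked first.

```
// Standalone sketch (not part of the patch) of the fair-share slot math.
public class SlotMathSketch {
  public static void main(String[] args) {
    int slots = 10;            // slots this offer can hold after container overhead
    int neededMapSlots = 9;    // pending map work
    int neededReduceSlots = 1; // pending reduce work
    int mapSlotsMax = 4;       // per-tracker caps
    int reduceSlotsMax = 4;

    double mapFactor = (double) neededMapSlots / (neededMapSlots + neededReduceSlots);
    // Clamp to [0.25, 0.75] so neither task type is starved when both are needed.
    if (neededMapSlots > 0 && neededReduceSlots > 0) {
      mapFactor = Math.max(0.25, Math.min(0.75, mapFactor));
    }

    long mapSlots = Math.min(Math.min(Math.max(Math.round(mapFactor * slots), 1), mapSlotsMax), neededMapSlots);
    long reduceSlots = Math.min(Math.min(slots - mapSlots, reduceSlotsMax), neededReduceSlots);

    // mapFactor is clamped from 0.9 down to 0.75; round(0.75 * 10) = 8 is then
    // capped at mapSlotsMax = 4, leaving room for the 1 needed reduce slot.
    System.out.println("mapSlots=" + mapSlots + ", reduceSlots=" + reduceSlots);
  }
}
```

The 0.25/0.75 clamp is the starvation guard: whenever both task types have pending work, neither side can claim more than 75% of an offer's slots before the per-tracker caps and needed-slot limits apply.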