diff --git a/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskDeleteRunner.java b/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskDeleteRunner.java index 2fbf7a3a7..c4473e209 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskDeleteRunner.java +++ b/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskDeleteRunner.java @@ -43,7 +43,7 @@ public void run() { task.setDeleted(true); minion.tasks.remove(task.getJobKey().toString()); log.warn("[task.delete] {} terminated={}", task.getJobKey(), terminated); - minion.writeState(); + minion.writeState(true); } File taskDirFile = new File(minion.rootDir + "/" + delete.getJobUuid() + (delete.getNodeID() != null ? "/" + delete.getNodeID() : "")); if (taskDirFile.exists() && taskDirFile.isDirectory()) { @@ -52,6 +52,5 @@ public void run() { } finally { minion.minionStateLock.unlock(); } - } } diff --git a/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskRevertRunner.java b/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskRevertRunner.java index 566440015..a5bff10e1 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskRevertRunner.java +++ b/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskRevertRunner.java @@ -63,6 +63,6 @@ public void run() { log.warn("[task.revert] " + task.getJobKey() + " completed in " + (System.currentTimeMillis() - time) + "ms."); } } - minion.writeState(); + minion.writeState(true); } } diff --git a/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskStopRunner.java b/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskStopRunner.java index 6f61d19e1..5aec1ff4a 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskStopRunner.java +++ b/hydra-main/src/main/java/com/addthis/hydra/minion/CommandTaskStopRunner.java @@ -63,7 +63,7 @@ public void run() { task.sendEndStatus(task.findLastJobStatus()); } } - minion.writeState(); + minion.writeState(true); } } diff --git a/hydra-main/src/main/java/com/addthis/hydra/minion/JobTask.java b/hydra-main/src/main/java/com/addthis/hydra/minion/JobTask.java index 9072c1668..eb9f61d16 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/minion/JobTask.java +++ b/hydra-main/src/main/java/com/addthis/hydra/minion/JobTask.java @@ -22,10 +22,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Function; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.function.Function; import com.addthis.basis.util.LessBytes; import com.addthis.basis.util.LessFiles; @@ -35,7 +35,11 @@ import com.addthis.codec.annotations.FieldConfig; import com.addthis.codec.codables.Codable; import com.addthis.codec.json.CodecJSON; -import com.addthis.hydra.job.*; +import com.addthis.hydra.job.BackupWorkItem; +import com.addthis.hydra.job.JobTaskErrorCode; +import com.addthis.hydra.job.JobTaskState; +import com.addthis.hydra.job.ReplicateWorkItem; +import com.addthis.hydra.job.RunTaskWorkItem; import com.addthis.hydra.job.backup.DailyBackup; import com.addthis.hydra.job.backup.GoldBackup; import com.addthis.hydra.job.backup.HourlyBackup; @@ -446,7 +450,7 @@ private List assembleBackupCommandsForHost(boolean local, ReplicaTarget } } } - minion.writeState(); + minion.writeState(true); return copyCommands; } @@ -1255,7 +1259,7 @@ public boolean stopWait(File[] pidFiles, boolean kill) { return result; } - private void resetStartTime() { + protected void resetStartTime() { if (isRunning()) { startTime = 0; } else if (isReplicating()) { @@ -1263,7 +1267,7 @@ private void resetStartTime() { } else if (isBackingUp()) { backupStartTime = 0; } - minion.writeState(); + minion.writeState(true); } public File getLiveDir() { diff --git a/hydra-main/src/main/java/com/addthis/hydra/minion/Minion.java b/hydra-main/src/main/java/com/addthis/hydra/minion/Minion.java index dedb6353f..83dedb0f8 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/minion/Minion.java +++ b/hydra-main/src/main/java/com/addthis/hydra/minion/Minion.java @@ -278,7 +278,7 @@ private Minion(@JsonProperty("dataDir") File rootDir, if (liveEverywhereMarkerFile.createNewFile()) { log.info("cutover to live-everywhere tasks"); } - writeState(); + writeState(false); if (!Strings.isNullOrEmpty(queueType)) { runner = new TaskRunner(this); runner.start(); @@ -421,7 +421,7 @@ public void insertJobKickMessage(CommandTaskKick kick) { } finally { minionStateLock.unlock(); } - writeState(); + writeState(true); } void kickNextJob() throws Exception { @@ -462,7 +462,7 @@ void kickNextJob() throws Exception { log.warn("[kick] exception while trying to kick {}", task.getName(), ex); task.sendEndStatus(JobTaskErrorCode.EXIT_SCRIPT_EXEC_ERROR); } - writeState(); + writeState(true); return; } } finally { @@ -581,10 +581,10 @@ private boolean updateTaskMeta(String jobID, File taskRoot) throws IOException { } } - void writeState() { + void writeState(boolean append) { minionStateLock.lock(); try { - LessFiles.write(stateFile, LessBytes.toBytes(CodecJSON.encodeString(this)), false); + LessFiles.write(stateFile, LessBytes.toBytes(CodecJSON.encodeString(this)), append); } catch (IOException io) { log.warn("Error writing minion state to disk: ", io); /* assume disk failure: set diskReadOnly=true and exit */ @@ -797,7 +797,7 @@ JobTask createNewTask(String jobID, int node) throws ExecException { log.info("[task.new] restore {}/{} root={}", task.id, task.node, task.taskRoot); tasks.put(task.getJobKey().toString(), task); task.initializeFileVariables(); - writeState(); + writeState(true); return task; } @@ -842,7 +842,7 @@ public boolean getShutdown() { minionGroupMembership.removeFromGroup("/minion/up", getUUID()); zkClient.close(); } - writeState(); + writeState(true); } } } diff --git a/hydra-main/src/main/java/com/addthis/hydra/minion/TaskRunner.java b/hydra-main/src/main/java/com/addthis/hydra/minion/TaskRunner.java index 209c25dce..6be0120f9 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/minion/TaskRunner.java +++ b/hydra-main/src/main/java/com/addthis/hydra/minion/TaskRunner.java @@ -60,7 +60,13 @@ public void stopTaskRunner() { minion.channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (InterruptedException ex) { log.warn("Interrupted while processing task messages"); - minion.shutdown(); + try { + minion.close(); + } catch (Exception e) { + log.error("Minion close throws an exception", e); + } finally { + minion.shutdown(); + } } catch (ShutdownSignalException shutdownException) { log.warn("Received unexpected shutdown exception from rabbitMQ", shutdownException); try { @@ -80,7 +86,13 @@ public void stopTaskRunner() { if (!(ex instanceof ExecException)) { log.error("Error nacking message", ex); } - minion.shutdown(); + try { + minion.close(); + } catch (Exception e) { + log.error("Minion close throws an exception", e); + } finally { + minion.shutdown(); + } } } }