From e435414b61fcf30fdb6c926f01f26896d80b2a7c Mon Sep 17 00:00:00 2001 From: "Philip R. Kensche" Date: Tue, 11 Oct 2022 09:07:35 +0200 Subject: [PATCH] Fixed windows-style linefeeds. Updated changelog. Planned bump to 0.1.0 --- README.md | 3 + .../execution/jobs/SubmissionCommand.groovy | 308 +++---- .../jobs/cluster/slurm/SlurmJobManager.groovy | 776 +++++++++--------- .../slurm/SlurmSubmissionCommand.groovy | 268 +++--- 4 files changed, 679 insertions(+), 676 deletions(-) diff --git a/README.md b/README.md index d38d5873..9ce4d9da 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,9 @@ To start the integration tests, please fill in host and user settings (password # Change Logs +* 0.1.0 + - Minor: SLURM support + * 0.0.14 - Minor: Added execution time out feature diff --git a/src/main/groovy/de/dkfz/roddy/execution/jobs/SubmissionCommand.groovy b/src/main/groovy/de/dkfz/roddy/execution/jobs/SubmissionCommand.groovy index 45c29669..3899a28b 100644 --- a/src/main/groovy/de/dkfz/roddy/execution/jobs/SubmissionCommand.groovy +++ b/src/main/groovy/de/dkfz/roddy/execution/jobs/SubmissionCommand.groovy @@ -1,154 +1,154 @@ -/* - * Copyright (c) 2021 German Cancer Research Center (Deutsches Krebsforschungszentrum, DKFZ). - * - * Distributed under the MIT License (license terms are at https://www.github.com/TheRoddyWMS/Roddy/LICENSE.txt). - */ -package de.dkfz.roddy.execution.jobs - -import de.dkfz.roddy.StringConstants -import de.dkfz.roddy.config.JobLog -import de.dkfz.roddy.tools.BashUtils -import groovy.transform.CompileStatic - -import static de.dkfz.roddy.StringConstants.EMPTY - -@CompileStatic -abstract class SubmissionCommand extends Command { - - /** - * Should the local environment during the submission be copied to the execution hosts? - * This is an Optional, because the actual value will be calculated from both the Job/Command configuration and - * the JobManager. - */ - Optional passEnvironment = Optional.empty() - - /** - * The command which should be called - */ - protected String command - - protected List dependencyIDs - - protected final List processingParameters - /** - * A command to be executed on the cluster head node, in particular qsub, bsub, qstat, etc. - * - * @param parentJobManager - * @param job - * @param jobName - * @param environmentVariables - * - */ - protected SubmissionCommand(BatchEuphoriaJobManager parentJobManager, BEJob job, String jobName, - List processingParameters, - Map environmentVariables, List dependencyIDs, - String command) { - super(parentJobManager, job, jobName, environmentVariables) - this.processingParameters = processingParameters - this.command = command - this.dependencyIDs = dependencyIDs ?: new LinkedList() - } - - /** - * Should the local environment be passed? - * - * JobManager and SubmissionCommand together determine, whether the environment should be passed. - * - * * The Command has precedence over the JobManager value. - * * If the Command value is not set, the JobManager value is the fallback. - * * If neither JobManager nor Command are defined, only copy the requested variables to remote. - * - * @return - */ - Boolean getPassLocalEnvironment() { - passEnvironment.orElse(parentJobManager.passEnvironment) - } - - @Override - String toBashCommandString() { - String email = parentJobManager.getUserEmail() - String umask = parentJobManager.getUserMask() - String groupList = parentJobManager.getUserGroup() - boolean holdJobsOnStart = parentJobManager.isHoldJobsEnabled() - - // collect parameters for job submission - List parameters = [] - parameters << assembleVariableExportParameters() - parameters << getAccountNameParameter() - parameters << getJobNameParameter() - if (holdJobsOnStart) parameters << getHoldParameter() - parameters << getWorkingDirectoryParameter() - parameters << getLoggingParameter(job.jobLog) - parameters << getEmailParameter(email) - if (groupList && groupList != "UNDEFINED") parameters << getGroupListParameter(groupList) - parameters << getUmaskString(umask) - parameters << assembleProcessingCommands() - parameters << assembleDependencyParameter(creatingJob.parentJobIDs) - parameters << getAdditionalCommandParameters() - - // create job submission command call - StringBuilder command = new StringBuilder(EMPTY) - - if (environmentString) { - command << "${environmentString} " - } - - if (job.toolScript) { - command << "echo " << BashUtils.strongQuote("#!/bin/bash " + System.lineSeparator() + job.toolScript) << " | " - } - - command << parentJobManager.submissionCommand - command << " ${parameters.join(" ")} " - - if (job.tool) { - command << " " << job.tool.absolutePath - } - - return command - } - - abstract protected String getJobNameParameter() - - abstract protected String getHoldParameter() - - protected String getAccountNameParameter() { - return "" - } - - abstract protected String getWorkingDirectoryParameter() - - abstract protected String getLoggingParameter(JobLog jobLog) - - abstract protected String getEmailParameter(String address) - - abstract protected String getGroupListParameter(String groupList) - - abstract protected String getUmaskString(String umask) - - abstract protected String assembleDependencyParameter(List jobIds) - - abstract protected String getAdditionalCommandParameters() - - abstract protected String getEnvironmentString() - - /** If passLocalEnvironment is true, all local variables will be forwarded to the execution host. - * If passLocalEnvironment is false, no local variables will be forwarded by default. - * In both cases arbitrary variables can be set to specific values or be declared to be forwarded as defined in the local environment (according - * to the parameters field; null-value parameters are copied as locally defined). - * - * @return A set of parameters for the submission command to achieve the requested variable exports. - */ - abstract protected String assembleVariableExportParameters() - - String assembleProcessingCommands() { - StringBuilder commands = new StringBuilder() - for (ProcessingParameters pcmd in job.getListOfProcessingParameters()) { - if (!(pcmd instanceof ProcessingParameters)) continue - ProcessingParameters command = (ProcessingParameters) pcmd - if (command == null) - continue - commands << StringConstants.WHITESPACE << command.getProcessingCommandString() - } - return commands.toString() - } -} +/* + * Copyright (c) 2021 German Cancer Research Center (Deutsches Krebsforschungszentrum, DKFZ). + * + * Distributed under the MIT License (license terms are at https://www.github.com/TheRoddyWMS/Roddy/LICENSE.txt). + */ +package de.dkfz.roddy.execution.jobs + +import de.dkfz.roddy.StringConstants +import de.dkfz.roddy.config.JobLog +import de.dkfz.roddy.tools.BashUtils +import groovy.transform.CompileStatic + +import static de.dkfz.roddy.StringConstants.EMPTY + +@CompileStatic +abstract class SubmissionCommand extends Command { + + /** + * Should the local environment during the submission be copied to the execution hosts? + * This is an Optional, because the actual value will be calculated from both the Job/Command configuration and + * the JobManager. + */ + Optional passEnvironment = Optional.empty() + + /** + * The command which should be called + */ + protected String command + + protected List dependencyIDs + + protected final List processingParameters + /** + * A command to be executed on the cluster head node, in particular qsub, bsub, qstat, etc. + * + * @param parentJobManager + * @param job + * @param jobName + * @param environmentVariables + * + */ + protected SubmissionCommand(BatchEuphoriaJobManager parentJobManager, BEJob job, String jobName, + List processingParameters, + Map environmentVariables, List dependencyIDs, + String command) { + super(parentJobManager, job, jobName, environmentVariables) + this.processingParameters = processingParameters + this.command = command + this.dependencyIDs = dependencyIDs ?: new LinkedList() + } + + /** + * Should the local environment be passed? + * + * JobManager and SubmissionCommand together determine, whether the environment should be passed. + * + * * The Command has precedence over the JobManager value. + * * If the Command value is not set, the JobManager value is the fallback. + * * If neither JobManager nor Command are defined, only copy the requested variables to remote. + * + * @return + */ + Boolean getPassLocalEnvironment() { + passEnvironment.orElse(parentJobManager.passEnvironment) + } + + @Override + String toBashCommandString() { + String email = parentJobManager.getUserEmail() + String umask = parentJobManager.getUserMask() + String groupList = parentJobManager.getUserGroup() + boolean holdJobsOnStart = parentJobManager.isHoldJobsEnabled() + + // collect parameters for job submission + List parameters = [] + parameters << assembleVariableExportParameters() + parameters << getAccountNameParameter() + parameters << getJobNameParameter() + if (holdJobsOnStart) parameters << getHoldParameter() + parameters << getWorkingDirectoryParameter() + parameters << getLoggingParameter(job.jobLog) + parameters << getEmailParameter(email) + if (groupList && groupList != "UNDEFINED") parameters << getGroupListParameter(groupList) + parameters << getUmaskString(umask) + parameters << assembleProcessingCommands() + parameters << assembleDependencyParameter(creatingJob.parentJobIDs) + parameters << getAdditionalCommandParameters() + + // create job submission command call + StringBuilder command = new StringBuilder(EMPTY) + + if (environmentString) { + command << "${environmentString} " + } + + if (job.toolScript) { + command << "echo " << BashUtils.strongQuote("#!/bin/bash " + System.lineSeparator() + job.toolScript) << " | " + } + + command << parentJobManager.submissionCommand + command << " ${parameters.join(" ")} " + + if (job.tool) { + command << " " << job.tool.absolutePath + } + + return command + } + + abstract protected String getJobNameParameter() + + abstract protected String getHoldParameter() + + protected String getAccountNameParameter() { + return "" + } + + abstract protected String getWorkingDirectoryParameter() + + abstract protected String getLoggingParameter(JobLog jobLog) + + abstract protected String getEmailParameter(String address) + + abstract protected String getGroupListParameter(String groupList) + + abstract protected String getUmaskString(String umask) + + abstract protected String assembleDependencyParameter(List jobIds) + + abstract protected String getAdditionalCommandParameters() + + abstract protected String getEnvironmentString() + + /** If passLocalEnvironment is true, all local variables will be forwarded to the execution host. + * If passLocalEnvironment is false, no local variables will be forwarded by default. + * In both cases arbitrary variables can be set to specific values or be declared to be forwarded as defined in the local environment (according + * to the parameters field; null-value parameters are copied as locally defined). + * + * @return A set of parameters for the submission command to achieve the requested variable exports. + */ + abstract protected String assembleVariableExportParameters() + + String assembleProcessingCommands() { + StringBuilder commands = new StringBuilder() + for (ProcessingParameters pcmd in job.getListOfProcessingParameters()) { + if (!(pcmd instanceof ProcessingParameters)) continue + ProcessingParameters command = (ProcessingParameters) pcmd + if (command == null) + continue + commands << StringConstants.WHITESPACE << command.getProcessingCommandString() + } + return commands.toString() + } +} diff --git a/src/main/groovy/de/dkfz/roddy/execution/jobs/cluster/slurm/SlurmJobManager.groovy b/src/main/groovy/de/dkfz/roddy/execution/jobs/cluster/slurm/SlurmJobManager.groovy index 8bb3b68c..e1983f15 100644 --- a/src/main/groovy/de/dkfz/roddy/execution/jobs/cluster/slurm/SlurmJobManager.groovy +++ b/src/main/groovy/de/dkfz/roddy/execution/jobs/cluster/slurm/SlurmJobManager.groovy @@ -1,388 +1,388 @@ -/* - * Copyright (c) 2022 German Cancer Research Center (DKFZ). - * - * Distributed under the MIT License (license terms are at https://www.github.com/eilslabs/Roddy/LICENSE.txt). - */ - -package de.dkfz.roddy.execution.jobs.cluster.slurm - -import com.google.common.collect.LinkedHashMultimap -import de.dkfz.roddy.BEException -import de.dkfz.roddy.config.ResourceSet -import de.dkfz.roddy.execution.BEExecutionService -import de.dkfz.roddy.execution.io.ExecutionResult -import de.dkfz.roddy.execution.jobs.* -import de.dkfz.roddy.execution.jobs.cluster.GridEngineBasedJobManager -import de.dkfz.roddy.tools.* -import groovy.json.JsonSlurper -import groovy.transform.CompileStatic - -import java.time.* -import java.time.format.DateTimeFormatter - -@CompileStatic -class SlurmJobManager extends GridEngineBasedJobManager { - - private final ZoneId TIME_ZONE_ID - - SlurmJobManager(BEExecutionService executionService, JobManagerOptions parms) { - super(executionService, parms) - TIME_ZONE_ID = parms.timeZoneId - } - - @Override - protected SlurmSubmissionCommand createCommand(BEJob job) { - SlurmSubmissionCommand ssc = new SlurmSubmissionCommand(this, job, job.jobName, [], job.parameters, job.parentJobIDs*.id, - job.tool?.getAbsolutePath() ?: job.getToolScript()) - return ssc - } - - @Override - GenericJobInfo parseGenericJobInfo(String commandString) { - throw new BEException("parseGenericJobInfo is not implemented") - } - - - @Override - String parseJobID(String commandOutput) { - return commandOutput - } - - @Override - protected int getColumnOfJobID() { - return 0 - } - - @Override - protected int getColumnOfJobState() { - return 1 - } - - @Override - protected Map queryJobStates(List jobIDs, - Duration timeout = Duration.ZERO) { - StringBuilder queryCommand = new StringBuilder(getQueryJobStatesCommand()) - if (jobIDs && !jobIDs.empty) { - queryCommand << " -j " << jobIDs.collect { it }.join(",") - } - - if (isTrackingOfUserJobsEnabled) - queryCommand << " -u $userIDForQueries " - - ExecutionResult er = executionService.execute(queryCommand.toString()) - List resultLines = er.stdout - - Map result = [:] - - if (!er.successful) { - throw new BEException("Execution failed. ${er.toStatusLine()}") - } else { - if (resultLines.size() > 0) { - for (String line : resultLines) { - line = line.trim() - if (line.length() == 0) continue - if (!RoddyConversionHelperMethods.isInteger(line.substring(0, 1))) - continue //Filter out lines which have been missed which do not start with a number. - - String[] split = line.split("\\|") - final int ID = getColumnOfJobID() - final int JOBSTATE = getColumnOfJobState() - - BEJobID jobID = new BEJobID(split[ID]) - JobState js = parseJobState(split[JOBSTATE]) - - result.put(jobID, js) - } - } - } - return result - } - - @Override - protected JobState parseJobState(String stateString) { - JobState js = JobState.UNKNOWN - - if (stateString == "RUNNING") - js = JobState.RUNNING - if (stateString == "SUSPENDED") - js = JobState.SUSPENDED - if (stateString == "PENDING") - js = JobState.HOLD - if (stateString == "CANCELLED+") - js = JobState.ABORTED - if (stateString == "COMPLETED") - js = JobState.COMPLETED_SUCCESSFUL - if (stateString == "FAILED") - js = JobState.FAILED - - return js - } - - @Override - Map queryExtendedJobStateById(List jobIds, - Duration timeout = Duration.ZERO) { - Map queriedExtendedStates = [:] - for (int i = 0; i < jobIds.size(); i++) { - ExecutionResult er = executionService.execute(getExtendedQueryJobStatesCommand() + " " + jobIds[i] + " -o") - - if (er != null && er.successful) { - queriedExtendedStates = this.processExtendedOutput(er.stdout.join("\n"), queriedExtendedStates) - } else { - er = executionService.execute("sacct -j " + jobIds[i] + " -o --json") - if (er != null && er.successful) { - queriedExtendedStates = this.processExtendedOutputFromJson(er.stdout.join("\n"), queriedExtendedStates) - } else { - throw new BEException("Extended job states couldn't be retrieved: ${er.toStatusLine()}") - } - } - } - return queriedExtendedStates - } - - /** - * Reads the scontrol output and creates GenericJobInfo objects - * @param resultLines - Input of ExecutionResult object - * @return map with jobid as key - */ - protected Map processExtendedOutput(String stdout, Map result) { - // Create a complex line object which will be used for further parsing. - ComplexLine line = new ComplexLine(stdout) - - Collection splitted = line.splitBy(" ").findAll { it } - if (splitted.size() > 1) { - Map jobResult = [:] - - for (int i = 0; i < splitted.size(); i++) { - String[] jobKeyValue = splitted[i].split("=") - if (jobKeyValue.size() > 1) { - jobResult.put(jobKeyValue[0], jobKeyValue[1]) - } - } - BEJobID jobID - String JOBID = jobResult["JobId"] - try { - jobID = new BEJobID(JOBID) - } catch (Exception exp) { - throw new BEException("Job ID '${JOBID}' could not be transformed to BEJobID ") - } - List dependIDs = [] - GenericJobInfo jobInfo = new GenericJobInfo(jobResult["JobName"], new File(jobResult["Command"]), jobID, null, dependIDs) - - /** Directories and files */ - jobInfo.inputFile = new File(jobResult["StdIn"]) - jobInfo.logFile = new File(jobResult["StdOut"]) - jobInfo.user = jobResult["UserId"] - jobInfo.submissionHost = jobResult["BatchHost"] - jobInfo.errorLogFile = new File(jobResult["StdErr"]) - jobInfo.execHome = jobResult["WorkDir"] - - /** Status info */ - jobInfo.jobState = parseJobState(jobResult["JobState"]) - jobInfo.exitCode = jobInfo.jobState == JobState.COMPLETED_SUCCESSFUL ? 0 : (jobResult["ExitCode"].split(":")[0] as Integer) - jobInfo.pendReason = jobResult["Reason"] - - /** Resources */ - String queue = jobResult["Partition"] - Duration runLimit = safelyParseColonSeparatedDuration(jobResult["TimeLimit"]) - Duration runTime = safelyParseColonSeparatedDuration(jobResult["RunTime"]) - jobInfo.runTime = runTime - BufferValue memory = safelyCastToBufferValue(jobResult["mem"]) - Integer cores = withCaughtAndLoggedException { jobResult["NumCPUs"] as Integer } - Integer nodes = withCaughtAndLoggedException { jobResult["NumNodes"] as Integer } - jobInfo.askedResources = new ResourceSet(null, null, null, runLimit, null, queue, null) - jobInfo.usedResources = new ResourceSet(memory, cores, nodes, runTime, null, queue, null) - - /** Timestamps */ - jobInfo.submitTime = parseTime(jobResult["SubmitTime"]) - jobInfo.startTime = parseTime(jobResult["StartTime"]) - jobInfo.endTime = parseTime(jobResult["EndTime"]) - jobInfo.eligibleTime = parseTime(jobResult["EligibleTime"]) - - result.put(jobID, jobInfo) - } - - return result - } - - Duration safelyParseColonSeparatedDuration(String value) { - withCaughtAndLoggedException { - value ? parseColonSeparatedHHMMSSDuration(value) : null - } - } - - /** - * Reads the sacct output as Json and creates GenericJobInfo objects - * @param resultLines - Input of ExecutionResult object - * @return map with jobid as key - */ - protected Map processExtendedOutputFromJson(String rawJson, Map result) { - if (!rawJson) - return result - - Object parsedJson = new JsonSlurper().parseText(rawJson) - List records = (List) parsedJson["jobs"] - for (jobResult in records) { - GenericJobInfo jobInfo - BEJobID jobID - String JOBID = jobResult["job_id"] - try { - jobID = new BEJobID(JOBID) - } catch (Exception exp) { - throw new BEException("Job ID '${JOBID}' could not be transformed to BEJobID ") - } - - List dependIDs = [] - jobInfo = new GenericJobInfo(jobResult["name"] as String, null, jobID, null, dependIDs) - - /** Common */ - jobInfo.user = jobResult["user"] - jobInfo.userGroup = jobResult["group"] - jobInfo.jobGroup = jobResult["group"] - jobInfo.priority = jobResult["priority"] - jobInfo.executionHosts = jobResult["nodes"] as List - - /** Status info */ - jobInfo.jobState = parseJobState(jobResult["state"]["current"] as String) - jobInfo.exitCode = jobInfo.jobState == JobState.COMPLETED_SUCCESSFUL ? 0 : (jobResult["exit_code"]["return_code"] as Integer) - jobInfo.pendReason = jobResult["state"]["reason"] - - /** Resources */ - String queue = jobResult["partition"] - Duration runTime = Duration.ofSeconds(jobResult["time"]["elapsed"] as long) - BufferValue memory = safelyCastToBufferValue(jobResult["required"]["memory"] as String) - Integer cores = withCaughtAndLoggedException { jobResult["required"]["CPUs"] as Integer } - cores = cores == 0 ? null : cores - Integer nodes = withCaughtAndLoggedException { jobResult["allocation_nodes"] as Integer } - - jobInfo.usedResources = new ResourceSet(memory, cores, nodes, runTime, null, queue, null) - jobInfo.askedResources = new ResourceSet(null, null, null, null, null, queue, null) - jobInfo.runTime = runTime - - /** Directories and files */ - jobInfo.execHome = jobResult["working_directory"] - - /** Timestamps */ - jobInfo.submitTime = parseTimeOfEpochSecond(jobResult["time"]["submission"] as String) - jobInfo.startTime = parseTimeOfEpochSecond(jobResult["time"]["start"] as String) - jobInfo.endTime = parseTimeOfEpochSecond(jobResult["time"]["end"] as String) - - result.put(jobID, jobInfo) - } - return result - } - - private ZonedDateTime parseTime(String str) { - return withCaughtAndLoggedException { - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss") - LocalDateTime localDateTime = LocalDateTime.parse(str, formatter) - return ZonedDateTime.of(localDateTime, TIME_ZONE_ID) - } - } - - private ZonedDateTime parseTimeOfEpochSecond(String str) { - return withCaughtAndLoggedException { ZonedDateTime.ofInstant(Instant.ofEpochSecond(str as long), TIME_ZONE_ID) } - } - - BufferValue safelyCastToBufferValue(String MAX_MEM) { - withCaughtAndLoggedException { - if (MAX_MEM) { - String bufferSize = MAX_MEM.find("([0-9]*[.])?[0-9]+") - String unit = MAX_MEM.find("[a-zA-Z]+") - BufferUnit bufferUnit = unit == "G" ? BufferUnit.g : BufferUnit.m - return new BufferValue(bufferSize, bufferUnit) - } - return null - } - } - - - @Override - protected ExecutionResult executeStartHeldJobs(List jobIDs) { - String command = "scontrol release ${jobIDs*.id.join(",")}" - return executionService.execute(command, false) - } - - @Override - ExecutionResult executeKillJobs(List jobIDs) { - String command = "scancel ${jobIDs*.id.join(",")}" - return executionService.execute(command, false) - } - - @Override - List getEnvironmentVariableGlobs() { - return Collections.unmodifiableList(["SLURM_*"]) - } - - @Override - String getQueryJobStatesCommand() { - return "squeue --format='%i|%T'" - } - - @Override - String getExtendedQueryJobStatesCommand() { - return "scontrol show job" - } - - @Override - void createComputeParameter(ResourceSet resourceSet, LinkedHashMultimap parameters) { - int nodes = resourceSet.isNodesSet() ? resourceSet.getNodes() : 1 - int cores = resourceSet.isCoresSet() ? resourceSet.getCores() : 1 - String nVal = "--nodes=" + nodes - String cVal = " --cores-per-socket=" + cores - parameters.put(nVal, cVal) - } - - void createQueueParameter(LinkedHashMultimap parameters, String queue) { - parameters.put('-p', queue) - } - - @Override - void createWalltimeParameter(LinkedHashMultimap parameters, ResourceSet resourceSet) { - parameters.put('--time=' + TimeUnit.fromDuration(resourceSet.walltime).toHourString(), " ") - } - - @Override - void createMemoryParameter(LinkedHashMultimap parameters, ResourceSet resourceSet) { - parameters.put('--mem=' + resourceSet.getMem().toString(BufferUnit.M), " ") - } - - @Override - void createStorageParameters(LinkedHashMultimap parameters, ResourceSet resourceSet) { - } - - @Override - String getJobIdVariable() { - return "SLURM_JOB_ID" - } - - @Override - String getJobNameVariable() { - return "SLURM_JOB_NAME" - } - - @Override - String getNodeFileVariable() { - return "SLURM_JOB_NODELIST" - } - - @Override - String getSubmitHostVariable() { - return "SLURM_SUBMIT_HOST" - } - - @Override - String getSubmitDirectoryVariable() { - return "SLURM_SUBMIT_DIR" - } - - @Override - String getQueueVariable() { - return "SLURM_JOB_PARTITION" - } - - @Override - String getSubmissionCommand() { - return "sbatch" - } - -} +/* + * Copyright (c) 2022 German Cancer Research Center (DKFZ). + * + * Distributed under the MIT License (license terms are at https://www.github.com/eilslabs/Roddy/LICENSE.txt). + */ + +package de.dkfz.roddy.execution.jobs.cluster.slurm + +import com.google.common.collect.LinkedHashMultimap +import de.dkfz.roddy.BEException +import de.dkfz.roddy.config.ResourceSet +import de.dkfz.roddy.execution.BEExecutionService +import de.dkfz.roddy.execution.io.ExecutionResult +import de.dkfz.roddy.execution.jobs.* +import de.dkfz.roddy.execution.jobs.cluster.GridEngineBasedJobManager +import de.dkfz.roddy.tools.* +import groovy.json.JsonSlurper +import groovy.transform.CompileStatic + +import java.time.* +import java.time.format.DateTimeFormatter + +@CompileStatic +class SlurmJobManager extends GridEngineBasedJobManager { + + private final ZoneId TIME_ZONE_ID + + SlurmJobManager(BEExecutionService executionService, JobManagerOptions parms) { + super(executionService, parms) + TIME_ZONE_ID = parms.timeZoneId + } + + @Override + protected SlurmSubmissionCommand createCommand(BEJob job) { + SlurmSubmissionCommand ssc = new SlurmSubmissionCommand(this, job, job.jobName, [], job.parameters, job.parentJobIDs*.id, + job.tool?.getAbsolutePath() ?: job.getToolScript()) + return ssc + } + + @Override + GenericJobInfo parseGenericJobInfo(String commandString) { + throw new BEException("parseGenericJobInfo is not implemented") + } + + + @Override + String parseJobID(String commandOutput) { + return commandOutput + } + + @Override + protected int getColumnOfJobID() { + return 0 + } + + @Override + protected int getColumnOfJobState() { + return 1 + } + + @Override + protected Map queryJobStates(List jobIDs, + Duration timeout = Duration.ZERO) { + StringBuilder queryCommand = new StringBuilder(getQueryJobStatesCommand()) + if (jobIDs && !jobIDs.empty) { + queryCommand << " -j " << jobIDs.collect { it }.join(",") + } + + if (isTrackingOfUserJobsEnabled) + queryCommand << " -u $userIDForQueries " + + ExecutionResult er = executionService.execute(queryCommand.toString()) + List resultLines = er.stdout + + Map result = [:] + + if (!er.successful) { + throw new BEException("Execution failed. ${er.toStatusLine()}") + } else { + if (resultLines.size() > 0) { + for (String line : resultLines) { + line = line.trim() + if (line.length() == 0) continue + if (!RoddyConversionHelperMethods.isInteger(line.substring(0, 1))) + continue //Filter out lines which have been missed which do not start with a number. + + String[] split = line.split("\\|") + final int ID = getColumnOfJobID() + final int JOBSTATE = getColumnOfJobState() + + BEJobID jobID = new BEJobID(split[ID]) + JobState js = parseJobState(split[JOBSTATE]) + + result.put(jobID, js) + } + } + } + return result + } + + @Override + protected JobState parseJobState(String stateString) { + JobState js = JobState.UNKNOWN + + if (stateString == "RUNNING") + js = JobState.RUNNING + if (stateString == "SUSPENDED") + js = JobState.SUSPENDED + if (stateString == "PENDING") + js = JobState.HOLD + if (stateString == "CANCELLED+") + js = JobState.ABORTED + if (stateString == "COMPLETED") + js = JobState.COMPLETED_SUCCESSFUL + if (stateString == "FAILED") + js = JobState.FAILED + + return js + } + + @Override + Map queryExtendedJobStateById(List jobIds, + Duration timeout = Duration.ZERO) { + Map queriedExtendedStates = [:] + for (int i = 0; i < jobIds.size(); i++) { + ExecutionResult er = executionService.execute(getExtendedQueryJobStatesCommand() + " " + jobIds[i] + " -o") + + if (er != null && er.successful) { + queriedExtendedStates = this.processExtendedOutput(er.stdout.join("\n"), queriedExtendedStates) + } else { + er = executionService.execute("sacct -j " + jobIds[i] + " -o --json") + if (er != null && er.successful) { + queriedExtendedStates = this.processExtendedOutputFromJson(er.stdout.join("\n"), queriedExtendedStates) + } else { + throw new BEException("Extended job states couldn't be retrieved: ${er.toStatusLine()}") + } + } + } + return queriedExtendedStates + } + + /** + * Reads the scontrol output and creates GenericJobInfo objects + * @param resultLines - Input of ExecutionResult object + * @return map with jobid as key + */ + protected Map processExtendedOutput(String stdout, Map result) { + // Create a complex line object which will be used for further parsing. + ComplexLine line = new ComplexLine(stdout) + + Collection splitted = line.splitBy(" ").findAll { it } + if (splitted.size() > 1) { + Map jobResult = [:] + + for (int i = 0; i < splitted.size(); i++) { + String[] jobKeyValue = splitted[i].split("=") + if (jobKeyValue.size() > 1) { + jobResult.put(jobKeyValue[0], jobKeyValue[1]) + } + } + BEJobID jobID + String JOBID = jobResult["JobId"] + try { + jobID = new BEJobID(JOBID) + } catch (Exception exp) { + throw new BEException("Job ID '${JOBID}' could not be transformed to BEJobID ") + } + List dependIDs = [] + GenericJobInfo jobInfo = new GenericJobInfo(jobResult["JobName"], new File(jobResult["Command"]), jobID, null, dependIDs) + + /** Directories and files */ + jobInfo.inputFile = new File(jobResult["StdIn"]) + jobInfo.logFile = new File(jobResult["StdOut"]) + jobInfo.user = jobResult["UserId"] + jobInfo.submissionHost = jobResult["BatchHost"] + jobInfo.errorLogFile = new File(jobResult["StdErr"]) + jobInfo.execHome = jobResult["WorkDir"] + + /** Status info */ + jobInfo.jobState = parseJobState(jobResult["JobState"]) + jobInfo.exitCode = jobInfo.jobState == JobState.COMPLETED_SUCCESSFUL ? 0 : (jobResult["ExitCode"].split(":")[0] as Integer) + jobInfo.pendReason = jobResult["Reason"] + + /** Resources */ + String queue = jobResult["Partition"] + Duration runLimit = safelyParseColonSeparatedDuration(jobResult["TimeLimit"]) + Duration runTime = safelyParseColonSeparatedDuration(jobResult["RunTime"]) + jobInfo.runTime = runTime + BufferValue memory = safelyCastToBufferValue(jobResult["mem"]) + Integer cores = withCaughtAndLoggedException { jobResult["NumCPUs"] as Integer } + Integer nodes = withCaughtAndLoggedException { jobResult["NumNodes"] as Integer } + jobInfo.askedResources = new ResourceSet(null, null, null, runLimit, null, queue, null) + jobInfo.usedResources = new ResourceSet(memory, cores, nodes, runTime, null, queue, null) + + /** Timestamps */ + jobInfo.submitTime = parseTime(jobResult["SubmitTime"]) + jobInfo.startTime = parseTime(jobResult["StartTime"]) + jobInfo.endTime = parseTime(jobResult["EndTime"]) + jobInfo.eligibleTime = parseTime(jobResult["EligibleTime"]) + + result.put(jobID, jobInfo) + } + + return result + } + + Duration safelyParseColonSeparatedDuration(String value) { + withCaughtAndLoggedException { + value ? parseColonSeparatedHHMMSSDuration(value) : null + } + } + + /** + * Reads the sacct output as Json and creates GenericJobInfo objects + * @param resultLines - Input of ExecutionResult object + * @return map with jobid as key + */ + protected Map processExtendedOutputFromJson(String rawJson, Map result) { + if (!rawJson) + return result + + Object parsedJson = new JsonSlurper().parseText(rawJson) + List records = (List) parsedJson["jobs"] + for (jobResult in records) { + GenericJobInfo jobInfo + BEJobID jobID + String JOBID = jobResult["job_id"] + try { + jobID = new BEJobID(JOBID) + } catch (Exception exp) { + throw new BEException("Job ID '${JOBID}' could not be transformed to BEJobID ") + } + + List dependIDs = [] + jobInfo = new GenericJobInfo(jobResult["name"] as String, null, jobID, null, dependIDs) + + /** Common */ + jobInfo.user = jobResult["user"] + jobInfo.userGroup = jobResult["group"] + jobInfo.jobGroup = jobResult["group"] + jobInfo.priority = jobResult["priority"] + jobInfo.executionHosts = jobResult["nodes"] as List + + /** Status info */ + jobInfo.jobState = parseJobState(jobResult["state"]["current"] as String) + jobInfo.exitCode = jobInfo.jobState == JobState.COMPLETED_SUCCESSFUL ? 0 : (jobResult["exit_code"]["return_code"] as Integer) + jobInfo.pendReason = jobResult["state"]["reason"] + + /** Resources */ + String queue = jobResult["partition"] + Duration runTime = Duration.ofSeconds(jobResult["time"]["elapsed"] as long) + BufferValue memory = safelyCastToBufferValue(jobResult["required"]["memory"] as String) + Integer cores = withCaughtAndLoggedException { jobResult["required"]["CPUs"] as Integer } + cores = cores == 0 ? null : cores + Integer nodes = withCaughtAndLoggedException { jobResult["allocation_nodes"] as Integer } + + jobInfo.usedResources = new ResourceSet(memory, cores, nodes, runTime, null, queue, null) + jobInfo.askedResources = new ResourceSet(null, null, null, null, null, queue, null) + jobInfo.runTime = runTime + + /** Directories and files */ + jobInfo.execHome = jobResult["working_directory"] + + /** Timestamps */ + jobInfo.submitTime = parseTimeOfEpochSecond(jobResult["time"]["submission"] as String) + jobInfo.startTime = parseTimeOfEpochSecond(jobResult["time"]["start"] as String) + jobInfo.endTime = parseTimeOfEpochSecond(jobResult["time"]["end"] as String) + + result.put(jobID, jobInfo) + } + return result + } + + private ZonedDateTime parseTime(String str) { + return withCaughtAndLoggedException { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss") + LocalDateTime localDateTime = LocalDateTime.parse(str, formatter) + return ZonedDateTime.of(localDateTime, TIME_ZONE_ID) + } + } + + private ZonedDateTime parseTimeOfEpochSecond(String str) { + return withCaughtAndLoggedException { ZonedDateTime.ofInstant(Instant.ofEpochSecond(str as long), TIME_ZONE_ID) } + } + + BufferValue safelyCastToBufferValue(String MAX_MEM) { + withCaughtAndLoggedException { + if (MAX_MEM) { + String bufferSize = MAX_MEM.find("([0-9]*[.])?[0-9]+") + String unit = MAX_MEM.find("[a-zA-Z]+") + BufferUnit bufferUnit = unit == "G" ? BufferUnit.g : BufferUnit.m + return new BufferValue(bufferSize, bufferUnit) + } + return null + } + } + + + @Override + protected ExecutionResult executeStartHeldJobs(List jobIDs) { + String command = "scontrol release ${jobIDs*.id.join(",")}" + return executionService.execute(command, false) + } + + @Override + ExecutionResult executeKillJobs(List jobIDs) { + String command = "scancel ${jobIDs*.id.join(",")}" + return executionService.execute(command, false) + } + + @Override + List getEnvironmentVariableGlobs() { + return Collections.unmodifiableList(["SLURM_*"]) + } + + @Override + String getQueryJobStatesCommand() { + return "squeue --format='%i|%T'" + } + + @Override + String getExtendedQueryJobStatesCommand() { + return "scontrol show job" + } + + @Override + void createComputeParameter(ResourceSet resourceSet, LinkedHashMultimap parameters) { + int nodes = resourceSet.isNodesSet() ? resourceSet.getNodes() : 1 + int cores = resourceSet.isCoresSet() ? resourceSet.getCores() : 1 + String nVal = "--nodes=" + nodes + String cVal = " --cores-per-socket=" + cores + parameters.put(nVal, cVal) + } + + void createQueueParameter(LinkedHashMultimap parameters, String queue) { + parameters.put('-p', queue) + } + + @Override + void createWalltimeParameter(LinkedHashMultimap parameters, ResourceSet resourceSet) { + parameters.put('--time=' + TimeUnit.fromDuration(resourceSet.walltime).toHourString(), " ") + } + + @Override + void createMemoryParameter(LinkedHashMultimap parameters, ResourceSet resourceSet) { + parameters.put('--mem=' + resourceSet.getMem().toString(BufferUnit.M), " ") + } + + @Override + void createStorageParameters(LinkedHashMultimap parameters, ResourceSet resourceSet) { + } + + @Override + String getJobIdVariable() { + return "SLURM_JOB_ID" + } + + @Override + String getJobNameVariable() { + return "SLURM_JOB_NAME" + } + + @Override + String getNodeFileVariable() { + return "SLURM_JOB_NODELIST" + } + + @Override + String getSubmitHostVariable() { + return "SLURM_SUBMIT_HOST" + } + + @Override + String getSubmitDirectoryVariable() { + return "SLURM_SUBMIT_DIR" + } + + @Override + String getQueueVariable() { + return "SLURM_JOB_PARTITION" + } + + @Override + String getSubmissionCommand() { + return "sbatch" + } + +} diff --git a/src/main/groovy/de/dkfz/roddy/execution/jobs/cluster/slurm/SlurmSubmissionCommand.groovy b/src/main/groovy/de/dkfz/roddy/execution/jobs/cluster/slurm/SlurmSubmissionCommand.groovy index b535825d..929e8a43 100644 --- a/src/main/groovy/de/dkfz/roddy/execution/jobs/cluster/slurm/SlurmSubmissionCommand.groovy +++ b/src/main/groovy/de/dkfz/roddy/execution/jobs/cluster/slurm/SlurmSubmissionCommand.groovy @@ -1,134 +1,134 @@ -/* - * Copyright (c) 2022 German Cancer Research Center (Deutsches Krebsforschungszentrum, DKFZ).. - * - * Distributed under the MIT License (license terms are at https://www.github.com/TheRoddyWMS/Roddy/LICENSE.txt). - */ - -package de.dkfz.roddy.execution.jobs.cluster.slurm - -import de.dkfz.roddy.config.JobLog -import de.dkfz.roddy.execution.jobs.BEJob -import de.dkfz.roddy.execution.jobs.BatchEuphoriaJobManager -import de.dkfz.roddy.execution.jobs.ProcessingParameters -import de.dkfz.roddy.execution.jobs.cluster.GridEngineBasedSubmissionCommand -import groovy.transform.CompileStatic - -import static de.dkfz.roddy.StringConstants.* - -@CompileStatic -class SlurmSubmissionCommand extends GridEngineBasedSubmissionCommand { - - public static final String NONE = "none" - public static final String AFTEROK = "afterok" - public static final String PARM_DEPENDS = " --dependency=" - - SlurmSubmissionCommand(BatchEuphoriaJobManager parentJobManager, BEJob job, String jobName, - List processingParameters, Map environmentVariables, - List dependencyIDs, String command) { - super(parentJobManager, job, jobName, processingParameters, environmentVariables, dependencyIDs, command) - } - - @Override - protected String getJobNameParameter() { - return "--job-name ${jobName}" as String - } - - @Override - protected String getHoldParameter() { - return "--hold" - } - - @Override - protected String getAccountNameParameter() { - return job.accountingName != null ? "--account=\"${job.accountingName}\"" : "" - } - - @Override - protected String getWorkingDirectoryParameter() { - return "--chdir ${job.getWorkingDirectory() ?: WORKING_DIRECTORY_DEFAULT}" as String - } - - @Override - protected String getLoggingParameter(JobLog jobLog) { - if (!jobLog.out && !jobLog.error) { - return "" - } else if (jobLog.out == jobLog.error) { - return "--output=${jobLog.out.replace(JobLog.JOB_ID, '%j')}" - } else { - return "--output=${jobLog.out.replace(JobLog.JOB_ID, '%j')} --error=${jobLog.error.replace(JobLog.JOB_ID, '%j')}" - } - } - - @Override - protected String getEmailParameter(String address) { - return address ? " --mail-user=" + address : "" - } - - protected String getParsableParameter() { - return "--parsable" - } - - @Override - protected String getGroupListParameter(String groupList) { - return " --grid=" + groupList - } - - @Override - protected String getUmaskString(String umask) { - return "" - } - - @Override - String getDependencyParameterName() { - return AFTEROK - } - - /** - * In this case i.e. afterokarray:...,afterok: - * A comma - * @return - */ - @Override - protected String getDependencyOptionSeparator() { - return ":" - } - - @Override - protected String getDependencyIDSeparator() { - return COLON - } - - @Override - protected String getAdditionalCommandParameters() { - return "${getParsableParameter()} --kill-on-invalid-dep=yes" as String - } - - @Override - protected String getEnvironmentString() { - return "" - } - - @Override - String assembleVariableExportParameters() { - List parameterStrings = [] - - if (passLocalEnvironment) - parameterStrings << "--get-user-env " - - List environmentStrings = parameters.collect { key, value -> - if (null == value) - "${key}" - else - "${key}=${value}" - } as List - - if (!environmentStrings.empty) - parameterStrings << "--export=\"${environmentStrings.join(COMMA)}\"".toString() - - return parameterStrings.join(WHITESPACE) - } - - protected String getDependsSuperParameter() { - PARM_DEPENDS - } -} \ No newline at end of file +/* + * Copyright (c) 2022 German Cancer Research Center (Deutsches Krebsforschungszentrum, DKFZ).. + * + * Distributed under the MIT License (license terms are at https://www.github.com/TheRoddyWMS/Roddy/LICENSE.txt). + */ + +package de.dkfz.roddy.execution.jobs.cluster.slurm + +import de.dkfz.roddy.config.JobLog +import de.dkfz.roddy.execution.jobs.BEJob +import de.dkfz.roddy.execution.jobs.BatchEuphoriaJobManager +import de.dkfz.roddy.execution.jobs.ProcessingParameters +import de.dkfz.roddy.execution.jobs.cluster.GridEngineBasedSubmissionCommand +import groovy.transform.CompileStatic + +import static de.dkfz.roddy.StringConstants.* + +@CompileStatic +class SlurmSubmissionCommand extends GridEngineBasedSubmissionCommand { + + public static final String NONE = "none" + public static final String AFTEROK = "afterok" + public static final String PARM_DEPENDS = " --dependency=" + + SlurmSubmissionCommand(BatchEuphoriaJobManager parentJobManager, BEJob job, String jobName, + List processingParameters, Map environmentVariables, + List dependencyIDs, String command) { + super(parentJobManager, job, jobName, processingParameters, environmentVariables, dependencyIDs, command) + } + + @Override + protected String getJobNameParameter() { + return "--job-name ${jobName}" as String + } + + @Override + protected String getHoldParameter() { + return "--hold" + } + + @Override + protected String getAccountNameParameter() { + return job.accountingName != null ? "--account=\"${job.accountingName}\"" : "" + } + + @Override + protected String getWorkingDirectoryParameter() { + return "--chdir ${job.getWorkingDirectory() ?: WORKING_DIRECTORY_DEFAULT}" as String + } + + @Override + protected String getLoggingParameter(JobLog jobLog) { + if (!jobLog.out && !jobLog.error) { + return "" + } else if (jobLog.out == jobLog.error) { + return "--output=${jobLog.out.replace(JobLog.JOB_ID, '%j')}" + } else { + return "--output=${jobLog.out.replace(JobLog.JOB_ID, '%j')} --error=${jobLog.error.replace(JobLog.JOB_ID, '%j')}" + } + } + + @Override + protected String getEmailParameter(String address) { + return address ? " --mail-user=" + address : "" + } + + protected String getParsableParameter() { + return "--parsable" + } + + @Override + protected String getGroupListParameter(String groupList) { + return " --grid=" + groupList + } + + @Override + protected String getUmaskString(String umask) { + return "" + } + + @Override + String getDependencyParameterName() { + return AFTEROK + } + + /** + * In this case i.e. afterokarray:...,afterok: + * A comma + * @return + */ + @Override + protected String getDependencyOptionSeparator() { + return ":" + } + + @Override + protected String getDependencyIDSeparator() { + return COLON + } + + @Override + protected String getAdditionalCommandParameters() { + return "${getParsableParameter()} --kill-on-invalid-dep=yes" as String + } + + @Override + protected String getEnvironmentString() { + return "" + } + + @Override + String assembleVariableExportParameters() { + List parameterStrings = [] + + if (passLocalEnvironment) + parameterStrings << "--get-user-env " + + List environmentStrings = parameters.collect { key, value -> + if (null == value) + "${key}" + else + "${key}=${value}" + } as List + + if (!environmentStrings.empty) + parameterStrings << "--export=\"${environmentStrings.join(COMMA)}\"".toString() + + return parameterStrings.join(WHITESPACE) + } + + protected String getDependsSuperParameter() { + PARM_DEPENDS + } +}