Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove daemon, improve tests, add tests, fix bugs #137

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ dependencies {
compile group: 'commons-cli', name: 'commons-cli', version: '1.2'
compile group: 'org.apache.commons', name: 'commons-text', version: '1.1'
compile 'com.google.guava:guava:23.0'
// compile 'com.github.eilslabs:RoddyToolLib:master-SNAPSHOT'
compile 'com.github.theroddywms:RoddyToolLib:2.0.0'
compile 'com.github.theroddywms:RoddyToolLib:2.1.0'
}

task writePom {
Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.1.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-all.zip
2 changes: 1 addition & 1 deletion gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`

# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""
DEFAULT_JVM_OPTS='"-Xmx64m"'

# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
Expand Down
2 changes: 1 addition & 1 deletion gradlew.bat
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=
set DEFAULT_JVM_OPTS="-Xmx64m"

@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ abstract class BatchEuphoriaJobManager<C extends Command> {

protected boolean queryOnlyStartedJobs

protected final Duration maxAgeOfJobsForQueries
protected final Duration maxTrackingTimeForFinishedJobs

protected String userIDForQueries

Expand Down Expand Up @@ -85,7 +85,7 @@ abstract class BatchEuphoriaJobManager<C extends Command> {

this.isTrackingOfUserJobsEnabled = parms.userIdForJobQueries as boolean
this.queryOnlyStartedJobs = parms.trackOnlyStartedJobs
this.maxAgeOfJobsForQueries = parms.maxAgeOfJobsForJobQueries
this.maxTrackingTimeForFinishedJobs = parms.maxTrackingTimeForFinishedJobs
this.userIDForQueries = parms.userIdForJobQueries

this.userEmail = parms.userEmail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class GenericJobInfo {
ResourceSet askedResources
ResourceSet usedResources
String jobName
File tool
String command
BEJobID jobID

/** The date-time the job entered the queue. */
Expand Down Expand Up @@ -81,9 +81,9 @@ class GenericJobInfo {
ZonedDateTime timeOfCalculation


GenericJobInfo(String jobName, File tool, BEJobID jobID, Map<String, String> parameters, List<String> parentJobIDs) {
GenericJobInfo(String jobName, String command, BEJobID jobID, Map<String, String> parameters, List<String> parentJobIDs) {
this.jobName = jobName
this.tool = tool
this.command = command
this.jobID = jobID
this.parameters = parameters
this.parentJobIDs = parentJobIDs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class JobManagerOptions {

String userIdForJobQueries

Duration maxAgeOfJobsForJobQueries
Duration maxTrackingTimeForFinishedJobs

boolean trackOnlyStartedJobs

Expand Down Expand Up @@ -91,7 +91,7 @@ class JobManagerOptionsBuilder {
JobManagerOptionsBuilder() {
trackOnlyStartedJobs = false
updateInterval = Duration.ofMinutes(5)
maxAgeOfJobsForJobQueries = Duration.ofDays(14)
maxTrackingTimeForFinishedJobs = Duration.ofDays(14)
createDaemon = false
requestMemoryIsEnabled = true
requestWalltimeIsEnabled = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import de.dkfz.roddy.tools.BufferValue
import de.dkfz.roddy.tools.ComplexLine
import de.dkfz.roddy.tools.TimeUnit
import groovy.transform.CompileStatic
import static de.dkfz.roddy.StringConstants.SPLIT_COLON
import static de.dkfz.roddy.StringConstants.SPLIT_COMMA
import static de.dkfz.roddy.StringConstants.SPLIT_EQUALS

import static de.dkfz.roddy.StringConstants.*

/**
* Used to convert commands from cli to e.g. GenericJobInfo
Expand Down Expand Up @@ -53,11 +52,11 @@ class LSFCommandParser {

if (!commandString.startsWith("bsub")) return // It is obviously not a PBS call

String[] splitted = line.splitBy(" ").findAll { it }
Collection<String> splitted = line.splitBy(" ").findAll { it }
script = splitted[-1]
jobName = "not readable"

for (int i = 0; i < splitted.length - 1; i++) {
for (int i = 0; i < splitted.size() - 1; i++) {
String option = splitted[i]
if (!option.startsWith("-")) continue // It is not an option but a parameter or a text (e.g. bsub, script)

Expand Down Expand Up @@ -124,7 +123,7 @@ class LSFCommandParser {
}

GenericJobInfo toGenericJobInfo() {
GenericJobInfo jInfo = new GenericJobInfo(jobName, new File(script), jobID, parameters, dependencies)
GenericJobInfo jInfo = new GenericJobInfo(jobName, script, jobID, parameters, dependencies)
ResourceSet askedResources = new ResourceSet(null, memory ? new BufferValue(memory as Integer, bufferUnit) : null,
cores ? cores as Integer : null, nodes ? nodes as Integer : null, walltime ? new TimeUnit(walltime) : null,
null, null, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,24 @@ class LSFJobManager extends AbstractLSFJobManager {
return date
}

/**
* Important here is, that LSF puts " L" or other status codes at the end of some dates, e.g. FINISH_DATE
* Thus said, " L" does not apply for all dates reported by LSF! This method just removes the last two characters
* of the time string.
*/
static String stripAwayStatusInfo(String time) {
dankwart-de marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There unfortunately, is not the slightest check here that the input really is a time string. Please harden the parser by asserting preconditions on the input.

if (time)
return time[0..-3]
return null
}

@Override
Map<BEJobID, GenericJobInfo> queryExtendedJobStateById(List<BEJobID> jobIds) {
Map<BEJobID, GenericJobInfo> queriedExtendedStates = [:]
for (BEJobID id : jobIds) {
Map<String, Object> jobDetails = runBjobs([id], true).get(id)
queriedExtendedStates.put(id, convertJobDetailsMapToGenericJobInfoobject(jobDetails))
Map<String, String> jobDetails = runBjobs([id], true)[id]
if (jobDetails) // Ignore filtered / nonexistent ids
queriedExtendedStates.put(id, convertJobDetailsMapToGenericJobInfoObject(jobDetails))
}
return queriedExtendedStates
}
Expand Down Expand Up @@ -102,7 +114,7 @@ class LSFJobManager extends AbstractLSFJobManager {

}

Map<BEJobID, Map<String, Object>> runBjobs(List<BEJobID> jobIDs, boolean extended) {
Map<BEJobID, Map<String, String>> runBjobs(List<BEJobID> jobIDs, boolean extended) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you add a description of the output map structure?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still awaiting this ...

StringBuilder queryCommand = new StringBuilder(extended ? LSF_COMMAND_QUERY_EXTENDED_STATES : LSF_COMMAND_QUERY_STATES)

// user argument must be passed before the job IDs
Expand All @@ -121,12 +133,12 @@ class LSFJobManager extends AbstractLSFJobManager {
throw new BEException(error)
}

Map<BEJobID, Map<String, Object>> result = convertBJobsJsonOutputToResultMap(resultLines.join("\n"))
return filterJobMapByAge(result, maxAgeOfJobsForQueries)
Map<BEJobID, Map<String, String>> result = convertBJobsJsonOutputToResultMap(resultLines.join("\n"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

description of output map structure

return filterJobMapByAge(result, maxTrackingTimeForFinishedJobs)
}

static Map<BEJobID, Map<String, Object>> convertBJobsJsonOutputToResultMap(String rawJson) {
Map<BEJobID, Map<String, Object>> result = [:]
static Map<BEJobID, Map<String, String>> convertBJobsJsonOutputToResultMap(String rawJson) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

method documentation

Map<BEJobID, Map<String, String>> result = [:]

if (!rawJson)
return result
Expand All @@ -135,7 +147,7 @@ class LSFJobManager extends AbstractLSFJobManager {
List records = (List) parsedJson["RECORDS"]
for (record in records) {
BEJobID jobID = new BEJobID(record["JOBID"] as String)
result[jobID] = record as Map<String, Object>
result[jobID] = record as Map<String, String>
}

result
Expand All @@ -150,17 +162,16 @@ class LSFJobManager extends AbstractLSFJobManager {
* @return The map of records where too old entries are filtered out.
*/
@CompileStatic
static Map<BEJobID, Map<String, Object>> filterJobMapByAge(
Map<BEJobID, Map<String, Object>> records,
static Map<BEJobID, Map<String, String>> filterJobMapByAge(
Map<BEJobID, Map<String, String>> records,
Duration maxJobKeepDuration
) {
records.findAll { def k, def record ->
String finishTime = record["FINISH_TIME"]
boolean youngEnough = true
if (finishTime) {
String timeString = "${finishTime[0..-3]}"
withCaughtAndLoggedException {
ZonedDateTime _finishTime = parseTime(timeString)
ZonedDateTime _finishTime = parseTime(stripAwayStatusInfo(finishTime))
Duration timeSpan = Duration.between(_finishTime.toLocalDateTime(), LocalDateTime.now())
if (dateTimeHelper.durationExceeds(timeSpan, maxJobKeepDuration))
youngEnough = false
Expand All @@ -173,82 +184,95 @@ class LSFJobManager extends AbstractLSFJobManager {
/**
* Used by @getJobDetails to set JobInfo
*/
GenericJobInfo convertJobDetailsMapToGenericJobInfoobject(Map<String, Object> jobResult) {
GenericJobInfo convertJobDetailsMapToGenericJobInfoObject(Map<String, String> _jobResult) {
// Remove empty entries first to keep the output clean (use null, where the value is null or empty.)
Map<String, String> jobResult = _jobResult.findAll { String k, String v -> v }

GenericJobInfo jobInfo
BEJobID jobID
String JOBID = jobResult["JOBID"]
try {
jobID = new BEJobID(jobResult["JOBID"] as String)
jobID = new BEJobID(JOBID)
} catch (Exception exp) {
throw new BEException("Job ID '${jobResult["JOBID"]}' could not be transformed to BEJobID ")
throw new BEException("Job ID '${JOBID}' could not be transformed to BEJobID ")
}

List<String> dependIDs = ((String) jobResult["DEPENDENCY"]) ? ((String) jobResult["DEPENDENCY"]).tokenize(/&/).collect { it.find(/\d+/) } : null
jobInfo = new GenericJobInfo(jobResult["JOB_NAME"] as String ?: null, jobResult["COMMAND"] as String ? new File(jobResult["COMMAND"] as String) : null, jobID, null, dependIDs)

String queue = jobResult["QUEUE"] ?: null
Duration runTime = withCaughtAndLoggedException {
jobResult["RUN_TIME"] ? parseColonSeparatedHHMMSSDuration(jobResult["RUN_TIME"] as String) : null
}
List<String> dependIDs = jobResult["DEPENDENCY"]?.tokenize(/&/)?.collect { it.find(/\d+/) }
jobInfo = new GenericJobInfo(jobResult["JOB_NAME"], jobResult["COMMAND"], jobID, null, dependIDs)

/** Common */
jobInfo.user = jobResult["USER"]
jobInfo.userGroup = jobResult["USER_GROUP"]
jobInfo.description = jobResult["JOB_DESCRIPTION"]
jobInfo.projectName = jobResult["PROJ_NAME"]
jobInfo.jobGroup = jobResult["JOB_GROUP"]
jobInfo.priority = jobResult["JOB_PRIORITY"]
jobInfo.pidStr = jobResult["PIDS"]?.split(",")?.toList()
jobInfo.submissionHost = jobResult["FROM_HOST"]
jobInfo.executionHosts = jobResult["EXEC_HOST"]?.split(":")?.toList()

/** Resources */
String queue = jobResult["QUEUE"]
Duration runLimit = safelyParseColonSeparatedDuration(jobResult["RUNTIMELIMIT"])
Duration runTime = safelyParseColonSeparatedDuration(jobResult["RUN_TIME"])
BufferValue memory = safelyCastToBufferValue(jobResult["MAX_MEM"])
BufferValue swap = withCaughtAndLoggedException {
jobResult["SWAP"] ? new BufferValue((jobResult["SWAP"] as String).find("\\d+"), BufferUnit.m) : null
}
BufferValue memory = withCaughtAndLoggedException {
String unit = (jobResult["MAX_MEM"] as String).find("[a-zA-Z]+")
BufferUnit bufferUnit
if (unit == "Gbytes")
bufferUnit = BufferUnit.g
else
bufferUnit = BufferUnit.m
jobResult["MAX_MEM"] ? new BufferValue((jobResult["MAX_MEM"] as String).find("([0-9]*[.])?[0-9]+"), bufferUnit) : null
}
Duration runLimit = withCaughtAndLoggedException {
jobResult["RUNTIMELIMIT"] ? parseColonSeparatedHHMMSSDuration(jobResult["RUNTIMELIMIT"] as String) : null
String SWAP = jobResult["SWAP"]
SWAP ? new BufferValue(SWAP.find("\\d+"), BufferUnit.m) : null
}
Integer nodes = withCaughtAndLoggedException { jobResult["SLOTS"] ? jobResult["SLOTS"] as Integer : null }

ResourceSet usedResources = new ResourceSet(memory, null, nodes, runTime, null, queue, null)
jobInfo.setUsedResources(usedResources)

ResourceSet askedResources = new ResourceSet(null, null, null, runLimit, null, queue, null)
jobInfo.setAskedResources(askedResources)

jobInfo.setUser(jobResult["USER"] as String ?: null)
jobInfo.setDescription(jobResult["JOB_DESCRIPTION"] as String ?: null)
jobInfo.setProjectName(jobResult["PROJ_NAME"] as String ?: null)
jobInfo.setJobGroup(jobResult["JOB_GROUP"] as String ?: null)
jobInfo.setPriority(jobResult["JOB_PRIORITY"] as String ?: null)
jobInfo.setPidStr(jobResult["PIDS"] as String ? (jobResult["PIDS"] as String).split(",").toList() : null)
jobInfo.setJobState(parseJobState(jobResult["STAT"] as String))
jobInfo.setExitCode(jobInfo.jobState == JobState.COMPLETED_SUCCESSFUL ? 0 : (jobResult["EXIT_CODE"] ? Integer.valueOf(jobResult["EXIT_CODE"] as String) : null))
jobInfo.setSubmissionHost(jobResult["FROM_HOST"] as String ?: null)
jobInfo.setExecutionHosts(jobResult["EXEC_HOST"] as String ? (jobResult["EXEC_HOST"] as String).split(":").toList() : null)
Integer nodes = withCaughtAndLoggedException { jobResult["SLOTS"] as Integer }

jobInfo.usedResources = new ResourceSet(memory, null, nodes, runTime, null, queue, null)
jobInfo.askedResources = new ResourceSet(null, null, null, runLimit, null, queue, null)
jobInfo.resourceReq = jobResult["EFFECTIVE_RESREQ"]
jobInfo.runTime = runTime
jobInfo.cpuTime = safelyParseColonSeparatedDuration(jobResult["CPU_USED"])

/** Status info */
jobInfo.jobState = parseJobState(jobResult["STAT"])
jobInfo.exitCode = jobInfo.jobState == JobState.COMPLETED_SUCCESSFUL ? 0 : (jobResult["EXIT_CODE"] as Integer)
jobInfo.pendReason = jobResult["PEND_REASON"]

/** Directories and files */
jobInfo.cwd = jobResult["SUB_CWD"]
jobInfo.execCwd = jobResult["EXEC_CWD"]
jobInfo.logFile = getBjobsFile(jobResult["OUTPUT_FILE"], jobID, "out")
jobInfo.errorLogFile = getBjobsFile(jobResult["ERROR_FILE"], jobID, "err")
jobInfo.inputFile = jobResult["INPUT_FILE"] ? new File(jobResult["INPUT_FILE"]) : null
jobInfo.execHome = jobResult["EXEC_HOME"]

/** Timestamps */
jobInfo.submitTime = safelyParseTime(jobResult["SUBMIT_TIME"])
jobInfo.startTime = safelyParseTime(jobResult["START_TIME"])
jobInfo.endTime = safelyParseTime(stripAwayStatusInfo(jobResult["FINISH_TIME"]))

return jobInfo
}

Duration safelyParseColonSeparatedDuration(String value) {
withCaughtAndLoggedException {
jobInfo.setCpuTime(jobResult["CPU_USED"] ? parseColonSeparatedHHMMSSDuration(jobResult["CPU_USED"] as String) : null)
value ? parseColonSeparatedHHMMSSDuration(value) : null
}
jobInfo.setRunTime(runTime)
jobInfo.setUserGroup(jobResult["USER_GROUP"] as String ?: null)
jobInfo.setCwd(jobResult["SUB_CWD"] as String ?: null)
jobInfo.setPendReason(jobResult["PEND_REASON"] as String ?: null)
jobInfo.setExecCwd(jobResult["EXEC_CWD"] as String ?: null)
jobInfo.setLogFile(getBjobsFile(jobResult["OUTPUT_FILE"] as String, jobID, "out"))
jobInfo.setErrorLogFile(getBjobsFile(jobResult["ERROR_FILE"] as String, jobID, "err"))
jobInfo.setInputFile(jobResult["INPUT_FILE"] ? new File(jobResult["INPUT_FILE"] as String) : null)
jobInfo.setResourceReq(jobResult["EFFECTIVE_RESREQ"] as String ?: null)
jobInfo.setExecHome(jobResult["EXEC_HOME"] as String ?: null)

String submissionTime = jobResult["SUBMIT_TIME"]
String startTime = jobResult["START_TIME"]
String finishTime = jobResult["FINISH_TIME"]

if (submissionTime)
withCaughtAndLoggedException { jobInfo.setSubmitTime(parseTime(submissionTime)) }
if (startTime)
withCaughtAndLoggedException { jobInfo.setStartTime(parseTime(startTime as String)) }
if (finishTime)
withCaughtAndLoggedException { jobInfo.setEndTime(parseTime(finishTime[0..-3])) }
}

return jobInfo
ZonedDateTime safelyParseTime(String time) {
if (time)
return withCaughtAndLoggedException {
return parseTime(time)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the inner return necessary?

}
return null
}

BufferValue safelyCastToBufferValue(String MAX_MEM) {
withCaughtAndLoggedException {
if (MAX_MEM) {
String bufferSize = MAX_MEM.find("([0-9]*[.])?[0-9]+")
String unit = MAX_MEM.find("[a-zA-Z]+")
BufferUnit bufferUnit = unit == "Gbytes" ? BufferUnit.g : BufferUnit.m
return new BufferValue(bufferSize, bufferUnit)
}
return null
}
}

private File getBjobsFile(String s, BEJobID jobID, String type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ class LSFRestJobManager extends AbstractLSFJobManager {
*/
private GenericJobInfo setJobInfoForJobDetails(NodeChild jobDetails) {

GenericJobInfo jobInfo = new GenericJobInfo(jobDetails.getProperty("jobName").toString(), new File(jobDetails.getProperty("command").toString()), new BEJobID(jobDetails.getProperty("jobId").toString()), null, null)
GenericJobInfo jobInfo = new GenericJobInfo(jobDetails.getProperty("jobName").toString(), jobDetails.getProperty("command").toString(), new BEJobID(jobDetails.getProperty("jobId").toString()), null, null)

String queue = jobDetails.getProperty("queue").toString()
BufferValue swap = jobDetails.getProperty("swap") ? withCaughtAndLoggedException { new BufferValue(jobDetails.getProperty("swap").toString(), BufferUnit.m) } : null
Expand Down
Loading