Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package nextflow.processor

import nextflow.Global
import nextflow.Session
import nextflow.fusion.FusionHelper

import static nextflow.processor.TaskStatus.*

import java.nio.file.NoSuchFileException
Expand Down Expand Up @@ -249,6 +253,9 @@ abstract class TaskHandler {
catch( IOException e ) {
log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}"
}
// If Fusion is enabled, parse the accelerator usage from .command.log
if( Global.session && FusionHelper.isFusionEnabled(Global.session as Session) )
record.parseFusionAcceleratorUsage(task.workDir?.resolve(TaskRun.CMD_LOG))
}

return record
Expand Down
45 changes: 45 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class TraceRecord implements Serializable {
// The fields below are declared `transient`; the accompanying spec expects such
// values to come back as null after a serialize/deserialize round trip.
transient private CloudMachineInfo machineInfo
transient private ContainerMeta containerMeta
transient private Integer numSpotInterruptions
// Tri-state accelerator flag: true/false once set through setAcceleratorUsage or
// parseFusionAcceleratorUsage, null while no value has been recorded.
transient private Boolean acceleratorUsage

/**
* Convert the given value to a string
Expand Down Expand Up @@ -627,4 +628,48 @@ class TraceRecord implements Serializable {
/**
 * Stores the container metadata on this record.
 *
 * @param meta the {@link ContainerMeta} instance to keep; stored as given
 */
void setContainerMeta(ContainerMeta meta) {
    containerMeta = meta
}

/**
 * @return the recorded accelerator usage flag, or {@code null} when none has been set
 */
Boolean getAcceleratorUsage() {
    this.acceleratorUsage
}

/**
 * Records the accelerator usage flag on this record.
 *
 * @param acc {@code true}/{@code false}, or {@code null} to clear the value
 */
void setAcceleratorUsage(Boolean acc) {
    acceleratorUsage = acc
}

/**
 * Reads the Fusion accelerator flag from the given log file and stores the
 * outcome in {@code acceleratorUsage}. The stored value is {@code null} when
 * the flag cannot be determined from the file.
 *
 * @param file path of the log file written by Fusion
 */
void parseFusionAcceleratorUsage(Path file) {
    final Boolean parsed = parseFusionAccelerator0(file)
    this.acceleratorUsage = parsed
}

/**
 * Parses the Fusion accelerator value.
 * Fusion writes FUSION_GPU_USED=true|false in the first line of the log file.
 * <p>
 * Only the first line is inspected. Surrounding whitespace is ignored and the
 * boolean literal is matched case-insensitively.
 *
 * @param file path of the log file to inspect; may be {@code null}
 * @return {@code true} or {@code false} as reported by Fusion, or {@code null}
 *         when the path is {@code null}, the file is missing or unreadable,
 *         the file is empty, or the first line is not a valid
 *         {@code FUSION_GPU_USED=true|false} entry
 */
private Boolean parseFusionAccelerator0(Path file) {
    // Callers may derive the path with safe navigation, so a null can reach this point
    if( file == null ) {
        return null
    }

    final String prefix = "FUSION_GPU_USED="

    String line
    try {
        // No separate existence check: a missing file surfaces as NoSuchFileException
        // (an IOException), which avoids a check-then-read race
        line = file.withReader { it.readLine() }
    }
    catch( IOException ignored ) {
        // The flag is optional metadata: an unreadable log means "unknown", not a failure
        return null
    }

    line = line?.trim()
    if( !line || !line.startsWith(prefix) ) {
        return null
    }

    final String value = line.substring(prefix.length()).trim()
    if( value.equalsIgnoreCase("true") ) {
        return Boolean.TRUE
    }
    if( value.equalsIgnoreCase("false") ) {
        return Boolean.FALSE
    }
    // Anything other than a boolean literal is treated as unknown
    return null
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,78 @@ class TraceRecordTest extends Specification {
rec2.getNumSpotInterruptions() == null
}

// Exercises the accelerator accessors (getter and property syntax) and verifies
// that the value is gone after a serialize/deserialize round trip.
def 'should manage accelerator field and not persist it across serialization'() {
    given:
    TraceRecord record = new TraceRecord()

    expect:
    // a fresh record carries no accelerator information
    record.getAcceleratorUsage() == null
    and:
    record.acceleratorUsage == null

    when:
    record.setAcceleratorUsage(true)

    then:
    record.getAcceleratorUsage() == true
    record.acceleratorUsage == true

    when:
    record.setAcceleratorUsage(false)

    then:
    record.getAcceleratorUsage() == false
    record.acceleratorUsage == false

    when:
    def bytes = record.serialize()
    def restored = TraceRecord.deserialize(bytes)

    then:
    // the flag is not carried over by serialization
    restored.getAcceleratorUsage() == null
}

// Table-driven check of the first-line parsing: valid flags in any letter case and
// with surrounding whitespace map to a boolean, everything else yields null.
@Unroll
def 'should parse fusion accelerator from file'() {
    given:
    TraceRecord record = new TraceRecord()
    def logFile = TestHelper.createInMemTempFile('fusion-log')
    logFile.text = CONTENT

    when:
    record.parseFusionAcceleratorUsage(logFile)

    then:
    record.getAcceleratorUsage() == EXPECTED

    where:
    CONTENT | EXPECTED
    'FUSION_GPU_USED=true\n' | true
    'FUSION_GPU_USED=false\n' | false
    'FUSION_GPU_USED=TRUE\n' | true
    'FUSION_GPU_USED=FALSE\n' | false
    ' FUSION_GPU_USED=true \n' | true
    ' FUSION_GPU_USED=false \n' | false
    'FUSION_GPU_USED=true \nother line' | true
    'FUSION_GPU_USED=false\nother line' | false
    'other content\n' | null
    'FUSION_GPU=true\n' | null
    'FUSION_GPU_USED=\n' | null
    'FUSION_GPU_USED=invalid\n' | null
    'FUSION_GPU_USED=123\n' | null
    '' | null
}

// A path that points at nothing must leave the accelerator flag unset.
def 'should parse fusion accelerator when file does not exist'() {
    given:
    TraceRecord record = new TraceRecord()
    Path missing = Path.of('/non/existent/file.log')

    when:
    record.parseFusionAcceleratorUsage(missing)

    then:
    record.getAcceleratorUsage() == null
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ class TowerClient implements TraceObserverV2 {
record.machineType = trace.getMachineInfo()?.type
record.priceModel = trace.getMachineInfo()?.priceModel?.toString()
record.numSpotInterruptions = trace.getNumSpotInterruptions()
record.acceleratorUsage = trace.getAcceleratorUsage()

return record
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,4 +560,29 @@ class TowerClientTest extends Specification {
req.tasks[0].numSpotInterruptions == 3
}

// Verifies that the accelerator flag set on a trace record shows up on the
// corresponding task entry of the request built by the client.
def 'should include accelerator in task map'() {
    given:
    def tower = Spy(new TowerClient())
    tower.getWorkflowProgress(true) >> new WorkflowProgress()

    // timestamps are laid out relative to "now": submit < start < complete
    long ts = System.currentTimeMillis()
    def record = new TraceRecord([
        taskId: 42,
        process: 'foo',
        workdir: "/work/dir",
        cpus: 1,
        submit: ts-2000,
        start: ts-1000,
        complete: ts
    ])
    record.setAcceleratorUsage(true)

    when:
    def request = tower.makeTasksReq([record])

    then:
    request.tasks.size() == 1
    request.tasks[0].acceleratorUsage == true
}

}
Loading