From b6e17d2a68496049acde74744482b1f13da135fe Mon Sep 17 00:00:00 2001 From: Robrecht Cannoodt Date: Thu, 2 Oct 2025 15:51:20 +0200 Subject: [PATCH] Add onFileStaged to TraceObserverV2 Signed-off-by: Robrecht Cannoodt --- .../src/main/groovy/nextflow/Session.groovy | 5 +++ .../groovy/nextflow/file/FilePorter.groovy | 10 +++++ .../nextflow/trace/TraceObserverV2.groovy | 8 ++++ .../trace/event/FileStagingEvent.groovy | 40 +++++++++++++++++++ 4 files changed, 63 insertions(+) create mode 100644 modules/nextflow/src/main/groovy/nextflow/trace/event/FileStagingEvent.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 2a07576f09..b4ba47e4e5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -77,6 +77,7 @@ import nextflow.trace.TraceObserverV2 import nextflow.trace.TraceRecord import nextflow.trace.WorkflowStatsObserver import nextflow.trace.event.FilePublishEvent +import nextflow.trace.event.FileStagingEvent import nextflow.trace.event.TaskEvent import nextflow.trace.event.WorkflowOutputEvent import nextflow.util.Barrier @@ -1116,6 +1117,10 @@ class Session implements ISession { notifyEvent(observersV2, ob -> ob.onFilePublish(event)) } + void notifyFileStaged(FileStagingEvent event) { + notifyEvent(observersV2, ob -> ob.onFileStaged(event)) + } + void notifyFlowComplete() { notifyEvent(observersV1, ob -> ob.onFlowComplete()) notifyEvent(observersV2, ob -> ob.onFlowComplete()) diff --git a/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy b/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy index 89e720bc7b..b551a3d403 100644 --- a/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy @@ -39,6 +39,7 @@ import groovy.transform.ToString import groovy.util.logging.Slf4j import nextflow.Session import nextflow.exception.ProcessStageException +import nextflow.trace.event.FileStagingEvent import nextflow.extension.FilesEx import nextflow.util.CacheHelper import nextflow.util.Duration @@ -100,6 +101,15 @@ class FilePorter { if( batch.size() ) { log.trace "Stage foreign files: $batch" submitStagingActions(batch.foreignPaths) + + // Notify observers about file staging completion events + for( FileCopy copy : batch.foreignPaths ) { + session.notifyFileStaged(new FileStagingEvent( + source: copy.source, + target: copy.target + )) + } + log.trace "Stage foreign files completed: $batch" } } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy index 82beed1e90..bd9f5a745f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy @@ -21,6 +21,7 @@ import groovy.transform.CompileStatic import nextflow.Session import nextflow.processor.TaskProcessor import nextflow.trace.event.FilePublishEvent +import nextflow.trace.event.FileStagingEvent import nextflow.trace.event.TaskEvent import nextflow.trace.event.WorkflowOutputEvent @@ -135,4 +136,11 @@ interface TraceObserverV2 { */ default void onFilePublish(FilePublishEvent event) {} + /** + * Invoked when a file staging operation completes (after the file has been copied). + * + * @param event + */ + default void onFileStaged(FileStagingEvent event) {} + } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/event/FileStagingEvent.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/event/FileStagingEvent.groovy new file mode 100644 index 0000000000..e633de2edd --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/trace/event/FileStagingEvent.groovy @@ -0,0 +1,40 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.trace.event + +import java.nio.file.Path + +import groovy.transform.Canonical +import groovy.transform.CompileStatic + +/** + * Models a file staging event. + * + * @author Robrecht Cannoodt + */ +@Canonical +@CompileStatic +class FileStagingEvent { + /** + * The original source path (e.g., remote URL or path). + */ + Path source + /** + * The target staged path in the work directory. + */ + Path target +}