diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/Executor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/Executor.groovy index 4fda581fc6..05d06ce320 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/Executor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/Executor.groovy @@ -16,6 +16,8 @@ package nextflow.executor +import nextflow.fusion.FusionHelper + import java.nio.file.Path import groovy.transform.CompileStatic @@ -128,6 +130,8 @@ abstract class Executor { } boolean isForeignFile(Path path) { + if ( isFusionEnabled() && path.scheme in FusionHelper.SUPPORTED_CLOUD_SCHEMES ) + return false path.scheme != getStageDir().scheme } diff --git a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionEnv.groovy b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionEnv.groovy index 28b10aa4c5..c6859f2e16 100644 --- a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionEnv.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionEnv.groovy @@ -17,6 +17,8 @@ package nextflow.fusion +import java.nio.file.Path + /** * Allow importing platform specific env variables in the Fusion context * @@ -25,4 +27,6 @@ package nextflow.fusion interface FusionEnv { Map getEnvironment(String scheme, FusionConfig config) + + Map getEnvironmentFromPath(Path path, FusionConfig config) } diff --git a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionEnvProvider.groovy b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionEnvProvider.groovy index 6e965f6db7..7460848148 100644 --- a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionEnvProvider.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionEnvProvider.groovy @@ -21,6 +21,9 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.exception.ReportWarningException import nextflow.plugin.Plugins + +import java.nio.file.Path + /** * Provider strategy for {@link FusionEnv} * @@ -49,6 +52,19 @@ class FusionEnvProvider { result.FUSION_CACHE_SIZE = "${config.cacheSize().toMega()}M" return result } + Map getEnvironmentFromPath(Path p) { + final config = FusionConfig.getConfig() + final list =Plugins.getExtensions(FusionEnv) + log.debug "Fusion environment extensions=$list" + final result = new HashMap() + for( FusionEnv it : list ) { + final env = it.getEnvironmentFromPath(p,config) + log.debug "Env for $p: $env" + if( env ) + result.putAll(env) + } + return result + } protected Map getFusionEnvironment(String scheme, FusionConfig config) { final list = Plugins.getExtensions(FusionEnv) @@ -56,6 +72,7 @@ class FusionEnvProvider { final result = new HashMap() for( FusionEnv it : list ) { final env = it.getEnvironment(scheme,config) + log.debug "Env for $scheme: $env" if( env ) result.putAll(env) } diff --git a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionHelper.groovy index 8119ab9d80..51247056a1 100644 --- a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionHelper.groovy @@ -35,6 +35,7 @@ import nextflow.io.BucketParser */ @CompileStatic class FusionHelper { + public static List SUPPORTED_CLOUD_SCHEMES = ['s3', 'az'] @Memoized static boolean isFusionEnabled(Session session) { diff --git a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionScriptLauncher.groovy b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionScriptLauncher.groovy index cde3ceb5eb..64b9e7ed7a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionScriptLauncher.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionScriptLauncher.groovy @@ -40,6 +40,7 @@ class FusionScriptLauncher extends BashWrapperBuilder { private String scheme private Path remoteWorkDir private Map env + private List externalInputs static FusionScriptLauncher create(TaskBean bean, String scheme) { @@ -50,9 +51,16 @@ class FusionScriptLauncher extends BashWrapperBuilder { bean.workDir = toContainerMount(bean.workDir, scheme) bean.targetDir = toContainerMount(bean.targetDir, scheme) + final externalInputs = new LinkedList() // remap input files to container mounted paths for( Map.Entry entry : new HashMap<>(bean.inputFiles).entrySet() ) { - bean.inputFiles.put( entry.key, toContainerMount(entry.value, scheme) ) + def inputScheme = scheme + if (entry.value.scheme && entry.value.scheme != scheme){ + inputScheme = entry.value.scheme + externalInputs.add(entry.value) + } + + bean.inputFiles.put( entry.key, toContainerMount(entry.value, inputScheme) ) } // make it change to the task work dir @@ -61,14 +69,15 @@ class FusionScriptLauncher extends BashWrapperBuilder { if( bean.scratch==null ) bean.scratch = false - return new FusionScriptLauncher(bean, scheme, remoteWorkDir) + return new FusionScriptLauncher(bean, scheme, remoteWorkDir, externalInputs) } - FusionScriptLauncher(TaskBean bean, String scheme, Path remoteWorkDir) { + FusionScriptLauncher(TaskBean bean, String scheme, Path remoteWorkDir, List external = List.of()) { super(bean) // keep track the google storage work dir this.scheme = scheme this.remoteWorkDir = remoteWorkDir + this.externalInputs = external } static protected String headerScript(TaskBean bean) { @@ -87,6 +96,9 @@ class FusionScriptLauncher extends BashWrapperBuilder { // foreign env final provider = new FusionEnvProvider() result.putAll(provider.getEnvironment(scheme)) + for (Path p: this.externalInputs){ + result.putAll(provider.getEnvironmentFromPath(p)) + } env = result } return env diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/fusion/AwsFusionEnv.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/fusion/AwsFusionEnv.groovy index 289ab3ed17..b5ee6ef868 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/fusion/AwsFusionEnv.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/fusion/AwsFusionEnv.groovy @@ -22,6 +22,9 @@ import nextflow.cloud.aws.config.AwsConfig import nextflow.fusion.FusionConfig import nextflow.fusion.FusionEnv import org.pf4j.Extension + +import java.nio.file.Path + /** * Implements {@link FusionEnv} for AWS cloud * @@ -71,4 +74,8 @@ class AwsFusionEnv implements FusionEnv { else return List.of() } + + Map getEnvironmentFromPath(Path path, FusionConfig config) { + return getEnvironment(path.scheme,config) + } } diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/fusion/AzFusionEnv.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/fusion/AzFusionEnv.groovy index 659934d6f8..24afb911db 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/fusion/AzFusionEnv.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/fusion/AzFusionEnv.groovy @@ -26,6 +26,8 @@ import nextflow.fusion.FusionConfig import nextflow.fusion.FusionEnv import org.pf4j.Extension +import java.nio.file.Path + /** * Implement environment provider for Azure specific variables * @@ -47,7 +49,14 @@ class AzFusionEnv implements FusionEnv { */ @Override Map getEnvironment(String scheme, FusionConfig config) { - if (scheme != 'az') { + if (scheme != 'az') { + return Collections. emptyMap() + } + return getEnvironmentFromPath(Global.session.workDir, config) + } + + Map getEnvironmentFromPath(Path workDir, FusionConfig config) { + if (workDir.scheme != 'az') { return Collections. emptyMap() } @@ -76,7 +85,7 @@ class AzFusionEnv implements FusionEnv { } // If no managed identity, use the standard environment with SAS token - result.AZURE_STORAGE_SAS_TOKEN = getOrCreateSasToken() + result.AZURE_STORAGE_SAS_TOKEN = getOrCreateSasToken(workDir) return result } @@ -85,7 +94,7 @@ class AzFusionEnv implements FusionEnv { * Return the SAS token if it is defined in the configuration, otherwise generate one based on the requested * authentication method. */ - synchronized String getOrCreateSasToken() { + synchronized String getOrCreateSasToken(Path workDir = Global.session.workDir) { final cfg = AzConfig.config // Check for incompatible configuration @@ -101,10 +110,10 @@ class AzFusionEnv implements FusionEnv { // For Active Directory and Managed Identity, we cannot generate an *account* SAS token, but we can generate // a *container* SAS token for the work directory. if (cfg.activeDirectory().isConfigured() || cfg.managedIdentity().isConfigured()) { - return AzHelper.generateContainerSasWithActiveDirectory(Global.session.workDir, cfg.storage().tokenDuration) + return AzHelper.generateContainerSasWithActiveDirectory(workDir, cfg.storage().tokenDuration) } // Shared Key authentication can use an account SAS token - return AzHelper.generateAccountSasWithAccountKey(Global.session.workDir, cfg.storage().tokenDuration) + return AzHelper.generateAccountSasWithAccountKey(workDir, cfg.storage().tokenDuration) } }