Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ The following settings are available:
: The maximum size of the local cache used by the Fusion client.

`fusion.containerConfigUrl`
: The URL of the container layer that provides the Fusion client.
: The URL of the container layer that provides the Fusion client. Supports `http(s)://...` and `file:/...` (absolute, no authority) schemes — the latter is intended for local development against a custom manifest, e.g. `file:/path/to/manifest.json`.

`fusion.enabled`
: Enable the Fusion file system (default: `false`).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.regex.Pattern

import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.Session
import nextflow.SysEnv
Expand All @@ -38,6 +39,7 @@ import nextflow.util.MemoryUnit
@Description("""
The `fusion` scope provides advanced configuration for the use of the [Fusion file system](https://docs.seqera.io/fusion).
""")
@Slf4j
@CompileStatic
class FusionConfig implements ConfigScope {

Expand Down Expand Up @@ -72,7 +74,7 @@ class FusionConfig implements ConfigScope {

@ConfigOption
@Description("""
The URL of the container layer that provides the Fusion client.
The URL of the container layer that provides the Fusion client. Supports `http(s)://...` and `file:/...` (absolute, no authority) schemes.
""")
final String containerConfigUrl

Expand Down Expand Up @@ -132,8 +134,8 @@ class FusionConfig implements ConfigScope {

boolean snapshotsEnabled() { snapshots }

URL containerConfigUrl() {
this.containerConfigUrl ? new URL(this.containerConfigUrl) : null
URI containerConfigURI() {
containerConfigUrl ? new URI(containerConfigUrl) : null
}

boolean privileged() {
Expand All @@ -156,7 +158,7 @@ class FusionConfig implements ConfigScope {
this.targetVersion = opts.targetVersion as String

if( containerConfigUrl && !validProtocol(containerConfigUrl))
throw new IllegalArgumentException("Fusion container config URL should start with 'http:' or 'https:' protocol prefix - offending value: $containerConfigUrl")
throw new IllegalArgumentException("Fusion container config URL must be 'http(s)://...' or 'file:/...' (absolute, no authority) - offending value: $containerConfigUrl")
}

static private String parseTags(Object value) {
Expand All @@ -170,7 +172,17 @@ class FusionConfig implements ConfigScope {
}

protected boolean validProtocol(String url) {
url.startsWith('http://') || url.startsWith('https://') || url.startsWith('file:/')
if( url.startsWith('http://') || url.startsWith('https://') )
return true
try {
// accept only absolute file URIs without authority, e.g. `file:/path` or `file:///path`
final uri = new URI(url)
return uri.scheme == 'file' && !uri.authority && uri.path?.startsWith('/')
}
catch( URISyntaxException e ) {
log.debug "Invalid Fusion container config URL: $url - cause: ${e.message}"
return false
}
}

static FusionConfig getConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ class FusionConfigTest extends Specification {
}

@Unroll
def 'should create container config url' () {
def 'should create container config uri' () {
when:
def opts = new FusionConfig(OPTS, ENV)
then:
opts.containerConfigUrl() == (EXPECTED ? new URL(EXPECTED) : null)
opts.containerConfigURI() == (EXPECTED ? new URI(EXPECTED) : null)

where:
OPTS | ENV | EXPECTED
Expand All @@ -57,6 +57,39 @@ class FusionConfigTest extends Specification {

}

@Unroll
def 'should reject invalid container config url: #VALUE' () {
when:
new FusionConfig([containerConfigUrl: VALUE])
then:
thrown(IllegalArgumentException)

where:
VALUE << [
'ftp://foo.com/x.json',
'/abs/path.json',
'relative/path.json',
'gs://bucket/x.json',
'file://host/tmp/manifest.json',
'file:relative.json',
'file:./x.json',
]
}

@Unroll
def 'should accept container config url: #VALUE' () {
expect:
new FusionConfig([containerConfigUrl: VALUE]).containerConfigURI() == new URI(VALUE)

where:
VALUE << [
'http://foo.com/x.json',
'https://foo.com/x.json',
'file:/tmp/manifest.json',
'file:///tmp/manifest.json',
]
}

@Unroll
def 'should get export aws key' () {
expect:
Expand Down
46 changes: 35 additions & 11 deletions plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
import java.time.OffsetDateTime
Expand Down Expand Up @@ -403,39 +404,62 @@ class WaveClient {
: new URL(DEFAULT_S5CMD_AMD64_URL)
}

protected static URL replaceFusionArch(URL url, String platform) {
protected static URI replaceFusionArch(URI uri, String platform) {
if( uri.scheme == 'file' )
return uri
final isArm = platform.tokenize('/')?.contains('arm64')
final targetArch = isArm ? 'arm64' : 'amd64'
final replaced = url.toString().replaceAll(/(?<=[-_])(amd64|arm64)(?=\.)/, targetArch)
return replaced != url.toString() ? new URL(replaced) : url
final original = uri.toString()
final replaced = original.replaceAll(/(?<=[-_])(amd64|arm64)(?=\.)/, targetArch)
return replaced != original ? new URI(replaced) : uri
}

ContainerConfig resolveContainerConfig(String platform = DEFAULT_DOCKER_PLATFORM) {
final urls = new ArrayList<URL>(config.containerConfigUrl())
final uris = new ArrayList<URI>(config.containerConfigUrl().collect { it.toURI() })
final platforms = platform ? platform.tokenize(',') : List.of(DEFAULT_DOCKER_PLATFORM)
if( fusion.enabled() ) {
final customUrl = fusion.containerConfigUrl()
final customUri = fusion.containerConfigURI()
for( String p : platforms ) {
final fusionUrl = customUrl ? replaceFusionArch(customUrl, p.trim()) : defaultFusionUrl(p.trim())
urls.add(fusionUrl)
final fusionUri = customUri ? replaceFusionArch(customUri, p.trim()) : defaultFusionUrl(p.trim()).toURI()
uris.add(fusionUri)
}
}
if( awsFargate ) {
final s5cmdUrl = s5cmdConfigUrl ?: defaultS5cmdUrl(platform)
urls.add(s5cmdUrl)
uris.add(s5cmdUrl.toURI())
}
if( !urls )
if( !uris )
return null
def result = new ContainerConfig()
for( URL it : urls ) {
for( URI it : uris ) {
// append each config to the other - the last has priority
result += fetchContainerConfig(it)
}
return result
}

@Memoized
synchronized protected ContainerConfig fetchContainerConfig(URL configUrl) {
synchronized protected ContainerConfig fetchContainerConfig(URI configURI) {
log.debug "Wave fetch container config: $configURI"
final scheme = configURI.scheme
if( scheme == 'http' || scheme == 'https' )
return fetchContainerConfig(configURI.toURL())
if( scheme == 'file' )
return fetchContainerConfig(Paths.get(configURI))
throw new IllegalArgumentException("Unsupported container config URI scheme: $scheme")
}

protected ContainerConfig fetchContainerConfig(Path configPath) {
log.debug "Wave read local container config: $configPath"
try {
return jsonToContainerConfig(configPath.text)
}
catch( Exception e ) {
throw new IllegalStateException("Cannot read Fusion container config from $configPath", e)
}
}

protected ContainerConfig fetchContainerConfig(URL configUrl) {
log.debug "Wave request container config: $configUrl"
final req = HttpRequest.newBuilder()
.uri(configUrl.toURI())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,46 @@ class WaveClientTest extends Specification {
| CONFIG_RESP.get('combined')
}

def 'should resolve container config from local file' () {
given:
def folder = Files.createTempDirectory('wave-local-manifest')
def manifest = folder.resolve('manifest.json')
manifest.text = JsonOutput.toJson([entrypoint: ['entry.sh']])
and:
def session = Mock(Session) { getConfig() >> [fusion: [enabled: true, containerConfigUrl: manifest.toUri().toString()]] }
def client = new WaveClient(session)

expect:
client.resolveContainerConfig() == new ContainerConfig(entrypoint: ['entry.sh'])

cleanup:
folder?.deleteDir()
}

def 'should reject unsupported scheme for container config' () {
given:
def session = Mock(Session) { getConfig() >> [:] }
def client = new WaveClient(session)

when:
client.fetchContainerConfig(new URI('ftp://example.com/manifest.json'))

then:
thrown(IllegalArgumentException)
}

@Unroll
def 'should replace fusion arch in URI' () {
expect:
WaveClient.replaceFusionArch(new URI(URI_VALUE), PLATFORM).toString() == EXPECTED

where:
URI_VALUE | PLATFORM | EXPECTED
'https://fusionfs.seqera.io/releases/v2.5-amd64.json' | 'linux/arm64' | 'https://fusionfs.seqera.io/releases/v2.5-arm64.json'
'https://fusionfs.seqera.io/releases/v2.5-arm64.json' | 'linux/amd64' | 'https://fusionfs.seqera.io/releases/v2.5-amd64.json'
'file:///tmp/local-manifest.json' | 'linux/arm64' | 'file:///tmp/local-manifest.json'
}

@Unroll
def 'should get fusion default url' () {
given:
Expand Down
Loading