diff --git a/modules/nextflow/src/main/groovy/nextflow/util/SimpleHttpClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/SimpleHttpClient.groovy index bb756c0af3..2bb6b528d8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/SimpleHttpClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/SimpleHttpClient.groovy @@ -32,6 +32,7 @@ import nextflow.BuildInfo * Paolo Di Tommaso */ @Slf4j +@Deprecated @CompileStatic class SimpleHttpClient { diff --git a/modules/nf-commons/src/main/nextflow/util/RetryConfig.groovy b/modules/nf-commons/src/main/nextflow/util/RetryConfig.groovy index b9e6c202aa..36ff2c6fb0 100644 --- a/modules/nf-commons/src/main/nextflow/util/RetryConfig.groovy +++ b/modules/nf-commons/src/main/nextflow/util/RetryConfig.groovy @@ -37,11 +37,11 @@ import nextflow.SysEnv @CompileStatic class RetryConfig implements Retryable.Config { - private final static Duration DEFAULT_DELAY = Duration.of('350ms') - private final static Duration DEFAULT_MAX_DELAY = Duration.of('90s') - private final static Integer DEFAULT_MAX_ATTEMPTS = 5 - private final static Double DEFAULT_JITTER = 0.25 - static final public double DEFAULT_MULTIPLIER = 2.0 + public final static Duration DEFAULT_DELAY = Duration.of('350ms') + public final static Duration DEFAULT_MAX_DELAY = Duration.of('90s') + public final static Integer DEFAULT_MAX_ATTEMPTS = 5 + public final static Double DEFAULT_JITTER = 0.25 + public final static double DEFAULT_MULTIPLIER = 2.0 private final static String ENV_PREFIX = 'NXF_RETRY_POLICY_' diff --git a/plugins/nf-tower/build.gradle b/plugins/nf-tower/build.gradle index 20cb9f491b..c65b593569 100644 --- a/plugins/nf-tower/build.gradle +++ b/plugins/nf-tower/build.gradle @@ -33,6 +33,7 @@ dependencies { compileOnly 'org.slf4j:slf4j-api:2.0.17' compileOnly 'org.pf4j:pf4j:3.12.0' + api 'io.seqera:lib-httpx:1.6.0' api "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0" api "com.fasterxml.jackson.core:jackson-databind:2.12.7.1" diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy index 8d4159afb3..e3fa2618f2 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy @@ -17,6 +17,8 @@ package io.seqera.tower.plugin +import java.net.http.HttpClient +import java.net.http.HttpRequest import java.time.Instant import java.time.OffsetDateTime @@ -29,14 +31,15 @@ import groovy.json.JsonGenerator import groovy.json.JsonOutput import groovy.json.JsonSlurper import groovy.transform.CompileStatic -import groovy.transform.Memoized import groovy.transform.ToString import groovy.transform.TupleConstructor import groovy.util.logging.Slf4j +import io.seqera.http.HxClient +import io.seqera.http.HxConfig +import io.seqera.util.trace.TraceUtils +import nextflow.BuildInfo import nextflow.Session import nextflow.container.resolver.ContainerMeta -import nextflow.container.resolver.ContainerResolver -import nextflow.container.resolver.ContainerResolverProvider import nextflow.exception.AbortOperationException import nextflow.processor.TaskHandler import nextflow.processor.TaskId @@ -49,7 +52,6 @@ import nextflow.trace.event.TaskEvent import nextflow.util.Duration import nextflow.util.LoggerHelper import nextflow.util.ProcessHelper -import nextflow.util.SimpleHttpClient import nextflow.util.TestOnly import nextflow.util.Threads /** @@ -98,10 +100,7 @@ class TowerClient implements TraceObserverV2 { */ private String runId - /** - * Simple http client object that will send out messages - */ - private SimpleHttpClient httpClient + private HxClient httpClient private JsonGenerator generator @@ -139,12 +138,12 @@ class TowerClient implements TraceObserverV2 { private String accessToken - private String refreshToken - private String workspaceId private TowerReports reports + private TowerRetryPolicy retryPolicy + private Map allContainers = new ConcurrentHashMap<>() TowerClient(Session session, TowerConfig config) { @@ -152,6 +151,7 @@ class TowerClient implements TraceObserverV2 { this.endpoint = checkUrl(config.endpoint) this.accessToken = config.accessToken this.workspaceId = config.workspaceId + this.retryPolicy = config.retryPolicy this.schema = loadSchema() this.generator = TowerJsonGenerator.create(schema) this.reports = new TowerReports(session) @@ -278,9 +278,7 @@ class TowerClient implements TraceObserverV2 { this.aggregator = new ResourcesAggregator(session) this.runName = session.getRunName() this.runId = session.getUniqueId() - this.httpClient = new SimpleHttpClient() - // set the auth token - setAuthToken( httpClient, getAccessToken() ) + this.httpClient = newHttpClient() // send hello to verify auth final req = makeCreateReq(session) @@ -305,10 +303,28 @@ class TowerClient implements TraceObserverV2 { reports.flowCreate(workflowId) } - protected void setAuthToken(SimpleHttpClient client, String token) { + protected HxClient newHttpClient() { + final config = new HxConfig.Builder() + // auth settings + setupClientAuth(config, getAccessToken()) + // retry settings + config.withRetryConfig(this.retryPolicy) + // create the client object + final client = HttpClient + .newBuilder() + .followRedirects(HttpClient.Redirect.NORMAL) + .version(HttpClient.Version.HTTP_1_1) + .connectTimeout(java.time.Duration.ofSeconds(60)) + .build() + return HxClient.create(client, config.build()) + } + + protected void setupClientAuth(HxConfig.Builder config, String token) { // check for plain jwt token if( token.count('.')==2 ) { - client.setBearerToken(token) + config.withJwtToken(token) + config.withRefreshToken(env.get('TOWER_REFRESH_TOKEN')) + config.withRefreshTokenUrl("$endpoint/oauth/access_token") return } @@ -318,7 +334,10 @@ class TowerClient implements TraceObserverV2 { final p = plain.indexOf('.') if( p!=-1 && new JsonSlurper().parseText( plain.substring(0, p) ) ) { // ok this is bearer token - client.setBearerToken(token) + config.withJwtToken(token) + // setup the refresh + config.withRefreshToken(env.get('TOWER_REFRESH_TOKEN')) + config.withRefreshTokenUrl("$endpoint/oauth/access_token") return } } @@ -327,7 +346,7 @@ class TowerClient implements TraceObserverV2 { } // fallback on simple token - client.setBasicToken(TOKEN_PREFIX + token) + config.withBasicAuth(TOKEN_PREFIX + token) } protected Map makeCreateReq(Session session) { @@ -352,9 +371,6 @@ class TowerClient implements TraceObserverV2 { @Override void onFlowBegin() { // configure error retry - httpClient.maxRetries = maxRetries - httpClient.backOffBase = backOffBase - httpClient.backOffDelay = backOffDelay final req = makeBeginReq(session) final resp = sendHttpMessage(urlTraceBegin, req, 'PUT') @@ -479,37 +495,6 @@ class TowerClient implements TraceObserverV2 { reports.filePublish(event.target) } - protected void refreshToken(String refresh) { - log.debug "Token refresh request >> $refresh" - final url = "$endpoint/oauth/access_token" - httpClient.sendHttpMessage( - url, - method: 'POST', - contentType: "application/x-www-form-urlencoded", - body: "grant_type=refresh_token&refresh_token=${URLEncoder.encode(refresh, 'UTF-8')}" ) - - final authCookie = httpClient.getCookie('JWT') - final refreshCookie = httpClient.getCookie('JWT_REFRESH_TOKEN') - - // set the new bearer token - if( authCookie?.value ) { - log.trace "Updating http client bearer token=$authCookie.value" - httpClient.setBearerToken(authCookie.value) - } - else { - log.warn "Missing JWT cookie from refresh token response ~ $authCookie" - } - - // set the new refresh token - if( refreshCookie?.value ) { - log.trace "Updating http client refresh token=$refreshCookie.value" - refreshToken = refreshCookie.value - } - else { - log.warn "Missing JWT_REFRESH_TOKEN cookie from refresh token response ~ $refreshCookie" - } - } - /** * Little helper method that sends a HTTP POST message as JSON with * the current run status, ISO 8601 UTC timestamp, run name and the TraceRecord @@ -520,51 +505,48 @@ class TowerClient implements TraceObserverV2 { */ protected Response sendHttpMessage(String url, Map payload, String method='POST') { - int refreshTries=0 - final currentRefresh = refreshToken ?: env.get('TOWER_REFRESH_TOKEN') - - while ( true ) { - // The actual HTTP request - final String json = payload != null ? generator.toJson(payload) : null - final String debug = json != null ? JsonOutput.prettyPrint(json).indent() : '-' - log.trace "HTTP url=$url; payload:\n${debug}\n" - try { - if( refreshTries==1 ) { - refreshToken(currentRefresh) - } - - httpClient.sendHttpMessage(url, json, method) - return new Response(httpClient.responseCode, httpClient.getResponse()) + // The actual HTTP request + final String json = payload != null ? generator.toJson(payload) : null + final String debug = json != null ? JsonOutput.prettyPrint(json).indent() : '-' + log.trace "HTTP url=$url; payload:\n${debug}\n" + try { + final resp = httpClient.sendAsString(makeRequest(url, json, method)) + final status = resp.statusCode() + if( status == 401 ) { + final msg = 'Unauthorized Seqera Platform API access -- Make sure you have specified the correct access token' + return new Response(status, msg) } - catch( ConnectException e ) { - String msg = "Unable to connect to Seqera Platform API: ${getHostUrl(url)}" - return new Response(0, msg) - } - catch (IOException e) { - int code = httpClient.responseCode - if( code == 401 && ++refreshTries==1 && currentRefresh ) { - // when 401 Unauthorized error is returned - only the very first time - - // and a refresh token is available, make another iteration trying - // having refreshed the authorization token (see 'refreshToken' invocation above) - log.trace "Got 401 Unauthorized response ~ tries refreshing auth token" - continue - } - else { - log.trace("Got HTTP code $code - refreshTries=$refreshTries - currentRefresh=$currentRefresh", e) - } - - String msg - if( code == 401 ) { - msg = 'Unauthorized Seqera Platform API access -- Make sure you have specified the correct access token' - } - else { - msg = parseCause(httpClient.response) ?: "Unexpected response for request $url" - } - return new Response(code, msg, httpClient.response) + if( status>=400 ) { + final msg = parseCause(resp?.body()) ?: "Unexpected response for request $url" + return new Response(status, msg as String) } + else + return new Response(status, resp.body()) + } + catch( IOException e ) { + String msg = "Unable to connect to Seqera Platform API: ${getHostUrl(url)}" + return new Response(0, msg) } } + protected HttpRequest makeRequest(String url, String payload, String verb) { + assert payload, "Tower request cannot be empty" + + final builder = HttpRequest.newBuilder(URI.create(url)) + .header('Content-Type', 'application/json; charset=utf-8') + .header('User-Agent', "Nextflow/$BuildInfo.version") + .header('Traceparent', TraceUtils.rndTrace()) + + if(verb == 'PUT') + return builder.PUT(HttpRequest.BodyPublishers.ofString(payload)).build() + + if(verb == 'POST') + return builder.POST(HttpRequest.BodyPublishers.ofString(payload)).build() + + else + throw new IllegalArgumentException("Unsupported HTTP verb: $verb") + } + protected boolean isCliLogsEnabled() { return env.get('TOWER_ALLOW_NEXTFLOW_LOGS') == 'true' } @@ -847,8 +829,4 @@ class TowerClient implements TraceObserverV2 { } } - @Memoized - private ContainerResolver containerResolver() { - ContainerResolverProvider.load() - } } diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerConfig.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerConfig.groovy index f65003eaba..59e15ad918 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerConfig.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerConfig.groovy @@ -60,6 +60,8 @@ class TowerConfig implements ConfigScope { """) final String workspaceId + final TowerRetryPolicy retryPolicy + /* required by extension point -- do not remove */ TowerConfig() {} @@ -68,5 +70,6 @@ class TowerConfig implements ConfigScope { this.enabled = opts.enabled as boolean this.endpoint = PlatformHelper.getEndpoint(opts, env) this.workspaceId = PlatformHelper.getWorkspaceId(opts, env) + this.retryPolicy = new TowerRetryPolicy(opts.retryPolicy as Map ?: Map.of(), opts) } } diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFactory.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFactory.groovy index 5845dd5c91..a3154a3c70 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFactory.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFactory.groovy @@ -25,10 +25,9 @@ import nextflow.Session import nextflow.SysEnv import nextflow.file.http.XAuthProvider import nextflow.file.http.XAuthRegistry -import nextflow.trace.TraceObserverV2 import nextflow.trace.TraceObserverFactoryV2 +import nextflow.trace.TraceObserverV2 import nextflow.util.Duration -import nextflow.util.SimpleHttpClient /** * Create and register the Tower observer instance * @@ -70,10 +69,6 @@ class TowerFactory implements TraceObserverFactoryV2 { tower.aliveInterval = aliveInterval if( requestInterval ) tower.requestInterval = requestInterval - // error handling settings - tower.maxRetries = opts.maxRetries != null ? opts.maxRetries as int : 5 - tower.backOffBase = opts.backOffBase != null ? opts.backOffBase as int : SimpleHttpClient.DEFAULT_BACK_OFF_BASE - tower.backOffDelay = opts.backOffDelay != null ? opts.backOffDelay as int : SimpleHttpClient.DEFAULT_BACK_OFF_DELAY // register auth provider // note: this is needed to authorize access to resources via XFileSystemProvider used by NF diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerRetryPolicy.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerRetryPolicy.groovy new file mode 100644 index 0000000000..bf12b3dfc5 --- /dev/null +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerRetryPolicy.groovy @@ -0,0 +1,82 @@ +/* + * Copyright 2013-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.seqera.tower.plugin + + +import io.seqera.util.retry.Retryable +import nextflow.config.schema.ConfigOption +import nextflow.config.schema.ConfigScope +import nextflow.script.dsl.Description +import nextflow.util.Duration +import nextflow.util.RetryConfig + +/** + * Configuration class for Tower retry policy settings. + * + * This class defines the retry behavior for Tower operations including HTTP requests + * and other potentially failing operations. It implements exponential backoff with + * jitter to handle transient failures gracefully. + * + * The retry policy supports: + * - Configurable initial delay before the first retry attempt + * - Maximum delay cap to prevent excessively long wait times + * - Limited number of retry attempts to avoid infinite loops + * - Jitter randomization to prevent thundering herd problems + * - Exponential backoff multiplier for progressive delay increases + * + * @author Paolo Di Tommaso + */ +class TowerRetryPolicy implements Retryable.Config, ConfigScope { + + @ConfigOption + @Description(""" + Initial delay before retrying a failed Tower operation (default: `350ms`). + """) + Duration delay + + @ConfigOption + @Description(""" + Maximum delay between retry attempts for Tower operations (default: `90s`). + """) + Duration maxDelay + + @ConfigOption + @Description(""" + Maximum number of retry attempts for Tower operations (default: `5`). + """) + int maxAttempts + + @ConfigOption + @Description(""" + Random jitter factor applied to retry delays to avoid thundering herd issues (default: `0.25`). + """) + double jitter + + @ConfigOption + @Description(""" + Multiplier factor for exponential backoff between retry attempts (default: `2.0`). + """) + double multiplier + + TowerRetryPolicy(Map opts, Map legacy=Map.of()) { + this.delay = opts.delay as Duration ?: legacy.backOffDelay as Duration ?: RetryConfig.DEFAULT_DELAY + this.maxDelay = opts.maxDelay as Duration ?: RetryConfig.DEFAULT_MAX_DELAY + this.maxAttempts = opts.maxAttemps as Integer ?: legacy.maxRetries as Integer ?: RetryConfig.DEFAULT_MAX_ATTEMPTS + this.jitter = opts.jitter as Double ?: RetryConfig.DEFAULT_JITTER + this.multiplier = opts.multiplier as Double ?: legacy.backOffBase as Double ?: RetryConfig.DEFAULT_MULTIPLIER + } +} diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy index cab9b4c897..1216843608 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy @@ -17,11 +17,14 @@ package io.seqera.tower.plugin +import java.net.http.HttpResponse import java.nio.file.Files import java.time.Instant import java.time.OffsetDateTime import java.time.ZoneId +import io.seqera.http.HxClient +import io.seqera.http.HxConfig import nextflow.Session import nextflow.cloud.types.CloudMachineInfo import nextflow.cloud.types.PriceModel @@ -34,7 +37,6 @@ import nextflow.trace.TraceRecord import nextflow.trace.WorkflowStats import nextflow.trace.WorkflowStatsObserver import nextflow.util.ProcessHelper -import nextflow.util.SimpleHttpClient import spock.lang.Specification /** * @@ -187,7 +189,7 @@ class TowerClientTest extends Specification { given: def URL = 'http://foo.com' def PROGRESS = Mock(WorkflowProgress) { getRunning()>>1; getSucceeded()>>2; getFailed()>>3 } - def client = Mock(SimpleHttpClient) + def client = Mock(HxClient) def observer = Spy(TowerClient) observer.@httpClient = client observer.@workflowId = 'xyz-123' @@ -236,7 +238,7 @@ class TowerClientTest extends Specification { when: observer.sendHttpMessage(URL, req) then: - 1 * client.sendHttpMessage(URL, _, 'POST') >> null + 1 * client.sendAsString(_) >> Mock(HttpResponse) } @@ -264,7 +266,7 @@ class TowerClientTest extends Specification { given: def sessionId = UUID.randomUUID() def dir = Files.createTempDirectory('test') - def http = Mock(SimpleHttpClient) + def http = Mock(HxClient) TowerClient client = Spy(new TowerClient([httpClient: http, env: ENV])) and: client.getOperationId() >> 'op-112233' @@ -436,7 +438,7 @@ class TowerClientTest extends Specification { def 'should set the auth token' () { given: - def http = Mock(SimpleHttpClient) + def http = Mock(HxConfig.Builder) def session = Mock(Session) def config = new TowerConfig([:], [:]) def client = Spy(new TowerClient(session, config)) @@ -445,19 +447,19 @@ class TowerClientTest extends Specification { def BEARER = 'eyJ0aWQiOiA1fS5jZmM1YjVhOThjZjM2MTk1NjBjZWU1YmMwODUxYzA1ZjkzMDdmN2Iz' when: - client.setAuthToken(http, SIMPLE) + client.setupClientAuth(http, SIMPLE) then: - http.setBasicToken('@token:' + SIMPLE) >> null + http.withBasicAuth('@token:' + SIMPLE) >> null when: - client.setAuthToken(http, SIMPLE) + client.setupClientAuth(http, SIMPLE) then: - http.setBasicToken('@token:' + SIMPLE) >> null + http.withBasicAuth('@token:' + SIMPLE) >> null when: - client.setAuthToken(http, BEARER) + client.setupClientAuth(http, BEARER) then: - http.setBearerToken(BEARER) >> null + http.withJwtToken(BEARER) >> null } def 'should fetch workflow meta' () { @@ -520,4 +522,16 @@ class TowerClientTest extends Specification { and: client.getNewContainers([trace1, trace2, trace3]) == [c2] } + + def 'should handle HTTP request with content'() { + given: 'a TowerClient' + def tower = new TowerClient() + def content = '{"test": "data"}' + def request = tower.makeRequest('http://example.com/test', content, 'POST') + + expect: 'the request should be created with the content' + request != null + request.method() == 'POST' + request.uri().toString() == 'http://example.com/test' + } } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerRetryPolicyTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerRetryPolicyTest.groovy new file mode 100644 index 0000000000..e2932dcee5 --- /dev/null +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerRetryPolicyTest.groovy @@ -0,0 +1,73 @@ +/* + * Copyright 2013-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.seqera.tower.plugin + +import nextflow.util.Duration +import nextflow.util.RetryConfig +import spock.lang.Specification + +/** + * Unit tests for TowerRetryPolicy + * + * @author Paolo Di Tommaso + */ +class TowerRetryPolicyTest extends Specification { + + def 'should validate default values of tower retry policy'() { + when: + def policy = new TowerRetryPolicy([:]) + + then: + policy.delay == RetryConfig.DEFAULT_DELAY + policy.maxDelay == RetryConfig.DEFAULT_MAX_DELAY + policy.maxAttempts == RetryConfig.DEFAULT_MAX_ATTEMPTS + policy.jitter == RetryConfig.DEFAULT_JITTER + policy.multiplier == RetryConfig.DEFAULT_MULTIPLIER + } + + def 'should use provided values when specified'() { + when: + def customOptions = [ + delay: '1s' as nextflow.util.Duration, + maxDelay: '60s' as nextflow.util.Duration, + maxAttemps: 3, + jitter: 0.5, + multiplier: 1.5 + ] + def policy = new TowerRetryPolicy(customOptions) + + then: + policy.delay == customOptions.delay + policy.maxDelay == customOptions.maxDelay + policy.maxAttempts == 3 + policy.jitter == 0.5d + policy.multiplier == 1.5d + } + + def 'should use provided values when specified'() { + when: + def policy = new TowerRetryPolicy([:], [backOffDelay: 500, maxRetries: 100, backOffBase: 5]) + + then: + policy.delay == Duration.of('500ms') + policy.maxAttempts == 100 + policy.multiplier == 5 + and: + policy.maxDelay == RetryConfig.DEFAULT_MAX_DELAY + policy.jitter == RetryConfig.DEFAULT_JITTER + } +}