diff --git a/CLAUDE.md b/CLAUDE.md index 785ac91429..711ac0f68e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -139,3 +139,10 @@ Follow these actions to make a new release: git commit -m "[release] Nextflow version 25.09.0-edge" git push origin master ``` + +## Active Technologies +- Groovy 4.0.29, Java 17 target (Java 21 toolchain) + AWS SDK v2 (ECS, EC2, CloudWatch Logs, S3), GPars 1.2.1 (001-ecs-executor) +- AWS S3 via Seqera Fusion filesystem (001-ecs-executor) + +## Recent Changes +- 001-ecs-executor: Added Groovy 4.0.29, Java 17 target (Java 21 toolchain) + AWS SDK v2 (ECS, EC2, CloudWatch Logs, S3), GPars 1.2.1 diff --git a/plugins/nf-amazon/build.gradle b/plugins/nf-amazon/build.gradle index 8e87b6a654..c7daa05346 100644 --- a/plugins/nf-amazon/build.gradle +++ b/plugins/nf-amazon/build.gradle @@ -28,6 +28,7 @@ nextflowPlugin { generateSpec = false extensionPoints = [ 'nextflow.cloud.aws.batch.AwsBatchExecutor', + 'nextflow.cloud.aws.ecs.AwsEcsExecutor', 'nextflow.cloud.aws.config.AwsConfig', 'nextflow.cloud.aws.fusion.AwsFusionEnv', 'nextflow.cloud.aws.mail.AwsMailProvider', diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsConfig.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsConfig.groovy index b42f151c63..3865cf1ba2 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsConfig.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsConfig.groovy @@ -45,6 +45,8 @@ class AwsConfig implements ConfigScope { final AwsBatchConfig batch + final AwsEcsConfig ecs + final AwsS3Config client @ConfigOption @@ -80,6 +82,7 @@ class AwsConfig implements ConfigScope { this.profile = getAwsProfile0(SysEnv.get(), opts) this.region = getAwsRegion(SysEnv.get(), opts) this.batch = new AwsBatchConfig((Map)opts.batch ?: Collections.emptyMap()) + this.ecs = new AwsEcsConfig((Map)opts.ecs ?: Collections.emptyMap()) this.client = new AwsS3Config((Map)opts.client ?: Collections.emptyMap()) } @@ -93,6 +96,8 @@ class AwsConfig implements ConfigScope { AwsBatchConfig getBatchConfig() { batch } + AwsEcsConfig getEcsConfig() { ecs } + @Deprecated String getS3GlobalRegion() { return !region || !s3Config.endpoint || s3Config.endpoint.contains(".amazonaws.com") diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsEcsConfig.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsEcsConfig.groovy new file mode 100644 index 0000000000..a51437642f --- /dev/null +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/config/AwsEcsConfig.groovy @@ -0,0 +1,111 @@ +/* + * Copyright 2020-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.config + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.config.spec.ConfigOption +import nextflow.config.spec.ConfigScope +import nextflow.script.dsl.Description + +/** + * Model AWS ECS Managed Instances executor config settings + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +class AwsEcsConfig implements ConfigScope { + + public static final String DEFAULT_LOGS_GROUP = '/aws/ecs/nextflow' + public static final int DEFAULT_MAX_SPOT_ATTEMPTS = 5 + + @ConfigOption + @Description(""" + The name or ARN of the ECS cluster with Managed Instances capacity provider where tasks will be executed. + This setting is required. + """) + final String cluster + + @ConfigOption + @Description(""" + The ARN of the IAM role that Amazon ECS uses for task execution. This role is required for pulling + container images and sending logs to CloudWatch. + """) + final String executionRole + + @ConfigOption + @Description(""" + The ARN of the IAM role that tasks can use to make AWS API requests (e.g., for S3 access). + """) + final String taskRole + + @ConfigOption + @Description(""" + List of VPC subnet IDs where ECS tasks will be launched. If not specified, subnets are + auto-discovered from the default VPC. + """) + final List subnets + + @ConfigOption + @Description(""" + List of security group IDs to associate with ECS tasks. If not specified, the default + security group is auto-discovered from the default VPC. + """) + final List securityGroups + + @ConfigOption + @Description(""" + The name of the CloudWatch Logs group where task logs will be sent (default: `/aws/ecs/nextflow`). + """) + final String logsGroup + + @ConfigOption + @Description(""" + Maximum number of retry attempts for tasks interrupted by Spot instance reclaim (default: `5`). + """) + final Integer maxSpotAttempts + + @ConfigOption + @Description(""" + Whether to assign a public IP address to tasks (default: `true`). When enabled, tasks can + access the internet without requiring a NAT gateway. + """) + final Boolean assignPublicIp + + AwsEcsConfig(Map opts) { + cluster = opts.cluster as String + executionRole = opts.executionRole as String + taskRole = opts.taskRole as String + subnets = parseStringList(opts.subnets) + securityGroups = parseStringList(opts.securityGroups) + logsGroup = opts.logsGroup as String ?: DEFAULT_LOGS_GROUP + maxSpotAttempts = opts.maxSpotAttempts != null ? opts.maxSpotAttempts as Integer : DEFAULT_MAX_SPOT_ATTEMPTS + assignPublicIp = opts.assignPublicIp != null ? opts.assignPublicIp as Boolean : true + } + + protected List parseStringList(Object obj) { + if (!obj) + return null + if (obj instanceof List) + return ((List) obj).collect { it.toString() } + if (obj instanceof CharSequence) + return obj.toString().tokenize(',').collect { it.trim() } + throw new IllegalArgumentException("Not a valid list value: $obj [${obj.getClass().getName()}]") + } +} diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy new file mode 100644 index 0000000000..8fc40b2c76 --- /dev/null +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy @@ -0,0 +1,478 @@ +/* + * Copyright 2020-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.ecs + +import java.nio.file.Path +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException + +import groovy.transform.CompileStatic +import groovy.transform.PackageScope +import groovy.util.logging.Slf4j +import nextflow.cloud.aws.AwsClientFactory +import nextflow.cloud.aws.config.AwsConfig +import nextflow.cloud.aws.nio.S3Path +import nextflow.exception.AbortOperationException +import nextflow.executor.Executor +import nextflow.fusion.FusionHelper +import nextflow.processor.ParallelPollingMonitor +import nextflow.processor.TaskHandler +import nextflow.processor.TaskMonitor +import nextflow.processor.TaskRun +import nextflow.util.Duration +import nextflow.util.RateUnit +import nextflow.util.ServiceName +import nextflow.util.ThreadPoolHelper +import nextflow.util.ThrottlingExecutor +import org.pf4j.ExtensionPoint +import software.amazon.awssdk.services.ec2.Ec2Client +import software.amazon.awssdk.services.ec2.model.DescribeSecurityGroupsRequest +import software.amazon.awssdk.services.ec2.model.DescribeSubnetsRequest +import software.amazon.awssdk.services.ec2.model.DescribeVpcsRequest +import software.amazon.awssdk.services.ec2.model.Filter +import software.amazon.awssdk.services.ecs.EcsClient +import software.amazon.awssdk.services.ecs.model.EcsException + +/** + * AWS ECS executor for running Nextflow tasks on Amazon Elastic Container Service. + * + * This executor uses ECS with awsvpc networking mode and requires Fusion filesystem + * for S3-based work directory access. Tasks are executed as ECS tasks with + * automatically registered task definitions. + * + *

Prerequisites

+ *
    + *
  • An ECS cluster (can be empty - capacity is managed automatically)
  • + *
  • Fusion filesystem enabled ({@code fusion.enabled = true})
  • + *
  • S3 bucket for work directory
  • + *
  • IAM execution role with permissions to pull images and write logs
  • + *
  • IAM task role with S3 access for the work directory
  • + *
+ * + *

Minimal IAM Execution Role Policy

+ *
+ * {
+ *   "Version": "2012-10-17",
+ *   "Statement": [
+ *     {
+ *       "Effect": "Allow",
+ *       "Action": [
+ *         "ecr:GetAuthorizationToken",
+ *         "ecr:BatchCheckLayerAvailability",
+ *         "ecr:GetDownloadUrlForLayer",
+ *         "ecr:BatchGetImage",
+ *         "logs:CreateLogStream",
+ *         "logs:PutLogEvents"
+ *       ],
+ *       "Resource": "*"
+ *     }
+ *   ]
+ * }
+ * 
+ * + *

Minimal IAM Task Role Policy

+ *
+ * {
+ *   "Version": "2012-10-17",
+ *   "Statement": [
+ *     {
+ *       "Effect": "Allow",
+ *       "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
+ *       "Resource": ["arn:aws:s3:::your-bucket", "arn:aws:s3:::your-bucket/*"]
+ *     }
+ *   ]
+ * }
+ * 
+ * + *

Minimal Configuration

+ *
+ * process.executor = 'awsecs'
+ * fusion.enabled = true
+ * wave.enabled = true
+ * aws {
+ *   region = 'us-east-1'
+ *   ecs {
+ *     cluster = 'my-cluster'
+ *     executionRole = 'arn:aws:iam::ACCOUNT:role/ecsTaskExecutionRole'
+ *     taskRole = 'arn:aws:iam::ACCOUNT:role/ecsTaskRole'
+ *   }
+ * }
+ * 
+ * + * @author Paolo Di Tommaso + */ +@Slf4j +@ServiceName('awsecs') +@CompileStatic +class AwsEcsExecutor extends Executor implements ExtensionPoint { + + private AwsEcsOptions awsOptions + + /** + * ECS client instance + */ + @PackageScope + private EcsClient ecsClient + + /** + * EC2 client for VPC auto-discovery + */ + @PackageScope + private Ec2Client ec2Client + + /** + * AWS client factory + */ + private AwsClientFactory clientFactory + + /** + * Executor service to throttle service requests + */ + private ThrottlingExecutor submitter + + /** + * Executor service to throttle cancel requests + */ + private ThrottlingExecutor reaper + + /** + * Auto-discovered or configured subnets + */ + private List resolvedSubnets + + /** + * Auto-discovered or configured security groups + */ + private List resolvedSecurityGroups + + AwsEcsOptions getAwsOptions() { awsOptions } + + @PackageScope + EcsClient getEcsClient() { ecsClient } + + @PackageScope + Ec2Client getEc2Client() { ec2Client } + + List getResolvedSubnets() { resolvedSubnets } + + List getResolvedSecurityGroups() { resolvedSecurityGroups } + + /** + * @return {@code true} to signal containers are managed directly by AWS ECS + */ + @Override + final boolean isContainerNative() { + return true + } + + @Override + String containerConfigEngine() { + return 'docker' + } + + /** + * @return {@code true} whenever the secrets handling is managed by the executing platform itself + */ + @Override + final boolean isSecretNative() { + return true + } + + @Override + Path getWorkDir() { + session.bucketDir ?: session.workDir + } + + protected void validateWorkDir() { + /* + * make sure the work dir is a S3 bucket + */ + if (workDir !instanceof S3Path) { + session.abort() + throw new AbortOperationException("When using `$name` executor an S3 bucket must be provided as working directory using either the `-bucket-dir` or `-work-dir` command line option") + } + } + + protected void validatePathDir() { + final path = session.config.navigate('env.PATH') + if (path) { + log.warn "Environment PATH defined in config file is ignored by AWS ECS executor" + } + } + + protected void validateFusion() { + if (!isFusionEnabled()) { + session.abort() + throw new AbortOperationException("AWS ECS executor requires Fusion filesystem to be enabled. Please add `fusion.enabled = true` to your Nextflow configuration.") + } + } + + /** + * Create AWS ECS and EC2 clients + */ + protected void createAwsClients() { + clientFactory = new AwsClientFactory(new AwsConfig(session.config.aws as Map)) + ecsClient = clientFactory.getEcsClient() + ec2Client = clientFactory.getEc2Client() + awsOptions = new AwsEcsOptions(this) + log.debug "[AWS ECS] Executor options=$awsOptions" + } + + /** + * Auto-discover VPC subnets and security groups from the default VPC. + * If no default VPC exists, fail with a clear error message. + */ + protected void discoverVpcConfiguration() { + // Use configured values if provided + if (awsOptions.subnets) { + resolvedSubnets = awsOptions.subnets + log.debug "[AWS ECS] Using configured subnets: $resolvedSubnets" + } + else { + resolvedSubnets = discoverDefaultVpcSubnets() + log.debug "[AWS ECS] Auto-discovered subnets from default VPC: $resolvedSubnets" + } + + if (awsOptions.securityGroups) { + resolvedSecurityGroups = awsOptions.securityGroups + log.debug "[AWS ECS] Using configured security groups: $resolvedSecurityGroups" + } + else { + resolvedSecurityGroups = discoverDefaultVpcSecurityGroups() + log.debug "[AWS ECS] Auto-discovered security groups from default VPC: $resolvedSecurityGroups" + } + } + + /** + * Discover subnets from the default VPC. + * @return List of subnet IDs from the default VPC + * @throws AbortOperationException if no default VPC exists + */ + protected List discoverDefaultVpcSubnets() { + // First, find the default VPC + final vpcRequest = DescribeVpcsRequest.builder() + .filters(Filter.builder().name('isDefault').values('true').build()) + .build() as DescribeVpcsRequest + + final vpcsResponse = ec2Client.describeVpcs(vpcRequest) + + if (vpcsResponse.vpcs().isEmpty()) { + throw new AbortOperationException(""" + No default VPC found in this AWS region. + + The AWS ECS executor requires VPC configuration for task networking. + Please configure VPC settings explicitly in your Nextflow config: + + aws.ecs.subnets = ['subnet-xxx', 'subnet-yyy'] + aws.ecs.securityGroups = ['sg-xxx'] + + Alternatively, create a default VPC in your AWS account. + """.stripIndent().trim()) + } + + final defaultVpcId = vpcsResponse.vpcs().first().vpcId() + log.debug "[AWS ECS] Found default VPC: $defaultVpcId" + + // Get all subnets in the default VPC + final subnetsRequest = DescribeSubnetsRequest.builder() + .filters(Filter.builder().name('vpc-id').values(defaultVpcId).build()) + .build() as DescribeSubnetsRequest + + final subnetsResponse = ec2Client.describeSubnets(subnetsRequest) + + if (subnetsResponse.subnets().isEmpty()) { + throw new AbortOperationException(""" + No subnets found in the default VPC ($defaultVpcId). + + Please configure VPC subnets explicitly in your Nextflow config: + + aws.ecs.subnets = ['subnet-xxx', 'subnet-yyy'] + """.stripIndent().trim()) + } + + return subnetsResponse.subnets().collect { it.subnetId() } + } + + /** + * Discover the default security group from the default VPC. + * @return List containing the default security group ID + * @throws AbortOperationException if no default VPC or security group exists + */ + protected List discoverDefaultVpcSecurityGroups() { + // First, find the default VPC + final vpcRequest = DescribeVpcsRequest.builder() + .filters(Filter.builder().name('isDefault').values('true').build()) + .build() as DescribeVpcsRequest + + final vpcsResponse = ec2Client.describeVpcs(vpcRequest) + + if (vpcsResponse.vpcs().isEmpty()) { + throw new AbortOperationException(""" + No default VPC found in this AWS region. + + Please configure security groups explicitly in your Nextflow config: + + aws.ecs.securityGroups = ['sg-xxx'] + """.stripIndent().trim()) + } + + final defaultVpcId = vpcsResponse.vpcs().first().vpcId() + + // Get the default security group for the VPC + final sgRequest = DescribeSecurityGroupsRequest.builder() + .filters( + Filter.builder().name('vpc-id').values(defaultVpcId).build(), + Filter.builder().name('group-name').values('default').build() + ) + .build() as DescribeSecurityGroupsRequest + + final sgResponse = ec2Client.describeSecurityGroups(sgRequest) + + if (sgResponse.securityGroups().isEmpty()) { + throw new AbortOperationException(""" + No default security group found in VPC ($defaultVpcId). + + Please configure security groups explicitly in your Nextflow config: + + aws.ecs.securityGroups = ['sg-xxx'] + """.stripIndent().trim()) + } + + return [sgResponse.securityGroups().first().groupId()] + } + + /** + * Initialise the AWS ECS executor. + */ + @Override + protected void register() { + super.register() + // validate Fusion is enabled (required for ECS executor) + validateFusion() + // validate work directory is S3 + validateWorkDir() + validatePathDir() + // initialize AWS clients + createAwsClients() + // auto-discover VPC configuration + discoverVpcConfiguration() + } + + /** + * @return The monitor instance that handles AWS ECS tasks + */ + @Override + protected TaskMonitor createTaskMonitor() { + // create the throttling executor + submitter = createExecutorService('AWSECS-executor') + reaper = createExecutorService('AWSECS-reaper') + + final pollInterval = config.getPollInterval(name, Duration.of('10 sec')) + final dumpInterval = config.getMonitorDumpInterval(name) + final capacity = config.getQueueSize(name, 1000) + + final def params = [ + name: name, + session: session, + config: config, + pollInterval: pollInterval, + dumpInterval: dumpInterval, + capacity: capacity + ] + + log.debug "Creating parallel monitor for executor '$name' > pollInterval=$pollInterval; dumpInterval=$dumpInterval" + new ParallelPollingMonitor(submitter, params) + } + + /** + * Create a task handler for the given task instance + * + * @param task The {@link TaskRun} instance to be executed + * @return A {@link AwsEcsTaskHandler} for the given task + */ + @Override + TaskHandler createTaskHandler(TaskRun task) { + assert task + assert task.workDir + log.trace "[AWS ECS] Launching process > ${task.name} -- work folder: ${task.workDirStr}" + new AwsEcsTaskHandler(task, this) + } + + private static final List RETRYABLE_STATUS = [429, 500, 502, 503, 504] + + /** + * @return Creates a {@link ThrottlingExecutor} service to throttle + * the API requests to the AWS ECS service. + */ + private ThrottlingExecutor createExecutorService(String name) { + final qs = 5_000 + final limit = config.getExecConfigProp(name, 'submitRateLimit', '50/s') as String + final size = Runtime.runtime.availableProcessors() * 5 + + final opts = new ThrottlingExecutor.Options() + .retryOn { Throwable t -> t instanceof EcsException && (t.statusCode() in RETRYABLE_STATUS) } + .onFailure { Throwable t -> session?.abort(t) } + .onRateLimitChange { RateUnit rate -> logRateLimitChange(rate) } + .withRateLimit(limit) + .withQueueSize(qs) + .withPoolSize(size) + .withKeepAlive(Duration.of('1 min')) + .withAutoThrottle(true) + .withMaxRetries(10) + .withPoolName(name) + + ThrottlingExecutor.create(opts) + } + + @Override + boolean isFusionEnabled() { + return FusionHelper.isFusionEnabled(session) + } + + protected void logRateLimitChange(RateUnit rate) { + log.debug "New submission rate limit: $rate" + } + + @PackageScope + ThrottlingExecutor getReaper() { reaper } + + @Override + void shutdown() { + if (submitter) { + final tasks = submitter.shutdownNow() + if (tasks) log.warn "Execution interrupted -- cleaning up execution pool" + submitter.awaitTermination(5, TimeUnit.MINUTES) + } + // finally shutdown reaper executor + if (reaper) { + reaper.shutdown() + final waitMsg = "[AWS ECS] Waiting jobs reaper to complete (%d jobs to be terminated)" + final exitMsg = "[AWS ECS] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated" + awaitCompletion(reaper, Duration.of('60min'), waitMsg, exitMsg) + } + } + + protected void awaitCompletion(ThrottlingExecutor executor, Duration duration, String waitMsg, String exitMsg) { + try { + ThreadPoolHelper.await(executor, duration, waitMsg, exitMsg) + } + catch (TimeoutException e) { + log.warn(e.message, e) + } + } +} diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/AwsEcsOptions.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/AwsEcsOptions.groovy new file mode 100644 index 0000000000..9412685aad --- /dev/null +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/AwsEcsOptions.groovy @@ -0,0 +1,90 @@ +/* + * Copyright 2020-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.ecs + +import groovy.transform.CompileStatic +import groovy.transform.EqualsAndHashCode +import groovy.transform.ToString +import groovy.util.logging.Slf4j +import nextflow.Session +import nextflow.cloud.aws.config.AwsConfig +import nextflow.cloud.aws.config.AwsEcsConfig +import nextflow.util.TestOnly + +/** + * Helper class wrapping AWS config options required for ECS Managed Instances executor + * + * @author Paolo Di Tommaso + */ +@Slf4j +@ToString(includeNames = true, includePackage = false) +@EqualsAndHashCode +@CompileStatic +class AwsEcsOptions { + + private AwsConfig awsConfig + + @TestOnly + protected AwsEcsOptions() { + this.awsConfig = new AwsConfig(Collections.emptyMap()) + } + + AwsEcsOptions(AwsEcsExecutor executor) { + awsConfig = new AwsConfig(executor.session.config.aws as Map ?: Collections.emptyMap()) + } + + AwsEcsOptions(Session session) { + awsConfig = new AwsConfig(session.config.aws as Map ?: Collections.emptyMap()) + } + + String getRegion() { + return awsConfig.getRegion() + } + + String getCluster() { + return awsConfig.ecsConfig.getCluster() + } + + String getExecutionRole() { + return awsConfig.ecsConfig.getExecutionRole() + } + + String getTaskRole() { + return awsConfig.ecsConfig.getTaskRole() + } + + List getSubnets() { + return awsConfig.ecsConfig.getSubnets() + } + + List getSecurityGroups() { + return awsConfig.ecsConfig.getSecurityGroups() + } + + String getLogsGroup() { + return awsConfig.ecsConfig.getLogsGroup() + } + + Integer getMaxSpotAttempts() { + return awsConfig.ecsConfig.getMaxSpotAttempts() + } + + Boolean getAssignPublicIp() { + return awsConfig.ecsConfig.getAssignPublicIp() + } +} diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy new file mode 100644 index 0000000000..a0b9efcc4a --- /dev/null +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy @@ -0,0 +1,476 @@ +/* + * Copyright 2020-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.ecs + +import java.nio.file.Path +import java.util.concurrent.ConcurrentHashMap + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.cloud.aws.ecs.model.ContainerDefinitionModel +import nextflow.cloud.aws.ecs.model.RegisterTaskDefinitionModel +import nextflow.exception.ProcessException +import nextflow.exception.ProcessUnrecoverableException +import nextflow.executor.BashWrapperBuilder +import nextflow.fusion.FusionAwareTask +import nextflow.processor.TaskHandler +import nextflow.processor.TaskRun +import nextflow.processor.TaskStatus +import nextflow.trace.TraceRecord +import nextflow.util.CacheHelper +import nextflow.util.MemoryUnit +import nextflow.util.TestOnly +import software.amazon.awssdk.services.ecs.EcsClient +import software.amazon.awssdk.services.ecs.model.AssignPublicIp +import software.amazon.awssdk.services.ecs.model.AwsVpcConfiguration +import software.amazon.awssdk.services.ecs.model.ContainerOverride +import software.amazon.awssdk.services.ecs.model.DescribeTasksRequest +import software.amazon.awssdk.services.ecs.model.KeyValuePair +import software.amazon.awssdk.services.ecs.model.NetworkConfiguration +import software.amazon.awssdk.services.ecs.model.RegisterTaskDefinitionResponse +import software.amazon.awssdk.services.ecs.model.RunTaskRequest +import software.amazon.awssdk.services.ecs.model.RunTaskResponse +import software.amazon.awssdk.services.ecs.model.StopTaskRequest +import software.amazon.awssdk.services.ecs.model.Task +import software.amazon.awssdk.services.ecs.model.TaskOverride + +/** + * Implements a task handler for AWS ECS Managed Instances jobs + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +class AwsEcsTaskHandler extends TaskHandler implements FusionAwareTask { + + private final Path exitFile + private final Path wrapperFile + private final Path outputFile + private final Path errorFile + private final Path logFile + private final Path scriptFile + private final Path inputFile + private final Path traceFile + + private AwsEcsExecutor executor + + /** + * The ECS task ARN + */ + private volatile String taskArn + + /** + * The ECS task definition ARN + */ + private volatile String taskDefArn + + /** + * Counter for spot interruption retries + */ + private int spotAttempts = 0 + + /** + * Cache for task definitions keyed by resource hash + */ + private static final Map taskDefinitionCache = new ConcurrentHashMap<>() + + @TestOnly + protected AwsEcsTaskHandler() {} + + /** + * Create a new ECS task handler + * + * @param task The {@link nextflow.processor.TaskRun} descriptor of the task to run + * @param executor The {@link AwsEcsExecutor} instance + */ + AwsEcsTaskHandler(TaskRun task, AwsEcsExecutor executor) { + super(task) + this.executor = executor + this.logFile = task.workDir.resolve(TaskRun.CMD_LOG) + this.scriptFile = task.workDir.resolve(TaskRun.CMD_SCRIPT) + this.inputFile = task.workDir.resolve(TaskRun.CMD_INFILE) + this.outputFile = task.workDir.resolve(TaskRun.CMD_OUTFILE) + this.errorFile = task.workDir.resolve(TaskRun.CMD_ERRFILE) + this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT) + this.wrapperFile = task.workDir.resolve(TaskRun.CMD_RUN) + this.traceFile = task.workDir.resolve(TaskRun.CMD_TRACE) + } + + /** + * @return The task ARN or null if not yet submitted + */ + String getTaskArn() { taskArn } + + /** + * @return The ECS client from the executor + */ + protected EcsClient getEcsClient() { executor.ecsClient } + + /** + * @return The AWS ECS options from the executor + */ + protected AwsEcsOptions getAwsOptions() { executor.awsOptions } + + @Override + boolean checkIfRunning() { + if (taskArn == null || !isSubmitted()) + return false + + final ecsTask = describeTask(taskArn) + if (!ecsTask) + return false + + final lastStatus = ecsTask.lastStatus() + final result = lastStatus in ['RUNNING', 'DEACTIVATING', 'STOPPING', 'STOPPED'] + + if (result) { + this.status = TaskStatus.RUNNING + log.trace "[AWS ECS] Task $taskArn is now RUNNING (status: $lastStatus)" + } + + return result + } + + @Override + boolean checkIfCompleted() { + if (!isRunning()) + return false + + final ecsTask = describeTask(taskArn) + if (!ecsTask) + return false + + final done = ecsTask.lastStatus() == 'STOPPED' + if (done) { + // Extract exit code from container + final container = ecsTask.containers()?.find { it.name() == 'main' } + final exitCode = container?.exitCode() + + if (exitCode != null) { + task.exitStatus = exitCode + } else { + // Fallback to reading exit file + task.exitStatus = readExitFile() + } + + // Set stdout/stderr files + task.stdout = outputFile + task.stderr = errorFile + + // Check for spot interruption + final stoppedReason = ecsTask.stoppedReason() + final stopCode = ecsTask.stopCode()?.toString() + + if (isSpotInterruption(stoppedReason, stopCode)) { + if (spotAttempts < awsOptions.maxSpotAttempts) { + log.debug "[AWS ECS] Spot interruption detected for task $taskArn, attempt ${spotAttempts + 1}/${awsOptions.maxSpotAttempts}" + spotAttempts++ + // Resubmit will be handled by Nextflow's retry mechanism + } + task.error = new ProcessException("Task terminated due to spot instance interruption: $stoppedReason") + } + else if (task.exitStatus != 0) { + final reason = stoppedReason ?: 'Unknown error' + final unrecoverable = reason.contains('CannotPullContainer') && reason.contains('unauthorized') + task.error = unrecoverable + ? new ProcessUnrecoverableException(reason) + : new ProcessException(reason) + } + + status = TaskStatus.COMPLETED + log.debug "[AWS ECS] Task $taskArn completed with exit code ${task.exitStatus}" + return true + } + + return false + } + + /** + * Check if the stop reason indicates a spot instance interruption + */ + protected boolean isSpotInterruption(String stoppedReason, String stopCode) { + if (!stoppedReason) + return false + // ECS Managed Instances uses spot instances that can be interrupted + return stoppedReason.contains('spot') || + stoppedReason.contains('Spot') || + stoppedReason.contains('SPOT') || + stoppedReason.contains('capacity') || + stopCode == 'SpotInterruption' + } + + @Override + void killTask() { + if (taskArn) { + log.trace "[AWS ECS] Stopping task: $taskArn" + executor.reaper.submit({ + try { + final request = StopTaskRequest.builder() + .cluster(awsOptions.cluster) + .task(taskArn) + .reason('Task killed by Nextflow') + .build() + ecsClient.stopTask(request) + log.debug "[AWS ECS] Stopped task: $taskArn" + } + catch (Exception e) { + log.warn "[AWS ECS] Failed to stop task $taskArn: ${e.message}" + } + }) + } + } + + @Override + void submit() { + // Build the wrapper script + prepareLauncher() + + // Get or register task definition + taskDefArn = getOrCreateTaskDefinition() + log.trace "[AWS ECS] Using task definition: $taskDefArn" + + // Create and submit run task request + final request = newRunTaskRequest() + log.trace "[AWS ECS] Submitting task > $request" + + final response = ecsClient.runTask(request) + + if (response.tasks().isEmpty()) { + final failures = response.failures() + final reason = failures ? failures.collect { "${it.arn()}: ${it.reason()}" }.join('; ') : 'Unknown' + throw new ProcessException("Failed to submit ECS task: $reason") + } + + taskArn = response.tasks().first().taskArn() + this.status = TaskStatus.SUBMITTED + log.debug "[AWS ECS] Process `${task.lazyName()}` submitted > taskArn=$taskArn; work-dir=${task.workDirStr}" + } + + @Override + void prepareLauncher() { + createTaskWrapper().build() + } + + protected BashWrapperBuilder createTaskWrapper() { + // ECS executor only supports Fusion mode + return fusionLauncher() + } + + /** + * Get an existing task definition from cache or register a new one + */ + protected String getOrCreateTaskDefinition() { + final model = createTaskDefinitionModel() + final cacheKey = model.computeCacheKey() + + // Check cache first + String cachedArn = taskDefinitionCache.get(cacheKey) + if (cachedArn) { + log.trace "[AWS ECS] Using cached task definition: $cachedArn (key: $cacheKey)" + return cachedArn + } + + // Register new task definition + final request = model.toRequest() + log.trace "[AWS ECS] Registering task definition: ${request.family()}" + + final response = ecsClient.registerTaskDefinition(request) + final arn = response.taskDefinition().taskDefinitionArn() + + // Cache the result + taskDefinitionCache.put(cacheKey, arn) + log.debug "[AWS ECS] Registered task definition: $arn (key: $cacheKey)" + + return arn + } + + /** + * Create a task definition model from the current task configuration + */ + protected RegisterTaskDefinitionModel createTaskDefinitionModel() { + final model = RegisterTaskDefinitionModel.create() + + // Set task definition family name + final containerImage = task.container + final familyName = "nf-${sanitizeFamilyName(containerImage)}" + model.family = familyName + + // Set IAM roles + model.executionRoleArn = awsOptions.executionRole + model.taskRoleArn = awsOptions.taskRole + + // Map CPU (Nextflow cpus -> ECS CPU units, 1 CPU = 1024 units) + final cpus = task.config.getCpus() + model.cpu = (cpus * 1024).toString() + + // Map memory (Nextflow memory -> ECS memory in MiB) + final memory = task.config.getMemory() + model.memory = memory ? (memory.toMega() as Long).toString() : '1024' + + // Set ephemeral storage if specified + final disk = task.config.getDisk() + if (disk) { + model.ephemeralStorageGiB = disk.toGiga() as Integer + } + + // Configure container + final container = model.containerDefinitions.first() + container.image = containerImage + container.withLogging(awsOptions.logsGroup, 'nf', awsOptions.region) + + // Set container resource limits (soft limits at container level) + container.cpu = cpus * 1024 + container.memory = memory ? memory.toMega() as Integer : 1024 + + // Enable Fusion filesystem support (adds SYS_ADMIN capability and /dev/fuse device) + // This is always enabled because ECS executor requires Fusion + container.fusionEnabled = true + + return model + } + + /** + * Sanitize container image name for use in task definition family name + */ + protected String sanitizeFamilyName(String imageName) { + if (!imageName) + return 'unknown' + // Replace invalid characters with hyphens, keep only alphanumeric and hyphens + def result = imageName + .replaceAll('[/:@]', '-') + .replaceAll('[^a-zA-Z0-9-]', '') + .replaceAll('-+', '-') + .replaceAll('^-|-$', '') + // Limit length (ECS family names max 255 chars) + if (result.length() > 200) + result = result.substring(0, 200) + return result ?: 'task' + } + + /** + * Create the RunTask request + */ + protected RunTaskRequest newRunTaskRequest() { + // Build network configuration + final assignPublicIp = awsOptions.assignPublicIp + ? AssignPublicIp.ENABLED + : AssignPublicIp.DISABLED + + final networkConfig = NetworkConfiguration.builder() + .awsvpcConfiguration( + AwsVpcConfiguration.builder() + .subnets(executor.resolvedSubnets) + .securityGroups(executor.resolvedSecurityGroups) + .assignPublicIp(assignPublicIp) + .build() + ) + .build() + + // Build container overrides with command and environment + final envVars = getEnvironment().collect { k, v -> + KeyValuePair.builder().name(k).value(v).build() + } + + final containerOverride = ContainerOverride.builder() + .name('main') + .command(getContainerCommand()) + .environment(envVars) + .build() + + final taskOverride = TaskOverride.builder() + .containerOverrides(containerOverride) + .build() + + return RunTaskRequest.builder() + .cluster(awsOptions.cluster) + .taskDefinition(taskDefArn) + .networkConfiguration(networkConfig) + .overrides(taskOverride) + .count(1) + .build() + } + + /** + * Get the command to execute in the container + */ + protected List getContainerCommand() { + // ECS executor only supports Fusion mode + return fusionSubmitCli() + } + + /** + * Get environment variables for the container + */ + protected Map getEnvironment() { + final result = new LinkedHashMap() + + // Add Fusion environment if enabled + if (fusionEnabled()) { + for (Map.Entry entry : fusionLauncher().fusionEnv()) { + result.put(entry.key, entry.value) + } + } + + return result + } + + /** + * Describe an ECS task by ARN + */ + protected Task describeTask(String taskArn) { + try { + final request = DescribeTasksRequest.builder() + .cluster(awsOptions.cluster) + .tasks(taskArn) + .build() + + final response = ecsClient.describeTasks(request) + + if (response.tasks().isEmpty()) { + log.debug "[AWS ECS] Task not found: $taskArn" + return null + } + + return response.tasks().first() + } + catch (Exception e) { + log.warn "[AWS ECS] Failed to describe task $taskArn: ${e.message}" + return null + } + } + + /** + * Read exit code from the exit file + */ + protected int readExitFile() { + try { + exitFile.text as Integer + } + catch (Exception e) { + log.debug "[AWS ECS] Cannot read exit status for task: `${task.lazyName()}` | ${e.message}" + return Integer.MAX_VALUE + } + } + + @Override + TraceRecord getTraceRecord() { + def result = super.getTraceRecord() + result.put('native_id', taskArn) + return result + } +} diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/model/ContainerDefinitionModel.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/model/ContainerDefinitionModel.groovy new file mode 100644 index 0000000000..7b88acffd3 --- /dev/null +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/model/ContainerDefinitionModel.groovy @@ -0,0 +1,239 @@ +/* + * Copyright 2020-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.ecs.model + +import groovy.transform.CompileStatic +import groovy.transform.EqualsAndHashCode +import groovy.transform.ToString +import software.amazon.awssdk.services.ecs.model.ContainerDefinition +import software.amazon.awssdk.services.ecs.model.Device +import software.amazon.awssdk.services.ecs.model.DeviceCgroupPermission +import software.amazon.awssdk.services.ecs.model.KernelCapabilities +import software.amazon.awssdk.services.ecs.model.KeyValuePair +import software.amazon.awssdk.services.ecs.model.LinuxParameters +import software.amazon.awssdk.services.ecs.model.LogConfiguration +import software.amazon.awssdk.services.ecs.model.LogDriver +import software.amazon.awssdk.services.ecs.model.ResourceRequirement +import software.amazon.awssdk.services.ecs.model.ResourceType + +/** + * Mutable wrapper model for building ECS ContainerDefinition. + * + * This class provides a builder-style interface for constructing container + * definitions with Nextflow task requirements. + * + * @author Paolo Di Tommaso + */ +@ToString(includeNames = true, includePackage = false) +@EqualsAndHashCode +@CompileStatic +class ContainerDefinitionModel { + + /** + * Container name (always "main" for Nextflow tasks) + */ + String name = 'main' + + /** + * Container image (e.g., "ubuntu:latest") + */ + String image + + /** + * Command to execute (typically the bash wrapper script) + */ + List command + + /** + * Entry point override + */ + List entryPoint + + /** + * Working directory inside the container + */ + String workingDirectory + + /** + * Environment variables + */ + Map environment = [:] + + /** + * Whether the container is essential (must be true for single-container tasks) + */ + boolean essential = true + + /** + * CloudWatch Logs group name + */ + String logsGroup + + /** + * CloudWatch Logs stream prefix + */ + String logsStreamPrefix + + /** + * AWS region for CloudWatch Logs + */ + String logsRegion + + /** + * CPU units for this container (soft limit) + */ + Integer cpu + + /** + * Memory in MiB for this container (hard limit) + */ + Integer memory + + /** + * Memory reservation in MiB (soft limit) + */ + Integer memoryReservation + + /** + * Whether to enable Fusion filesystem support (adds SYS_ADMIN capability and /dev/fuse device) + */ + boolean fusionEnabled = false + + /** + * Build the ECS ContainerDefinition from this model. + * + * @param gpuCount Number of GPUs to allocate (null or 0 for none) + * @return ContainerDefinition ready for task definition + */ + ContainerDefinition toContainerDefinition(Integer gpuCount = null) { + def builder = ContainerDefinition.builder() + .name(name) + .essential(essential) + + if (image) + builder.image(image) + + if (command) + builder.command(command) + + if (entryPoint) + builder.entryPoint(entryPoint) + + if (workingDirectory) + builder.workingDirectory(workingDirectory) + + // Add environment variables + if (environment) { + def envPairs = environment.collect { k, v -> + KeyValuePair.builder().name(k).value(v).build() + } + builder.environment(envPairs) + } + + // Configure CloudWatch Logs + if (logsGroup) { + def logOptions = [ + 'awslogs-group': logsGroup, + 'awslogs-stream-prefix': logsStreamPrefix ?: 'nf' + ] + if (logsRegion) + logOptions['awslogs-region'] = logsRegion + + builder.logConfiguration( + LogConfiguration.builder() + .logDriver(LogDriver.AWSLOGS) + .options(logOptions) + .build() + ) + } + + // Add CPU/memory limits if specified + if (cpu) + builder.cpu(cpu) + + if (memory) + builder.memory(memory) + + if (memoryReservation) + builder.memoryReservation(memoryReservation) + + // Add GPU resource requirements if specified + if (gpuCount && gpuCount > 0) { + builder.resourceRequirements( + ResourceRequirement.builder() + .type(ResourceType.GPU) + .value(gpuCount.toString()) + .build() + ) + } + + // Add Linux parameters for Fusion filesystem FUSE driver support + if (fusionEnabled) { + builder.linuxParameters( + LinuxParameters.builder() + .capabilities( + KernelCapabilities.builder() + .add('SYS_ADMIN') // Required for FUSE mount operations + .build() + ) + .devices( + Device.builder() + .hostPath('/dev/fuse') + .containerPath('/dev/fuse') + .permissions( + DeviceCgroupPermission.READ, + DeviceCgroupPermission.WRITE, + DeviceCgroupPermission.MKNOD + ) + .build() + ) + .build() + ) + } + + return builder.build() + } + + /** + * Add an environment variable + */ + ContainerDefinitionModel withEnvironment(String key, String value) { + if (value != null) + environment[key] = value + return this + } + + /** + * Add multiple environment variables + */ + ContainerDefinitionModel withEnvironment(Map env) { + if (env) + environment.putAll(env) + return this + } + + /** + * Configure CloudWatch Logs + */ + ContainerDefinitionModel withLogging(String group, String prefix, String region) { + this.logsGroup = group + this.logsStreamPrefix = prefix + this.logsRegion = region + return this + } +} diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModel.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModel.groovy new file mode 100644 index 0000000000..a2ad644fb2 --- /dev/null +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModel.groovy @@ -0,0 +1,144 @@ +/* + * Copyright 2020-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.ecs.model + +import groovy.transform.CompileStatic +import groovy.transform.EqualsAndHashCode +import groovy.transform.ToString +import software.amazon.awssdk.services.ecs.model.Compatibility +import software.amazon.awssdk.services.ecs.model.EphemeralStorage +import software.amazon.awssdk.services.ecs.model.NetworkMode +import software.amazon.awssdk.services.ecs.model.RegisterTaskDefinitionRequest + +/** + * Mutable wrapper model for building ECS RegisterTaskDefinition requests. + * + * This class provides a builder-style interface for constructing task definitions + * with Nextflow process requirements mapped to ECS task definition fields. + * + * @author Paolo Di Tommaso + */ +@ToString(includeNames = true, includePackage = false) +@EqualsAndHashCode +@CompileStatic +class RegisterTaskDefinitionModel { + + /** + * Task definition family name (e.g., "nf-ubuntu-latest") + */ + String family + + /** + * Task execution role ARN - used by ECS agent to pull images and write logs + */ + String executionRoleArn + + /** + * Task role ARN - used by the container for AWS API calls (e.g., S3 access) + */ + String taskRoleArn + + /** + * CPU units (1024 = 1 vCPU) + */ + String cpu + + /** + * Memory in MiB + */ + String memory + + /** + * Container definitions (typically just one "main" container) + */ + List containerDefinitions = [] + + /** + * Ephemeral storage size in GiB (30-200 for Fargate, 30-16384 for Managed Instances) + */ + Integer ephemeralStorageGiB + + /** + * Number of GPUs required (for EC2/Managed Instances only) + */ + Integer gpuCount + + /** + * Build the ECS RegisterTaskDefinitionRequest from this model. + * + * @return RegisterTaskDefinitionRequest ready for AWS SDK call + */ + RegisterTaskDefinitionRequest toRequest() { + def builder = RegisterTaskDefinitionRequest.builder() + .family(family) + .networkMode(NetworkMode.AWSVPC) + .requiresCompatibilities(Compatibility.EC2) // Managed Instances uses EC2 compatibility + + if (executionRoleArn) + builder.executionRoleArn(executionRoleArn) + + if (taskRoleArn) + builder.taskRoleArn(taskRoleArn) + + if (cpu) + builder.cpu(cpu) + + if (memory) + builder.memory(memory) + + // Build container definitions + if (containerDefinitions) { + def containers = containerDefinitions.collect { it.toContainerDefinition(gpuCount) } + builder.containerDefinitions(containers) + } + + // Configure ephemeral storage for Managed Instances + if (ephemeralStorageGiB && ephemeralStorageGiB > 30) { + builder.ephemeralStorage( + EphemeralStorage.builder() + .sizeInGiB(ephemeralStorageGiB) + .build() + ) + } + + return builder.build() as RegisterTaskDefinitionRequest + } + + /** + * Compute a cache key for this task definition based on resource requirements. + * Used to avoid re-registering identical task definitions. + * + * @return Cache key string + */ + String computeCacheKey() { + def container = containerDefinitions ? containerDefinitions.first() : null + def image = container?.image ?: 'unknown' + def gpu = gpuCount ?: 0 + def disk = ephemeralStorageGiB ?: 30 + return "${image}:${cpu}:${memory}:${gpu}:${disk}" + } + + /** + * Create a model with default container named "main" + */ + static RegisterTaskDefinitionModel create() { + def model = new RegisterTaskDefinitionModel() + model.containerDefinitions = [new ContainerDefinitionModel(name: 'main')] + return model + } +} diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/AwsEcsConfigTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/AwsEcsConfigTest.groovy new file mode 100644 index 0000000000..60c76d3250 --- /dev/null +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/AwsEcsConfigTest.groovy @@ -0,0 +1,118 @@ +/* + * Copyright 2020-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.ecs + +import nextflow.cloud.aws.config.AwsEcsConfig +import spock.lang.Specification + +/** + * Tests for {@link AwsEcsConfig} + * + * @author Paolo Di Tommaso + */ +class AwsEcsConfigTest extends Specification { + + def 'should create default config'() { + when: + def ecs = new AwsEcsConfig([:]) + + then: + ecs.cluster == null + ecs.executionRole == null + ecs.taskRole == null + ecs.subnets == null + ecs.securityGroups == null + ecs.logsGroup == AwsEcsConfig.DEFAULT_LOGS_GROUP + ecs.maxSpotAttempts == AwsEcsConfig.DEFAULT_MAX_SPOT_ATTEMPTS + ecs.assignPublicIp == true + } + + def 'should create config with options'() { + given: + def OPTS = [ + cluster: 'my-cluster', + executionRole: 'arn:aws:iam::123456789:role/ecsTaskExecutionRole', + taskRole: 'arn:aws:iam::123456789:role/ecsTaskRole', + subnets: ['subnet-abc123', 'subnet-def456'], + securityGroups: ['sg-xyz789'], + logsGroup: '/custom/logs/group', + maxSpotAttempts: 10, + assignPublicIp: false + ] + + when: + def ecs = new AwsEcsConfig(OPTS) + + then: + ecs.cluster == 'my-cluster' + ecs.executionRole == 'arn:aws:iam::123456789:role/ecsTaskExecutionRole' + ecs.taskRole == 'arn:aws:iam::123456789:role/ecsTaskRole' + ecs.subnets == ['subnet-abc123', 'subnet-def456'] + ecs.securityGroups == ['sg-xyz789'] + ecs.logsGroup == '/custom/logs/group' + ecs.maxSpotAttempts == 10 + ecs.assignPublicIp == false + } + + def 'should parse string lists'() { + given: + def ecs = new AwsEcsConfig([:]) + + expect: + ecs.parseStringList(OBJ) == EXPECTED + + where: + OBJ | EXPECTED + null | null + 'foo' | ['foo'] + 'foo, bar' | ['foo', 'bar'] + 'subnet-1,subnet-2,subnet-3'| ['subnet-1', 'subnet-2', 'subnet-3'] + ['subnet-1', 'subnet-2'] | ['subnet-1', 'subnet-2'] + } + + def 'should use default values when not specified'() { + when: + def ecs = new AwsEcsConfig([ + cluster: 'test-cluster', + executionRole: 'arn:aws:iam::123:role/test' + ]) + + then: + ecs.cluster == 'test-cluster' + ecs.executionRole == 'arn:aws:iam::123:role/test' + ecs.logsGroup == '/aws/ecs/nextflow' + ecs.maxSpotAttempts == 5 + ecs.assignPublicIp == true + } + + def 'should handle maxSpotAttempts zero value'() { + when: + def ecs = new AwsEcsConfig([maxSpotAttempts: 0]) + + then: + ecs.maxSpotAttempts == 0 + } + + def 'should handle assignPublicIp false value'() { + when: + def ecs = new AwsEcsConfig([assignPublicIp: false]) + + then: + ecs.assignPublicIp == false + } +} diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/AwsEcsExecutorTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/AwsEcsExecutorTest.groovy new file mode 100644 index 0000000000..551f9d4caa --- /dev/null +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/AwsEcsExecutorTest.groovy @@ -0,0 +1,301 @@ +/* + * Copyright 2020-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.ecs + +import java.nio.file.Paths + +import nextflow.Session +import nextflow.cloud.aws.nio.S3Path +import nextflow.exception.AbortOperationException +import nextflow.processor.TaskRun +import software.amazon.awssdk.services.ec2.Ec2Client +import software.amazon.awssdk.services.ec2.model.DescribeSecurityGroupsResponse +import software.amazon.awssdk.services.ec2.model.DescribeSubnetsResponse +import software.amazon.awssdk.services.ec2.model.DescribeVpcsResponse +import software.amazon.awssdk.services.ec2.model.SecurityGroup +import software.amazon.awssdk.services.ec2.model.Subnet +import software.amazon.awssdk.services.ec2.model.Vpc +import software.amazon.awssdk.services.ecs.EcsClient +import spock.lang.Specification + +/** + * Tests for {@link AwsEcsExecutor} + * + * @author Paolo Di Tommaso + */ +class AwsEcsExecutorTest extends Specification { + + def 'should validate fusion is enabled'() { + given: + def session = Mock(Session) { + getConfig() >> [aws: [ecs: [cluster: 'test-cluster']]] + } + def executor = Spy(AwsEcsExecutor) + executor.session = session + + when: + executor.isFusionEnabled() >> false + executor.validateFusion() + + then: + 1 * session.abort() + thrown(AbortOperationException) + } + + def 'should pass fusion validation when enabled'() { + given: + def session = Mock(Session) { + getConfig() >> [aws: [ecs: [cluster: 'test-cluster']]] + } + def executor = Spy(AwsEcsExecutor) + executor.session = session + + when: + executor.isFusionEnabled() >> true + executor.validateFusion() + + then: + noExceptionThrown() + } + + def 'should validate work directory is S3'() { + given: + def session = Mock(Session) { + getConfig() >> [aws: [ecs: [cluster: 'test-cluster']]] + } + def executor = Spy(AwsEcsExecutor) { + getWorkDir() >> Paths.get('/local/work/dir') + } + executor.session = session + + when: + executor.validateWorkDir() + + then: + 1 * session.abort() + thrown(AbortOperationException) + } + + def 'should pass work dir validation for S3 path'() { + given: + def s3Path = Mock(S3Path) + def session = Mock(Session) { + getConfig() >> [aws: [ecs: [cluster: 'test-cluster']]] + } + def executor = Spy(AwsEcsExecutor) { + getWorkDir() >> s3Path + } + executor.session = session + + when: + executor.validateWorkDir() + + then: + noExceptionThrown() + } + + def 'should use configured subnets'() { + given: + def awsOptions = Mock(AwsEcsOptions) { + getSubnets() >> ['subnet-123', 'subnet-456'] + getSecurityGroups() >> null + } + def ec2Client = Mock(Ec2Client) + def executor = new AwsEcsExecutor() + executor.@awsOptions = awsOptions + executor.@ec2Client = ec2Client + + when: + executor.discoverVpcConfiguration() + + then: + executor.resolvedSubnets == ['subnet-123', 'subnet-456'] + // Security groups should be auto-discovered + 1 * ec2Client.describeVpcs(_) >> DescribeVpcsResponse.builder() + .vpcs(Vpc.builder().vpcId('vpc-default').build()) + .build() + 1 * ec2Client.describeSecurityGroups(_) >> DescribeSecurityGroupsResponse.builder() + .securityGroups(SecurityGroup.builder().groupId('sg-default').build()) + .build() + } + + def 'should use configured security groups'() { + given: + def awsOptions = Mock(AwsEcsOptions) { + getSubnets() >> null + getSecurityGroups() >> ['sg-123'] + } + def ec2Client = Mock(Ec2Client) + def executor = new AwsEcsExecutor() + executor.@awsOptions = awsOptions + executor.@ec2Client = ec2Client + + when: + executor.discoverVpcConfiguration() + + then: + executor.resolvedSecurityGroups == ['sg-123'] + // Subnets should be auto-discovered (one describeVpcs call for subnet discovery) + 1 * ec2Client.describeVpcs(_) >> DescribeVpcsResponse.builder() + .vpcs(Vpc.builder().vpcId('vpc-default').build()) + .build() + 1 * ec2Client.describeSubnets(_) >> DescribeSubnetsResponse.builder() + .subnets( + Subnet.builder().subnetId('subnet-a').build(), + Subnet.builder().subnetId('subnet-b').build() + ) + .build() + } + + def 'should auto-discover VPC configuration'() { + given: + def awsOptions = Mock(AwsEcsOptions) { + getSubnets() >> null + getSecurityGroups() >> null + } + def ec2Client = Mock(Ec2Client) + def executor = new AwsEcsExecutor() + executor.@awsOptions = awsOptions + executor.@ec2Client = ec2Client + + when: + executor.discoverVpcConfiguration() + + then: + // Called twice: once for subnets, once for security groups + 2 * ec2Client.describeVpcs(_) >> DescribeVpcsResponse.builder() + .vpcs(Vpc.builder().vpcId('vpc-default').build()) + .build() + 1 * ec2Client.describeSubnets(_) >> DescribeSubnetsResponse.builder() + .subnets( + Subnet.builder().subnetId('subnet-a').build(), + Subnet.builder().subnetId('subnet-b').build() + ) + .build() + 1 * ec2Client.describeSecurityGroups(_) >> DescribeSecurityGroupsResponse.builder() + .securityGroups(SecurityGroup.builder().groupId('sg-default').build()) + .build() + + and: + executor.resolvedSubnets == ['subnet-a', 'subnet-b'] + executor.resolvedSecurityGroups == ['sg-default'] + } + + def 'should fail when no default VPC exists for subnet discovery'() { + given: + def awsOptions = Mock(AwsEcsOptions) { + getSubnets() >> null + getSecurityGroups() >> ['sg-123'] + } + def ec2Client = Mock(Ec2Client) + def executor = new AwsEcsExecutor() + executor.@awsOptions = awsOptions + executor.@ec2Client = ec2Client + + when: + executor.discoverVpcConfiguration() + + then: + 1 * ec2Client.describeVpcs(_) >> DescribeVpcsResponse.builder() + .vpcs([]) + .build() + + thrown(AbortOperationException) + } + + def 'should fail when no subnets found in default VPC'() { + given: + def awsOptions = Mock(AwsEcsOptions) { + getSubnets() >> null + getSecurityGroups() >> ['sg-123'] + } + def ec2Client = Mock(Ec2Client) + def executor = new AwsEcsExecutor() + executor.@awsOptions = awsOptions + executor.@ec2Client = ec2Client + + when: + executor.discoverVpcConfiguration() + + then: + 1 * ec2Client.describeVpcs(_) >> DescribeVpcsResponse.builder() + .vpcs(Vpc.builder().vpcId('vpc-default').build()) + .build() + 1 * ec2Client.describeSubnets(_) >> DescribeSubnetsResponse.builder() + .subnets([]) + .build() + + thrown(AbortOperationException) + } + + def 'should create task handler'() { + given: + def task = Mock(TaskRun) { + getWorkDir() >> Paths.get('/work/dir') + } + def executor = new AwsEcsExecutor() + executor.@awsOptions = Mock(AwsEcsOptions) + executor.@ecsClient = Mock(EcsClient) + + when: + def handler = executor.createTaskHandler(task) + + then: + handler instanceof AwsEcsTaskHandler + } + + def 'should return container native true'() { + given: + def executor = new AwsEcsExecutor() + + expect: + executor.isContainerNative() == true + } + + def 'should return secret native true'() { + given: + def executor = new AwsEcsExecutor() + + expect: + executor.isSecretNative() == true + } + + def 'should return docker container engine'() { + given: + def executor = new AwsEcsExecutor() + + expect: + executor.containerConfigEngine() == 'docker' + } + + def 'should warn about PATH env in config'() { + given: + def session = Mock(Session) { + getConfig() >> [env: [PATH: '/some/path']] + } + def executor = new AwsEcsExecutor() + executor.session = session + + when: + executor.validatePathDir() + + then: + // Should complete without error (just logs a warning) + noExceptionThrown() + } +} diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/AwsEcsTaskHandlerTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/AwsEcsTaskHandlerTest.groovy new file mode 100644 index 0000000000..3b7407355b --- /dev/null +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/AwsEcsTaskHandlerTest.groovy @@ -0,0 +1,368 @@ +/* + * Copyright 2020-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.ecs + +import java.nio.file.Paths + +import nextflow.processor.TaskConfig +import nextflow.processor.TaskRun +import nextflow.processor.TaskStatus +import nextflow.util.MemoryUnit +import software.amazon.awssdk.services.ecs.EcsClient +import software.amazon.awssdk.services.ecs.model.AssignPublicIp +import software.amazon.awssdk.services.ecs.model.Container +import software.amazon.awssdk.services.ecs.model.DescribeTasksResponse +import software.amazon.awssdk.services.ecs.model.Task +import spock.lang.Specification +import spock.lang.Unroll + +/** + * Tests for {@link AwsEcsTaskHandler} + * + * @author Paolo Di Tommaso + */ +class AwsEcsTaskHandlerTest extends Specification { + + def 'should sanitize family name'() { + given: + def handler = [:] as AwsEcsTaskHandler + + expect: + handler.sanitizeFamilyName('ubuntu:latest') == 'ubuntu-latest' + handler.sanitizeFamilyName('quay.io/org/image:1.0') == 'quayio-org-image-10' + handler.sanitizeFamilyName('registry.example.com/path/to/image@sha256:abc123') == 'registryexamplecom-path-to-image-sha256-abc123' + handler.sanitizeFamilyName('simple') == 'simple' + handler.sanitizeFamilyName(null) == 'unknown' + handler.sanitizeFamilyName('') == 'unknown' + } + + def 'should limit family name length'() { + given: + def handler = [:] as AwsEcsTaskHandler + def longName = 'a' * 250 + + when: + def result = handler.sanitizeFamilyName(longName) + + then: + result.size() == 200 + } + + @Unroll + def 'should detect spot interruption: #stoppedReason'() { + given: + def handler = [:] as AwsEcsTaskHandler + + expect: + handler.isSpotInterruption(stoppedReason, stopCode) == expected + + where: + stoppedReason | stopCode | expected + null | null | false + 'Task finished successfully' | null | false + 'spot capacity unavailable' | null | true + 'Spot instance interruption' | null | true + 'Task stopped due to SPOT termination' | null | true + 'Insufficient capacity in region' | null | true + 'Normal termination' | 'SpotInterruption' | true + 'User requested termination' | null | false + } + + def 'should create task definition model'() { + given: + def task = Mock(TaskRun) { + getContainer() >> 'ubuntu:latest' + getConfig() >> new TaskConfig(cpus: 2, memory: new MemoryUnit('4 GB')) + getWorkDir() >> Paths.get('/work/dir') + } + def awsOptions = Mock(AwsEcsOptions) { + getExecutionRole() >> 'arn:aws:iam::123:role/exec' + getTaskRole() >> 'arn:aws:iam::123:role/task' + getLogsGroup() >> '/aws/ecs/test' + getRegion() >> 'us-east-1' + } + def executor = Mock(AwsEcsExecutor) { + getAwsOptions() >> awsOptions + } + def handler = new AwsEcsTaskHandler(task, executor) + + when: + def model = handler.createTaskDefinitionModel() + + then: + model.family == 'nf-ubuntu-latest' + model.cpu == '2048' // 2 CPUs * 1024 + model.memory == '4096' // 4 GB in MiB + model.executionRoleArn == 'arn:aws:iam::123:role/exec' + model.taskRoleArn == 'arn:aws:iam::123:role/task' + model.containerDefinitions.first().image == 'ubuntu:latest' + } + + def 'should create task definition model with disk'() { + given: + def task = Mock(TaskRun) { + getContainer() >> 'ubuntu:latest' + getConfig() >> new TaskConfig(cpus: 1, memory: new MemoryUnit('2 GB'), disk: new MemoryUnit('100 GB')) + getWorkDir() >> Paths.get('/work/dir') + } + def awsOptions = Mock(AwsEcsOptions) { + getExecutionRole() >> 'arn:aws:iam::123:role/exec' + getTaskRole() >> null + getLogsGroup() >> '/aws/ecs/test' + getRegion() >> 'us-east-1' + } + def executor = Mock(AwsEcsExecutor) { + getAwsOptions() >> awsOptions + } + def handler = new AwsEcsTaskHandler(task, executor) + + when: + def model = handler.createTaskDefinitionModel() + + then: + model.ephemeralStorageGiB == 100 + } + + def 'should build run task request'() { + given: + def task = Mock(TaskRun) { + getContainer() >> 'ubuntu:latest' + getConfig() >> new TaskConfig(cpus: 2, memory: new MemoryUnit('4 GB')) + getWorkDir() >> Paths.get('/work/dir') + } + def awsOptions = Mock(AwsEcsOptions) { + getCluster() >> 'test-cluster' + getAssignPublicIp() >> true + } + def executor = Mock(AwsEcsExecutor) { + getAwsOptions() >> awsOptions + getResolvedSubnets() >> ['subnet-123', 'subnet-456'] + getResolvedSecurityGroups() >> ['sg-789'] + } + def handler = Spy(new AwsEcsTaskHandler(task, executor)) { + fusionEnabled() >> false + getEnvironment() >> [:] + getContainerCommand() >> ['bash', '-c', 'echo hello'] + } + handler.@taskDefArn = 'arn:aws:ecs:us-east-1:123:task-definition/test:1' + + when: + def request = handler.newRunTaskRequest() + + then: + request.cluster() == 'test-cluster' + request.taskDefinition() == 'arn:aws:ecs:us-east-1:123:task-definition/test:1' + request.count() == 1 + request.networkConfiguration().awsvpcConfiguration().subnets() == ['subnet-123', 'subnet-456'] + request.networkConfiguration().awsvpcConfiguration().securityGroups() == ['sg-789'] + request.networkConfiguration().awsvpcConfiguration().assignPublicIp() == AssignPublicIp.ENABLED + request.overrides().containerOverrides().first().name() == 'main' + request.overrides().containerOverrides().first().command() == ['bash', '-c', 'echo hello'] + } + + def 'should build run task request with disabled public IP'() { + given: + def task = Mock(TaskRun) { + getContainer() >> 'ubuntu:latest' + getConfig() >> new TaskConfig(cpus: 1, memory: new MemoryUnit('2 GB')) + getWorkDir() >> Paths.get('/work/dir') + } + def awsOptions = Mock(AwsEcsOptions) { + getCluster() >> 'test-cluster' + getAssignPublicIp() >> false + } + def executor = Mock(AwsEcsExecutor) { + getAwsOptions() >> awsOptions + getResolvedSubnets() >> ['subnet-123'] + getResolvedSecurityGroups() >> ['sg-789'] + } + def handler = Spy(new AwsEcsTaskHandler(task, executor)) { + fusionEnabled() >> false + getEnvironment() >> [:] + getContainerCommand() >> ['bash', '-c', 'echo hello'] + } + handler.@taskDefArn = 'arn:aws:ecs:us-east-1:123:task-definition/test:1' + + when: + def request = handler.newRunTaskRequest() + + then: + request.networkConfiguration().awsvpcConfiguration().assignPublicIp() == AssignPublicIp.DISABLED + } + + def 'should check if running when task is running'() { + given: + def task = Mock(TaskRun) { + getWorkDir() >> Paths.get('/work/dir') + } + def ecsClient = Mock(EcsClient) + def awsOptions = Mock(AwsEcsOptions) { + getCluster() >> 'test-cluster' + } + def executor = Mock(AwsEcsExecutor) { + getEcsClient() >> ecsClient + getAwsOptions() >> awsOptions + } + def handler = new AwsEcsTaskHandler(task, executor) + handler.@taskArn = 'arn:aws:ecs:us-east-1:123:task/test-cluster/abc123' + handler.status = TaskStatus.SUBMITTED + + when: + def result = handler.checkIfRunning() + + then: + 1 * ecsClient.describeTasks(_) >> DescribeTasksResponse.builder() + .tasks(Task.builder() + .taskArn('arn:aws:ecs:us-east-1:123:task/test-cluster/abc123') + .lastStatus('RUNNING') + .build()) + .build() + + and: + result == true + handler.status == TaskStatus.RUNNING + } + + def 'should check if completed when task is stopped'() { + given: + def task = Mock(TaskRun) { + getWorkDir() >> Paths.get('/work/dir') + } + def ecsClient = Mock(EcsClient) + def awsOptions = Mock(AwsEcsOptions) { + getCluster() >> 'test-cluster' + getMaxSpotAttempts() >> 5 + } + def executor = Mock(AwsEcsExecutor) { + getEcsClient() >> ecsClient + getAwsOptions() >> awsOptions + } + def handler = new AwsEcsTaskHandler(task, executor) + handler.@taskArn = 'arn:aws:ecs:us-east-1:123:task/test-cluster/abc123' + handler.status = TaskStatus.RUNNING + + when: + def result = handler.checkIfCompleted() + + then: + 1 * ecsClient.describeTasks(_) >> DescribeTasksResponse.builder() + .tasks(Task.builder() + .taskArn('arn:aws:ecs:us-east-1:123:task/test-cluster/abc123') + .lastStatus('STOPPED') + .containers(Container.builder() + .name('main') + .exitCode(0) + .build()) + .build()) + .build() + + and: + result == true + handler.status == TaskStatus.COMPLETED + } + + def 'should return false when task not yet submitted'() { + given: + def task = Mock(TaskRun) { + getWorkDir() >> Paths.get('/work/dir') + } + def executor = Mock(AwsEcsExecutor) + def handler = new AwsEcsTaskHandler(task, executor) + // taskArn is null - not yet submitted + + when: + def result = handler.checkIfRunning() + + then: + result == false + } + + def 'should return false when checkIfCompleted and not running'() { + given: + def task = Mock(TaskRun) { + getWorkDir() >> Paths.get('/work/dir') + } + def executor = Mock(AwsEcsExecutor) + def handler = new AwsEcsTaskHandler(task, executor) + handler.status = TaskStatus.SUBMITTED // not yet RUNNING + + when: + def result = handler.checkIfCompleted() + + then: + result == false + } + + def 'should enable Fusion FUSE support in task definition'() { + given: + def task = Mock(TaskRun) { + getContainer() >> 'ubuntu:latest' + getConfig() >> new TaskConfig(cpus: 1, memory: new MemoryUnit('2 GB')) + getWorkDir() >> Paths.get('/work/dir') + } + def awsOptions = Mock(AwsEcsOptions) { + getExecutionRole() >> 'arn:aws:iam::123:role/exec' + getTaskRole() >> null + getLogsGroup() >> '/aws/ecs/test' + getRegion() >> 'us-east-1' + } + def executor = Mock(AwsEcsExecutor) { + getAwsOptions() >> awsOptions + } + def handler = new AwsEcsTaskHandler(task, executor) + + when: + def model = handler.createTaskDefinitionModel() + + then: + // Verify fusionEnabled is set on container definition + model.containerDefinitions.first().fusionEnabled == true + } + + def 'should include Linux parameters for Fusion FUSE driver in container definition'() { + given: + def container = new nextflow.cloud.aws.ecs.model.ContainerDefinitionModel(name: 'main', image: 'ubuntu:latest') + container.fusionEnabled = true + + when: + def containerDef = container.toContainerDefinition() + + then: + // Verify Linux parameters are set + containerDef.linuxParameters() != null + // Verify SYS_ADMIN capability is added + containerDef.linuxParameters().capabilities().add().contains('SYS_ADMIN') + // Verify /dev/fuse device is mounted + containerDef.linuxParameters().devices().size() == 1 + containerDef.linuxParameters().devices().first().hostPath() == '/dev/fuse' + containerDef.linuxParameters().devices().first().containerPath() == '/dev/fuse' + } + + def 'should not include Linux parameters when Fusion not enabled'() { + given: + def container = new nextflow.cloud.aws.ecs.model.ContainerDefinitionModel(name: 'main', image: 'ubuntu:latest') + container.fusionEnabled = false + + when: + def containerDef = container.toContainerDefinition() + + then: + // Verify no Linux parameters are set + containerDef.linuxParameters() == null + } +} diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModelTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModelTest.groovy new file mode 100644 index 0000000000..746b2cdbdb --- /dev/null +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModelTest.groovy @@ -0,0 +1,120 @@ +/* + * Copyright 2020-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cloud.aws.ecs.model + +import software.amazon.awssdk.services.ecs.model.Compatibility +import software.amazon.awssdk.services.ecs.model.NetworkMode +import spock.lang.Specification + +/** + * Tests for {@link RegisterTaskDefinitionModel} + * + * @author Paolo Di Tommaso + */ +class RegisterTaskDefinitionModelTest extends Specification { + + def 'should create model with defaults'() { + when: + def model = RegisterTaskDefinitionModel.create() + + then: + model.containerDefinitions.size() == 1 + model.containerDefinitions.first().name == 'main' + } + + def 'should compute cache key'() { + given: + def model = RegisterTaskDefinitionModel.create() + model.cpu = '1024' + model.memory = '2048' + model.containerDefinitions.first().image = 'ubuntu:latest' + + when: + def key1 = model.computeCacheKey() + + then: + key1 == 'ubuntu:latest:1024:2048:0:30' + + when: + model.gpuCount = 2 + model.ephemeralStorageGiB = 100 + def key2 = model.computeCacheKey() + + then: + key2 == 'ubuntu:latest:1024:2048:2:100' + } + + def 'should build request'() { + given: + def model = RegisterTaskDefinitionModel.create() + model.family = 'nf-test' + model.cpu = '1024' + model.memory = '2048' + model.executionRoleArn = 'arn:aws:iam::123:role/exec' + model.taskRoleArn = 'arn:aws:iam::123:role/task' + model.containerDefinitions.first().image = 'ubuntu:latest' + + when: + def request = model.toRequest() + + then: + request.family() == 'nf-test' + request.cpu() == '1024' + request.memory() == '2048' + request.executionRoleArn() == 'arn:aws:iam::123:role/exec' + request.taskRoleArn() == 'arn:aws:iam::123:role/task' + request.networkMode() == NetworkMode.AWSVPC + request.requiresCompatibilities() == [Compatibility.EC2] + request.containerDefinitions().size() == 1 + request.containerDefinitions().first().name() == 'main' + request.containerDefinitions().first().image() == 'ubuntu:latest' + } + + def 'should build request with ephemeral storage'() { + given: + def model = RegisterTaskDefinitionModel.create() + model.family = 'nf-test' + model.cpu = '1024' + model.memory = '2048' + model.ephemeralStorageGiB = 100 + model.containerDefinitions.first().image = 'ubuntu:latest' + + when: + def request = model.toRequest() + + then: + request.ephemeralStorage() != null + request.ephemeralStorage().sizeInGiB() == 100 + } + + def 'should not include ephemeral storage if 30 or less'() { + given: + def model = RegisterTaskDefinitionModel.create() + model.family = 'nf-test' + model.cpu = '1024' + model.memory = '2048' + model.ephemeralStorageGiB = 30 + model.containerDefinitions.first().image = 'ubuntu:latest' + + when: + def request = model.toRequest() + + then: + request.ephemeralStorage() == null + } +} diff --git a/specs/001-ecs-executor/checklists/requirements.md b/specs/001-ecs-executor/checklists/requirements.md new file mode 100644 index 0000000000..e1b0582d47 --- /dev/null +++ b/specs/001-ecs-executor/checklists/requirements.md @@ -0,0 +1,36 @@ +# Specification Quality Checklist: AWS ECS Managed Instances Executor + +**Purpose**: Validate specification completeness and quality before proceeding to planning +**Created**: 2025-12-30 +**Feature**: [spec.md](../spec.md) + +## Content Quality + +- [x] No implementation details (languages, frameworks, APIs) +- [x] Focused on user value and business needs +- [x] Written for non-technical stakeholders +- [x] All mandatory sections completed + +## Requirement Completeness + +- [x] No [NEEDS CLARIFICATION] markers remain +- [x] Requirements are testable and unambiguous +- [x] Success criteria are measurable +- [x] Success criteria are technology-agnostic (no implementation details) +- [x] All acceptance scenarios are defined +- [x] Edge cases are identified +- [x] Scope is clearly bounded +- [x] Dependencies and assumptions identified + +## Feature Readiness + +- [x] All functional requirements have clear acceptance criteria +- [x] User scenarios cover primary flows +- [x] Feature meets measurable outcomes defined in Success Criteria +- [x] No implementation details leak into specification + +## Notes + +- All validation items pass +- Specification is ready for `/speckit.clarify` or `/speckit.plan` +- The spec focuses on WHAT (ECS executor with CPU/memory/GPU/disk support) and WHY (offload compute to managed infrastructure) without specifying HOW (no API details, code structure, etc.) diff --git a/specs/001-ecs-executor/contracts/ecs-api-contracts.md b/specs/001-ecs-executor/contracts/ecs-api-contracts.md new file mode 100644 index 0000000000..f6d6e77e5f --- /dev/null +++ b/specs/001-ecs-executor/contracts/ecs-api-contracts.md @@ -0,0 +1,450 @@ +# ECS API Contracts + +**Branch**: `001-ecs-executor` | **Date**: 2025-12-30 + +This document defines the AWS ECS API contracts used by the `awsecs` executor. + +## RegisterTaskDefinition + +Creates a new task definition for running Nextflow tasks. + +### Request Contract + +```java +RegisterTaskDefinitionRequest.builder() + // Required fields + .family(String) // "nf-{normalized-image-name}" + .requiresCompatibilities(Compatibility.MANAGED_INSTANCES) + .networkMode(NetworkMode.AWSVPC) + .executionRoleArn(String) // From aws.ecs.executionRole + + // Resource configuration (task level) + .cpu(String) // CPU units: "1024" = 1 vCPU + .memory(String) // Memory MiB: "4096" = 4 GB + + // Optional task role + .taskRoleArn(String) // From aws.ecs.taskRole + + // Container definition + .containerDefinitions( + ContainerDefinition.builder() + .name("main") // Always "main" + .image(String) // Container image from process + .command("true") // Placeholder, overridden at RunTask + .essential(true) + .logConfiguration(LogConfiguration.builder() + .logDriver(LogDriver.AWSLOGS) + .options(Map.of( + "awslogs-group", String, // From aws.ecs.logsGroup + "awslogs-region", String, // From aws.region + "awslogs-stream-prefix", "nf" + )) + .build()) + // GPU resources (optional) + .resourceRequirements(List.of( + ResourceRequirement.builder() + .type(ResourceType.GPU) + .value(String) // Number of GPUs + .build() + )) + .build() + ) + + // Storage configuration for Managed Instances + .managedInstancesProvider( + ManagedInstancesProvider.builder() + .instanceLaunchTemplate( + InstanceLaunchTemplate.builder() + .storageConfiguration( + StorageConfiguration.builder() + .storageSizeinGiB(Integer) // 30-16384 GiB + .build() + ) + .build() + ) + .build() + ) + .build() +``` + +### Response Contract + +```java +RegisterTaskDefinitionResponse { + TaskDefinition taskDefinition() { + String taskDefinitionArn() // "arn:aws:ecs:region:account:task-definition/family:revision" + String family() // Task definition family + Integer revision() // Revision number + String status() // "ACTIVE" or "INACTIVE" + } +} +``` + +### Error Cases + +| Error Code | Condition | Handling | +|------------|-----------|----------| +| `ClientException` | Invalid parameters | Fail task with error message | +| `ServerException` | AWS service error | Retry with backoff | +| `InvalidParameterException` | Resource limits exceeded | Fail task with clear message | + +--- + +## RunTask + +Submits a task for execution on the ECS cluster. + +### Request Contract + +```java +RunTaskRequest.builder() + // Required fields + .cluster(String) // From aws.ecs.cluster + .taskDefinition(String) // ARN from cache/registration + + // Capacity provider strategy (for Managed Instances) + .capacityProviderStrategy( + CapacityProviderStrategyItem.builder() + .capacityProvider(String) // User-configured capacity provider + .weight(1) + .build() + ) + + // Network configuration (required for awsvpc) + .networkConfiguration( + NetworkConfiguration.builder() + .awsvpcConfiguration( + AwsVpcConfiguration.builder() + .subnets(List) // From aws.ecs.subnets + .securityGroups(List) // From aws.ecs.securityGroups + .assignPublicIp(AssignPublicIp.ENABLED | DISABLED) + .build() + ) + .build() + ) + + // Task overrides (command, environment) + .overrides( + TaskOverride.builder() + .containerOverrides( + ContainerOverride.builder() + .name("main") + .command(List) // ["bash", "-c", "..."] + .environment(List.of( + KeyValuePair.builder() + .name(String) + .value(String) + .build() + )) + .build() + ) + .build() + ) + + // Optional: Tags for tracking + .tags(List.of( + Tag.builder() + .key("nextflow:taskId") + .value(String) + .build() + )) + .build() +``` + +### Response Contract + +```java +RunTaskResponse { + List tasks() { + Task { + String taskArn() // ARN of started task + String clusterArn() + String lastStatus() // Initially "PROVISIONING" or "PENDING" + String desiredStatus() // "RUNNING" + Instant createdAt() + } + } + List failures() { + Failure { + String arn() // Resource that failed + String reason() // Failure reason + String detail() // Additional details + } + } +} +``` + +### Error Cases + +| Error Code | Condition | Handling | +|------------|-----------|----------| +| `ClusterNotFoundException` | Invalid cluster | Fail with config error | +| `InvalidParameterException` | Bad request parameters | Fail task with message | +| `AccessDeniedException` | Missing IAM permissions | Fail with permissions error | +| `PlatformTaskDefinitionIncompatibilityException` | Wrong launch type | Fail with config error | +| `BlockedException` | Account blocked | Fail with clear message | + +--- + +## DescribeTasks + +Polls task status for monitoring execution. + +### Request Contract + +```java +DescribeTasksRequest.builder() + .cluster(String) // From aws.ecs.cluster + .tasks(List) // Task ARNs (up to 100) + .include(TaskField.TAGS) // Include tags in response + .build() +``` + +### Response Contract + +```java +DescribeTasksResponse { + List tasks() { + Task { + String taskArn() + String taskDefinitionArn() + String clusterArn() + String lastStatus() // PROVISIONING, PENDING, ACTIVATING, + // RUNNING, DEACTIVATING, STOPPING, STOPPED + String desiredStatus() // RUNNING or STOPPED + TaskStopCode stopCode() // TASK_FAILED_TO_START, SPOT_INTERRUPTION, + // ESSENTIAL_CONTAINER_EXITED, USER_INITIATED, etc. + String stoppedReason() // Human-readable reason + Instant startedAt() + Instant stoppedAt() + List containers() { + Container { + String name() // "main" + String containerArn() + Integer exitCode() // 0-255, null if still running + String lastStatus() + String reason() // Failure reason if any + } + } + } + } + List failures() { + Failure { + String arn() + String reason() // Usually "MISSING" for deleted tasks + } + } +} +``` + +### Status Mapping + +| ECS lastStatus | Nextflow TaskStatus | Action | +|----------------|---------------------|--------| +| PROVISIONING | SUBMITTED | Continue polling | +| PENDING | SUBMITTED | Continue polling | +| ACTIVATING | SUBMITTED | Continue polling | +| RUNNING | RUNNING | Continue polling | +| DEACTIVATING | RUNNING | Continue polling | +| STOPPING | RUNNING | Continue polling | +| STOPPED | COMPLETED | Extract exit code, complete | + +### Exit Code Extraction Logic + +```groovy +Integer extractExitCode(Task task) { + def container = task.containers().find { it.name() == "main" } + def exitCode = container?.exitCode() + + // Handle special cases + if (exitCode == null) { + def stopCode = task.stopCode() + if (stopCode == TaskStopCode.TASK_FAILED_TO_START) { + return 1 // Failed to start + } + if (stopCode == TaskStopCode.ESSENTIAL_CONTAINER_EXITED) { + return 1 // Container crashed + } + // Unknown failure + return 1 + } + + return exitCode +} +``` + +--- + +## StopTask + +Cancels a running task. + +### Request Contract + +```java +StopTaskRequest.builder() + .cluster(String) // From aws.ecs.cluster + .task(String) // Task ARN + .reason("Cancelled by Nextflow") // Stop reason + .build() +``` + +### Response Contract + +```java +StopTaskResponse { + Task task() { + String taskArn() + String lastStatus() // May still be RUNNING briefly + String desiredStatus() // STOPPED + } +} +``` + +### Error Cases + +| Error Code | Condition | Handling | +|------------|-----------|----------| +| `InvalidParameterException` | Task not found | Log warning, ignore (already stopped) | +| `ClusterNotFoundException` | Invalid cluster | Log error | + +--- + +## DescribeClusters + +Validates cluster configuration at startup. + +### Request Contract + +```java +DescribeClustersRequest.builder() + .clusters(String) // Cluster name or ARN + .include(ClusterField.ATTACHMENTS, + ClusterField.SETTINGS, + ClusterField.STATISTICS, + ClusterField.TAGS) + .build() +``` + +### Response Contract + +```java +DescribeClustersResponse { + List clusters() { + Cluster { + String clusterArn() + String clusterName() + String status() // ACTIVE, PROVISIONING, DEPROVISIONING, + // FAILED, INACTIVE + List capacityProviders() // Attached capacity providers + Integer registeredContainerInstancesCount() + Integer runningTasksCount() + Integer pendingTasksCount() + } + } + List failures() { + Failure { + String arn() + String reason() // "MISSING" if not found + } + } +} +``` + +### Validation Logic + +```groovy +void validateCluster(String clusterName) { + def response = ecsClient.describeClusters( + DescribeClustersRequest.builder() + .clusters(clusterName) + .build() + ) + + if (response.failures()) { + throw new ProcessUnrecoverableException( + "ECS cluster not found: ${clusterName}") + } + + def cluster = response.clusters()[0] + + if (cluster.status() != "ACTIVE") { + throw new ProcessUnrecoverableException( + "ECS cluster is not active: ${cluster.status()}") + } + + // Check for Managed Instances capacity provider + def hasManaged = cluster.capacityProviders().any { + // User must configure capacity provider name + it == options.capacityProvider + } + + if (!hasManaged) { + throw new ProcessUnrecoverableException( + "ECS cluster lacks configured Managed Instances capacity provider") + } +} +``` + +--- + +## CloudWatch Logs Integration + +### GetLogEvents Request + +```java +GetLogEventsRequest.builder() + .logGroupName(String) // From aws.ecs.logsGroup + .logStreamName(String) // "nf/main/{task-id}" + .startFromHead(true) + .build() +``` + +### Log Stream Name Format + +``` +{prefix}/{container-name}/{task-id} +nf/main/1a2b3c4d-5e6f-7g8h-9i0j-1k2l3m4n5o6p +``` + +Where `task-id` is extracted from the task ARN: +``` +arn:aws:ecs:us-east-1:123456789:task/cluster-name/1a2b3c4d-5e6f-7g8h-9i0j-1k2l3m4n5o6p + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + task-id +``` + +--- + +## Rate Limiting + +ECS API has the following rate limits (requests per second): + +| API Operation | Burst | Sustained | +|---------------|-------|-----------| +| RunTask | 100 | 20 | +| DescribeTasks | 100 | 40 | +| StopTask | 100 | 20 | +| RegisterTaskDefinition | 100 | 1 | +| DescribeClusters | 100 | 20 | + +### Throttling Implementation + +The `AwsEcsProxy` class wraps the ECS client with a `ThrottlingExecutor`: + +```groovy +class AwsEcsProxy { + private final EcsClient client + private final ThrottlingExecutor throttle + + RunTaskResponse runTask(RunTaskRequest request) { + throttle.execute { client.runTask(request) } + } + + DescribeTasksResponse describeTasks(DescribeTasksRequest request) { + throttle.execute { client.describeTasks(request) } + } + + // ... other methods +} +``` diff --git a/specs/001-ecs-executor/data-model.md b/specs/001-ecs-executor/data-model.md new file mode 100644 index 0000000000..08ada2cf21 --- /dev/null +++ b/specs/001-ecs-executor/data-model.md @@ -0,0 +1,328 @@ +# Data Model: AWS ECS Managed Instances Executor + +**Branch**: `001-ecs-executor` | **Date**: 2025-12-30 + +## Entity Relationship Diagram + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ AWS Infrastructure │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────────┐ ┌──────────────────────────┐ │ +│ │ ECS Cluster │◄──────│ Capacity Provider │ │ +│ │ │ │ (Managed Instances) │ │ +│ │ - clusterArn │ │ │ │ +│ │ - clusterName │ │ - capacityProviderName │ │ +│ │ - status │ │ - autoScalingGroupArn │ │ +│ └────────┬─────────┘ └──────────────────────────┘ │ +│ │ │ +│ │ runs │ +│ ▼ │ +│ ┌──────────────────┐ ┌──────────────────────────┐ │ +│ │ ECS Task │◄──────│ Task Definition │ │ +│ │ │ │ │ │ +│ │ - taskArn │ │ - taskDefinitionArn │ │ +│ │ - lastStatus │ │ - family │ │ +│ │ - desiredStatus │ │ - revision │ │ +│ │ - stoppedReason │ │ - cpu │ │ +│ │ - stopCode │ │ - memory │ │ +│ │ - containers[] │ │ - containerDefinitions │ │ +│ └────────┬─────────┘ │ - executionRoleArn │ │ +│ │ │ - taskRoleArn │ │ +│ │ writes to │ - storageConfiguration │ │ +│ ▼ └──────────────────────────┘ │ +│ ┌──────────────────┐ │ +│ │ CloudWatch Logs │ │ +│ │ │ │ +│ │ - logGroupName │ │ +│ │ - logStreamName │ │ +│ └──────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────────────────────┐ +│ Nextflow Runtime │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────────┐ ┌──────────────────────────┐ │ +│ │ TaskRun │──────►│ AwsEcsTaskHandler │ │ +│ │ │ │ │ │ +│ │ - id │ │ - taskArn │ │ +│ │ - name │ │ - taskDefArn │ │ +│ │ - container │ │ - spotAttempts │ │ +│ │ - config │ │ - status │ │ +│ │ - workDir │ │ - exitCode │ │ +│ └──────────────────┘ └──────────────────────────┘ │ +│ │ │ │ +│ │ │ uses │ +│ │ ▼ │ +│ │ ┌──────────────────────────┐ │ +│ │ │ AwsEcsExecutor │ │ +│ │ │ │ │ +│ │ │ - ecsClient │ │ +│ │ │ - taskDefCache │ │ +│ │ │ - options │ │ +│ │ └──────────────────────────┘ │ +│ │ │ │ +│ │ work directory │ configured by │ +│ ▼ ▼ │ +│ ┌──────────────────┐ ┌──────────────────────────┐ │ +│ │ S3 Work Dir │ │ AwsEcsConfig │ │ +│ │ (via Fusion) │ │ │ │ +│ │ │ │ - cluster │ │ +│ │ - bucket │ │ - executionRole │ │ +│ │ - prefix │ │ - taskRole │ │ +│ └──────────────────┘ │ - logsGroup │ │ +│ │ - maxSpotAttempts │ │ +│ │ - subnets │ │ +│ │ - securityGroups │ │ +│ └──────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +## Entity Definitions + +### ECS Cluster + +The pre-configured AWS ECS cluster where tasks are executed. + +| Attribute | Type | Description | +|-----------|------|-------------| +| clusterArn | String | Full ARN of the ECS cluster | +| clusterName | String | Short name of the cluster | +| status | String | ACTIVE, PROVISIONING, DEPROVISIONING, FAILED, INACTIVE | +| capacityProviders | List | Attached capacity provider names | +| defaultCapacityProviderStrategy | List | Default strategy for tasks | + +**Lifecycle**: Pre-created by user, validated at executor startup. + +### Capacity Provider + +The Managed Instances capacity provider that handles automatic instance provisioning. + +| Attribute | Type | Description | +|-----------|------|-------------| +| capacityProviderArn | String | Full ARN of the capacity provider | +| name | String | Capacity provider name (user-defined) | +| autoScalingGroupArn | String | Associated Auto Scaling group | +| status | String | ACTIVE, INACTIVE | +| managedScaling | Object | Scaling configuration | + +**Lifecycle**: Pre-created by user as part of cluster setup. + +### Task Definition + +ECS task definition created from Nextflow process requirements. + +| Attribute | Type | Description | +|-----------|------|-------------| +| taskDefinitionArn | String | Full ARN with revision | +| family | String | Task definition family name | +| revision | Integer | Revision number (auto-incremented) | +| cpu | String | CPU units (1024 = 1 vCPU) | +| memory | String | Memory in MiB | +| containerDefinitions | List | Container configurations | +| executionRoleArn | String | IAM role for ECS agent | +| taskRoleArn | String | IAM role for task (S3 access) | +| networkMode | String | Always AWSVPC for Managed Instances | +| requiresCompatibilities | List | Always MANAGED_INSTANCES | +| managedInstancesProvider | Object | Storage configuration | + +**Lifecycle**: Auto-created by executor, cached by container+resource hash. + +### ECS Task + +Running ECS task instance corresponding to a Nextflow task. + +| Attribute | Type | Description | +|-----------|------|-------------| +| taskArn | String | Full ARN of the running task | +| clusterArn | String | Cluster where task runs | +| taskDefinitionArn | String | Associated task definition | +| lastStatus | String | Current status (see Status Mapping) | +| desiredStatus | String | Target status | +| containers | List | Container instances with exit codes | +| stoppedReason | String | Reason for task stop | +| stopCode | String | ECS stop code | +| startedAt | Timestamp | Task start time | +| stoppedAt | Timestamp | Task stop time | + +**Lifecycle**: Created by RunTask, transitions through states, deleted after polling. + +### Container Instance (within Task) + +| Attribute | Type | Description | +|-----------|------|-------------| +| containerArn | String | Container instance ARN | +| name | String | Container name (always "main") | +| exitCode | Integer | Process exit code (0-255) | +| lastStatus | String | Container status | +| reason | String | Failure reason if any | + +### Work Directory + +S3 bucket path used as Nextflow work directory, accessed via Fusion. + +| Attribute | Type | Description | +|-----------|------|-------------| +| bucket | String | S3 bucket name | +| prefix | String | Path prefix within bucket | +| uri | String | Full S3 URI (s3://bucket/prefix) | + +**Lifecycle**: User-configured, used throughout workflow execution. + +## State Diagrams + +### ECS Task Status Transitions + +``` + ┌──────────────┐ + │ PROVISIONING │ + └──────┬───────┘ + │ + ▼ + ┌──────────────┐ + │ PENDING │ + └──────┬───────┘ + │ + ▼ + ┌──────────────┐ + │ ACTIVATING │ + └──────┬───────┘ + │ + ▼ + ┌──────────────┐ + ┌─────────│ RUNNING │─────────┐ + │ └──────────────┘ │ + │ │ + │ (normal completion) (cancel/error) + │ │ + ▼ ▼ +┌──────────────┐ ┌──────────────┐ +│ DEACTIVATING │ │ STOPPING │ +└──────┬───────┘ └──────┬───────┘ + │ │ + └───────────────┬───────────────────┘ + │ + ▼ + ┌──────────────┐ + │ STOPPED │ + └──────────────┘ +``` + +### Nextflow Task Handler State Transitions + +``` +┌───────────────┐ +│ NEW │ (TaskHandler created) +└───────┬───────┘ + │ submit() + ▼ +┌───────────────┐ +│ SUBMITTED │ (RunTask called, taskArn obtained) +└───────┬───────┘ + │ checkIfRunning() → ECS status RUNNING + ▼ +┌───────────────┐ +│ RUNNING │ (Task executing) +└───────┬───────┘ + │ checkIfCompleted() → ECS status STOPPED + │ + ├─────────────────────────────────────────┐ + │ (spot interruption && attempts < max) │ + │ │ + │ resubmit() │ + ▼ │ +┌───────────────┐ │ +│ SUBMITTED │◄────────────────────────────────┘ +└───────┬───────┘ + │ + │ (normal completion or max attempts) + ▼ +┌───────────────┐ +│ COMPLETED │ (exitCode extracted, status reported) +└───────────────┘ +``` + +## Caching Strategy + +### Task Definition Cache + +```groovy +// Key structure +String cacheKey = "${containerImage}:${cpuUnits}:${memoryMiB}:${gpuCount}:${diskGiB}" + +// Example keys +"ubuntu:latest:2048:4096:0:50" // 2 vCPU, 4GB RAM, no GPU, 50GB disk +"nvidia/cuda:12:4096:16384:1:100" // 4 vCPU, 16GB RAM, 1 GPU, 100GB disk +``` + +### Cache Implementation + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Task Definition Cache │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ ConcurrentHashMap │ +│ │ +│ Key (hash) Value (taskDefArn) │ +│ ────────────────────────── ────────────────────────── │ +│ "ubuntu:2048:4096:0:50" → "arn:aws:ecs:...:nf- │ +│ ubuntu:1" │ +│ "nginx:1024:2048:0:30" → "arn:aws:ecs:...:nf- │ +│ nginx:1" │ +│ │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Configuration Data Model + +### AwsEcsConfig Fields + +| Field | Type | Required | Default | Validation | +|-------|------|----------|---------|------------| +| cluster | String | **Yes** | - | Non-empty, valid cluster name/ARN | +| executionRole | String | **Yes** | - | Valid IAM role ARN | +| taskRole | String | No | - | Valid IAM role ARN if provided | +| subnets | List | No | Auto-discovered | Valid subnet IDs (discovers from default VPC) | +| securityGroups | List | No | Auto-discovered | Valid security group IDs (discovers from default VPC) | +| logsGroup | String | No | `/aws/ecs/nextflow` | Valid log group name | +| maxSpotAttempts | Integer | No | 5 | 1-100 | +| assignPublicIp | Boolean | No | true | - | + +### Configuration Hierarchy + +**Minimal** (only required settings): +``` +aws { + region = 'us-east-1' // AwsConfig.region + ecs { + cluster = '...' // AwsEcsConfig.cluster (REQUIRED) + executionRole = '...' // AwsEcsConfig.executionRole (REQUIRED) + } +} +wave { enabled = true } +fusion { enabled = true } +``` + +**Full** (with all optional settings): +``` +aws { + region = 'us-east-1' // AwsConfig.region + ecs { + cluster = '...' // AwsEcsConfig.cluster (REQUIRED) + executionRole = '...' // AwsEcsConfig.executionRole (REQUIRED) + taskRole = '...' // AwsEcsConfig.taskRole (optional) + subnets = ['...'] // AwsEcsConfig.subnets (auto-discovered) + securityGroups = ['...'] // AwsEcsConfig.securityGroups (auto-discovered) + logsGroup = '...' // AwsEcsConfig.logsGroup (default: /aws/ecs/nextflow) + maxSpotAttempts = 5 // AwsEcsConfig.maxSpotAttempts (default: 5) + assignPublicIp = true // AwsEcsConfig.assignPublicIp (default: true) + } +} +wave { enabled = true } +fusion { enabled = true } +``` diff --git a/specs/001-ecs-executor/plan.md b/specs/001-ecs-executor/plan.md new file mode 100644 index 0000000000..66ff73a1c5 --- /dev/null +++ b/specs/001-ecs-executor/plan.md @@ -0,0 +1,300 @@ +# Implementation Plan: AWS ECS Managed Instances Executor + +**Branch**: `001-ecs-executor` | **Date**: 2025-12-30 | **Spec**: [spec.md](./spec.md) +**Input**: Feature specification from `/specs/001-ecs-executor/spec.md` + +## Summary + +Implement a new `awsecs` executor in the nf-amazon plugin that runs Nextflow tasks on AWS ECS using the Managed Instances capacity provider. The executor will support configurable CPU, memory, GPU, and disk resources, use S3/Fusion for storage, and follow existing patterns from the AWS Batch executor while remaining completely independent. + +## Technical Context + +**Language/Version**: Groovy 4.0.29, Java 17 target (Java 21 toolchain) +**Primary Dependencies**: AWS SDK v2 (ECS, EC2, CloudWatch Logs, S3), GPars 1.2.1 +**Storage**: AWS S3 via Seqera Fusion filesystem +**Testing**: Spock Framework, JaCoCo for coverage +**Target Platform**: Nextflow runtime on any platform with AWS credentials +**Project Type**: Plugin module within nf-amazon +**Performance Goals**: Task submission <60s, status polling efficient via batch aggregation +**Constraints**: ECS Managed Instances 14-day lifecycle limit, Wave + Fusion required for S3 access +**Scale/Scope**: Support equivalent scale to AWS Batch executor + +## Constitution Check + +*GATE: Must pass before Phase 0 research. Re-check after Phase 1 design.* + +| Principle | Status | Notes | +|-----------|--------|-------| +| I. Modular Architecture | ✅ PASS | Plugin feature in `plugins/nf-amazon`, independent from Batch executor | +| II. Test-Driven Quality | ✅ PASS | Unit tests (Spock), integration tests, smoke tests planned | +| III. Dataflow Programming Model | ✅ PASS | Executor is infrastructure layer, doesn't affect dataflow semantics | +| IV. Apache 2.0 License | ✅ PASS | All new files will include Apache 2.0 headers | +| V. DCO Sign-off | ✅ PASS | All commits will use `-s` flag | +| VI. Semantic Versioning | ✅ PASS | Plugin version bump required in `plugins/nf-amazon/VERSION` | +| VII. Groovy Idioms | ✅ PASS | Follow existing Batch executor patterns and conventions | + +**Gate Status**: PASSED - No violations requiring justification. + +## Project Structure + +### Documentation (this feature) + +```text +specs/001-ecs-executor/ +├── spec.md # Feature specification (complete) +├── plan.md # This file +├── research.md # Phase 0 output +├── data-model.md # Phase 1 output +├── quickstart.md # Phase 1 output +├── contracts/ # Phase 1 output (ECS API patterns) +└── tasks.md # Phase 2 output (/speckit.tasks command) +``` + +### Source Code (repository root) + +```text +plugins/nf-amazon/src/main/nextflow/cloud/aws/ +├── ecs/ # NEW - ECS executor package +│ ├── AwsEcsExecutor.groovy # Main executor class +│ ├── AwsEcsTaskHandler.groovy # Task lifecycle management +│ ├── AwsEcsHelper.groovy # CloudWatch logs, metadata +│ ├── AwsEcsOptions.groovy # Configuration wrapper +│ └── model/ # ECS-specific model classes +│ ├── RegisterTaskDefinitionModel.groovy +│ └── ContainerDefinitionModel.groovy +├── config/ +│ ├── AwsConfig.groovy # MODIFY - Add ecs config field +│ └── AwsEcsConfig.groovy # NEW - ECS configuration +└── AmazonPlugin.groovy # VERIFY - Extension point may auto-register + +# DEFERRED (optimization phase): +# ├── ecs/AwsEcsProxy.groovy # Throttled ECS client wrapper + +plugins/nf-amazon/src/test/nextflow/cloud/aws/ +├── ecs/ # NEW - ECS executor tests +│ ├── AwsEcsExecutorTest.groovy +│ ├── AwsEcsTaskHandlerTest.groovy +│ └── AwsEcsConfigTest.groovy +``` + +**Structure Decision**: Follow the existing `batch/` package structure exactly, creating a parallel `ecs/` package. This maintains consistency with the modular architecture and keeps Batch and ECS code independent. + +## Architecture Design + +### Component Overview + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Nextflow Runtime │ +├─────────────────────────────────────────────────────────────────┤ +│ ┌──────────────────┐ ┌──────────────────┐ │ +│ │ TaskProcessor │────│ TaskMonitor │ │ +│ └────────┬─────────┘ │ (Polling Loop) │ │ +│ │ └────────┬─────────┘ │ +├───────────┼───────────────────────┼─-───────────────────────────┤ +│ ▼ ▼ │ +│ ┌──────────────────┐ ┌──────────────────┐ │ +│ │ AwsEcsExecutor │────│ AwsEcsTaskHandler│ │ +│ │ (@ServiceName │ │ (per task) │ │ +│ │ 'awsecs') │ └────────┬─────────┘ │ +│ └────────┬─────────┘ │ │ +│ │ │ │ +│ ┌────────┴─────────┐ ┌────────┴─────────┐ │ +│ │ AwsEcsProxy │ │ BatchContext │ │ +│ │ (throttling) │ │ (aggregation) │ │ +│ └────────┬─────────┘ └────────┬─────────┘ │ +├───────────┼───────────────────────┼─-───────────────────────────┤ +│ ▼ ▼ │ +│ ┌─────────────────────────────────────────────────────────────┐│ +│ │ AWS ECS API ││ +│ │ RunTask │ DescribeTasks │ StopTask │ RegisterTaskDefinition││ +│ └─────────────────────────────────────────────────────────────┘│ +└─────────────────────────────────────────────────────────────────┘ +``` + +### Key Classes + +| Class | Responsibility | Key Methods | +|-------|---------------|-------------| +| `AwsEcsExecutor` | Executor lifecycle, client init | `register()`, `createTaskHandler()`, `createTaskMonitor()` | +| `AwsEcsTaskHandler` | Task submission, polling, cancel | `submit()`, `checkIfRunning()`, `checkIfCompleted()`, `killTask()` | +| `AwsEcsConfig` | Configuration validation | `@ConfigOption` annotated fields | +| `AwsEcsHelper` | CloudWatch logs, metadata | `getTaskLogStream()`, `describeCluster()` | +| `AwsEcsProxy` | Request throttling (DEFERRED) | Wraps `EcsClient` with `ThrottlingExecutor` | + +### ECS API Mapping + +| Nextflow Operation | AWS Batch | AWS ECS | +|--------------------|-----------|---------| +| Create definition | `RegisterJobDefinition` | `RegisterTaskDefinition` | +| Submit task | `SubmitJob` | `RunTask` | +| Query status | `DescribeJobs` | `DescribeTasks` | +| Cancel task | `TerminateJob` | `StopTask` | +| Get logs | CloudWatch Logs | CloudWatch Logs | + +### Configuration Schema + +**Minimal configuration** (only 2 required settings): + +```groovy +aws { + region = 'us-east-1' + ecs { + cluster = 'my-nextflow-cluster' // REQUIRED + executionRole = 'arn:aws:iam::...' // REQUIRED + } +} + +wave { + enabled = true // Required for Fusion +} + +fusion { + enabled = true +} +``` + +**Full configuration** (with optional settings): + +```groovy +aws { + region = 'us-east-1' + ecs { + cluster = 'my-nextflow-cluster' // REQUIRED + executionRole = 'arn:aws:iam::...' // REQUIRED + taskRole = 'arn:aws:iam::...' // Optional: for S3 access + subnets = ['subnet-abc', 'subnet-def'] // Optional: auto-discovered from default VPC + securityGroups = ['sg-xyz'] // Optional: auto-discovered from default VPC + logsGroup = '/aws/ecs/nextflow' // Default: /aws/ecs/nextflow + maxSpotAttempts = 5 // Default: 5 + assignPublicIp = true // Default: true + } +} + +wave { + enabled = true +} + +fusion { + enabled = true +} +``` + +### Resource Mapping + +| Nextflow Directive | ECS Task Definition Field | Notes | +|--------------------|--------------------------|-------| +| `cpus` | `cpu` (CPU units) | 1 vCPU = 1024 units | +| `memory` | `memory` (MiB) | Direct mapping | +| `accelerator` | `resourceRequirements[GPU]` | Triggers GPU instance selection | +| `disk` | `storageConfiguration.storageSizeinGiB` | 30-16384 GiB | +| `container` | `containerDefinitions[0].image` | Docker image | +| `machineType` | Capacity provider attributes | Instance type constraint | + +### Task Definition Caching Strategy + +```groovy +// Global cache keyed by container:hash +static final Map taskDefinitions = [:] + +String resolveTaskDefinition(TaskRun task) { + def key = "${task.container}:${computeHash(task)}" + + // Check cache + if (taskDefinitions.containsKey(key)) + return taskDefinitions[key] + + synchronized(taskDefinitions) { + // Double-check after lock + if (taskDefinitions.containsKey(key)) + return taskDefinitions[key] + + // Find existing or create new + def taskDefArn = findOrCreateTaskDefinition(task) + taskDefinitions[key] = taskDefArn + return taskDefArn + } +} +``` + +### Spot Retry Strategy + +```groovy +// Detect spot interruption from task stop reason +boolean isSpotInterruption(Task task) { + def reason = task.stoppedReason() + return reason?.contains('Host EC2') || + reason?.contains('spot interruption') || + task.stopCode() == TaskStopCode.SPOT_INTERRUPTION +} + +// Retry logic in checkIfCompleted() +if (isSpotInterruption(task) && spotAttempts < maxSpotAttempts) { + spotAttempts++ + log.debug "Spot interruption detected, retry $spotAttempts/$maxSpotAttempts" + resubmit() + return false // Not completed yet +} +``` + +## Complexity Tracking + +> No constitution violations requiring justification. + +## Implementation Phases + +### Phase 1: Core Infrastructure (P1 - MVP) +- AwsEcsExecutor skeleton with lifecycle methods +- AwsEcsConfig with required options (cluster, executionRole) +- AwsEcsTaskHandler with basic submit/poll/cancel +- Task definition registration (no caching initially) +- Basic Fusion integration + +### Phase 2: Resource Management (P2) +- CPU/memory mapping with validation +- GPU support via attribute-based instance selection +- Disk size configuration +- Instance type specification (optional) +- Task definition caching + +### Phase 3: Reliability & Observability (P2/P3) +- Spot retry logic (maxSpotAttempts) +- CloudWatch Logs integration +- Cluster validation at startup +- BatchContext for efficient status polling + +### Phase 4: Optimization (Deferred) +- Rate limiting via AwsEcsProxy (throttled ECS client wrapper) + +### Phase 5: Documentation & Testing +- Unit tests for all components +- Integration tests with mock ECS +- Smoke tests +- User documentation for cluster setup +- Configuration reference documentation + +## Dependencies + +### Internal (nf-amazon plugin) +- `AwsClientFactory` - Reuse for ECS client creation +- `AwsConfig` - Extend with `ecs` field +- `AwsFusionEnv` - Reuse for S3 credentials +- `ThrottlingExecutor` - Reuse for rate limiting + +### External (AWS SDK) +- `software.amazon.awssdk:ecs:2.33.2` - Already included in plugin +- `software.amazon.awssdk:cloudwatchlogs:2.33.2` - Already included + +### Core Nextflow +- `BatchHandler` trait - Implement for batch aggregation +- `FusionAwareTask` trait - Implement for Fusion support +- `TaskHandler` - Extend for task lifecycle + +## Risks & Mitigations + +| Risk | Impact | Mitigation | +|------|--------|------------| +| ECS API rate limits | Task submission delays | ThrottlingExecutor with configurable rate | +| 14-day lifecycle limit | Long tasks fail | Clear error message, documentation | +| Managed Instances capacity | Task scheduling delays | Document cluster setup, capacity provider config | +| Spot interruptions | Task failures | Automatic retry with configurable max attempts | diff --git a/specs/001-ecs-executor/quickstart.md b/specs/001-ecs-executor/quickstart.md new file mode 100644 index 0000000000..4ef3398fee --- /dev/null +++ b/specs/001-ecs-executor/quickstart.md @@ -0,0 +1,362 @@ +# Quickstart: AWS ECS Managed Instances Executor + +**Branch**: `001-ecs-executor` | **Date**: 2025-12-30 + +This guide helps you get started with the `awsecs` executor to run Nextflow workflows on AWS ECS Managed Instances. + +## Prerequisites + +Before using the `awsecs` executor, ensure you have: + +1. **AWS Account** with appropriate permissions for ECS, EC2, S3, CloudWatch, and IAM +2. **Nextflow** installed with the nf-amazon plugin +3. **Wave containers** enabled (required for Fusion) +4. **Fusion** filesystem configured (required for S3 work directory) +5. **ECS Cluster** with Managed Instances capacity provider (see Setup section) + +## Quick Example + +### Minimal Configuration + +The executor requires only two AWS settings. Network configuration (subnets, security groups) is auto-discovered from the default VPC. + +```groovy +// nextflow.config +process { + executor = 'awsecs' +} + +aws { + region = 'us-east-1' + ecs { + cluster = 'nextflow-cluster' // REQUIRED + executionRole = 'arn:aws:iam::123456789:role/ecsTaskExecutionRole' // REQUIRED + } +} + +wave { + enabled = true +} + +fusion { + enabled = true +} +``` + +### Full Configuration (with explicit networking) + +For production deployments or custom VPC setups, you can explicitly specify network configuration: + +```groovy +// nextflow.config +process { + executor = 'awsecs' +} + +aws { + region = 'us-east-1' + ecs { + cluster = 'nextflow-cluster' + executionRole = 'arn:aws:iam::123456789:role/ecsTaskExecutionRole' + taskRole = 'arn:aws:iam::123456789:role/ecsTaskRole' // Optional: for S3 access + subnets = ['subnet-abc123', 'subnet-def456'] // Optional: auto-discovered + securityGroups = ['sg-xyz789'] // Optional: auto-discovered + logsGroup = '/aws/ecs/nextflow' // Optional: default shown + maxSpotAttempts = 5 // Optional: default shown + assignPublicIp = true // Optional: default shown + } +} + +wave { + enabled = true +} + +fusion { + enabled = true +} +``` + +### Sample Workflow + +```groovy +// main.nf +process HELLO { + container 'ubuntu:latest' + cpus 2 + memory '4 GB' + + output: + stdout + + script: + ''' + echo "Hello from ECS Managed Instances!" + hostname + ''' +} + +workflow { + HELLO() +} +``` + +### Run + +```bash +nextflow run main.nf -work-dir s3://my-bucket/work +``` + +## AWS Infrastructure Setup + +### 1. Create ECS Cluster with Managed Instances + +```bash +# Create cluster +aws ecs create-cluster \ + --cluster-name nextflow-cluster \ + --capacity-providers MANAGED_INSTANCES \ + --default-capacity-provider-strategy \ + capacityProvider=MANAGED_INSTANCES,weight=1 + +# Or via CloudFormation/Terraform (recommended for production) +``` + +### 2. Create IAM Roles + +**Task Execution Role** (required - allows ECS to pull images and write logs): + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "ecr:GetAuthorizationToken", + "ecr:BatchCheckLayerAvailability", + "ecr:GetDownloadUrlForLayer", + "ecr:BatchGetImage", + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + "Resource": "*" + } + ] +} +``` + +Trust policy: +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "ecs-tasks.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] +} +``` + +**Task Role** (optional - for S3 access from within tasks): + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:PutObject", + "s3:DeleteObject", + "s3:ListBucket" + ], + "Resource": [ + "arn:aws:s3:::my-bucket", + "arn:aws:s3:::my-bucket/*" + ] + } + ] +} +``` + +### 3. Configure VPC Networking + +Ensure your subnets: +- Have internet access (NAT gateway for private subnets, or use public subnets) +- Allow outbound traffic to AWS services (ECR, S3, CloudWatch) +- Allow any inter-task communication if needed + +Security group should allow: +- Outbound HTTPS (443) for AWS API calls +- Outbound for any external services your workflows need + +### 4. Create CloudWatch Log Group + +```bash +aws logs create-log-group --log-group-name /aws/ecs/nextflow +``` + +## Configuration Reference + +### Required Options + +| Option | Description | +|--------|-------------| +| `aws.ecs.cluster` | ECS cluster name or ARN with Managed Instances capacity provider | +| `aws.ecs.executionRole` | Task execution IAM role ARN (for image pull, CloudWatch logs) | + +### Optional Options + +| Option | Default | Description | +|--------|---------|-------------| +| `aws.ecs.taskRole` | - | IAM role for task (S3 access from within containers) | +| `aws.ecs.subnets` | Auto-discovered | VPC subnet IDs (uses default VPC subnets if not specified) | +| `aws.ecs.securityGroups` | Auto-discovered | Security group IDs (uses default VPC security group if not specified) | +| `aws.ecs.logsGroup` | `/aws/ecs/nextflow` | CloudWatch log group | +| `aws.ecs.maxSpotAttempts` | `5` | Max retries for spot interruptions | +| `aws.ecs.assignPublicIp` | `true` | Assign public IP to tasks (enables internet without NAT gateway) | + +### Process Directives + +| Directive | ECS Mapping | Notes | +|-----------|-------------|-------| +| `cpus` | Task CPU units | 1 vCPU = 1024 units | +| `memory` | Task memory (MiB) | Direct mapping | +| `disk` | Storage size (GiB) | 30-16384 GiB | +| `accelerator` | GPU resources | Triggers GPU instance selection | +| `container` | Container image | Required | +| `machineType` | Instance type | Optional constraint | + +## Usage Examples + +### Basic CPU/Memory Task + +```groovy +process CPU_TASK { + container 'ubuntu:latest' + cpus 4 + memory '8 GB' + + script: + ''' + # Your computation here + ''' +} +``` + +### GPU-Accelerated Task + +```groovy +process GPU_TRAINING { + container 'nvidia/cuda:12.0-runtime-ubuntu22.04' + cpus 4 + memory '16 GB' + accelerator 1, type: 'nvidia-tesla-t4' + + script: + ''' + nvidia-smi + python train_model.py + ''' +} +``` + +### Large Storage Task + +```groovy +process BIG_DATA { + container 'ubuntu:latest' + cpus 2 + memory '4 GB' + disk '500 GB' + + script: + ''' + # Process large files + ''' +} +``` + +### Specific Instance Type + +```groovy +process MEMORY_INTENSIVE { + container 'ubuntu:latest' + cpus 4 + memory '64 GB' + machineType 'r6i.2xlarge' + + script: + ''' + # Memory-intensive computation + ''' +} +``` + +## Monitoring and Debugging + +### View Task Logs + +Logs are written to CloudWatch: + +```bash +# Via AWS CLI +aws logs tail /aws/ecs/nextflow --follow + +# Or in AWS Console: CloudWatch > Log groups > /aws/ecs/nextflow +``` + +### Check Task Status + +```bash +# List running tasks +aws ecs list-tasks --cluster nextflow-cluster + +# Describe specific task +aws ecs describe-tasks \ + --cluster nextflow-cluster \ + --tasks +``` + +### Common Issues + +| Issue | Cause | Solution | +|-------|-------|----------| +| Task stuck in PROVISIONING | No capacity | Check capacity provider configuration | +| Task fails to start | Image pull error | Verify ECR permissions, image exists | +| Network timeout | No internet access | Check NAT gateway, security groups | +| Permission denied (S3) | Missing task role | Add S3 permissions to task role | + +## Limitations + +1. **14-Day Lifecycle**: ECS Managed Instances tasks have a 14-day maximum runtime. Tasks exceeding this limit will fail. + +2. **No Task Arrays**: Unlike AWS Batch, ECS doesn't support native task arrays. Each task is submitted individually. + +3. **Wave + Fusion Required**: The executor requires Wave containers and Fusion filesystem for S3 access. Local work directories are not supported. + +## Comparison with AWS Batch + +| Feature | awsecs | awsbatch | +|---------|--------|----------| +| Infrastructure | ECS Managed Instances | AWS Batch compute environments | +| Task arrays | Not supported | Supported | +| Max runtime | 14 days | Unlimited | +| GPU support | Yes (via EC2) | Yes | +| Spot support | Yes (via capacity provider) | Yes | +| Pricing | ECS + EC2 | Batch + EC2 | + +Choose `awsecs` when: +- You prefer direct ECS control +- You have existing ECS infrastructure +- You need faster task startup (no Batch queue overhead) + +Choose `awsbatch` when: +- You need task arrays for large parallel jobs +- You need tasks running longer than 14 days +- You want AWS Batch's managed compute environments diff --git a/specs/001-ecs-executor/research.md b/specs/001-ecs-executor/research.md new file mode 100644 index 0000000000..a020aa1bfb --- /dev/null +++ b/specs/001-ecs-executor/research.md @@ -0,0 +1,325 @@ +# Research: AWS ECS Managed Instances Executor + +**Date**: 2025-12-30 | **Branch**: `001-ecs-executor` + +## 1. ECS Managed Instances API Patterns + +### Decision: Use ECS RunTask API with Managed Instances Capacity Provider + +**Rationale**: ECS Managed Instances uses `RunTask` with capacity provider strategy pointing to a Managed Instances capacity provider. This is different from AWS Batch which abstracts away instance management. + +**Alternatives Considered**: +- ECS Services: Rejected - Services are for long-running tasks, not batch jobs +- Direct EC2 with ECS agent: Rejected - Managed Instances handles this automatically + +### Key API Operations + +| Operation | API | Notes | +|-----------|-----|-------| +| Register task definition | `RegisterTaskDefinition` | One-time per container/resource combo | +| Run task | `RunTask` | With `capacityProviderStrategy` | +| Check status | `DescribeTasks` | Poll task ARN for status | +| Stop task | `StopTask` | For cancellation | +| Describe cluster | `DescribeClusters` | For validation at startup | + +### RunTask Request Structure + +```java +RunTaskRequest.builder() + .cluster(clusterArn) + .taskDefinition(taskDefArn) + .capacityProviderStrategy( + CapacityProviderStrategyItem.builder() + .capacityProvider("managed-instances-cp") // User-configured + .weight(1) + .build() + ) + .networkConfiguration( + NetworkConfiguration.builder() + .awsvpcConfiguration( + AwsVpcConfiguration.builder() + .subnets(subnets) + .securityGroups(securityGroups) + .assignPublicIp(AssignPublicIp.ENABLED) + .build() + ) + .build() + ) + .overrides( + TaskOverride.builder() + .containerOverrides( + ContainerOverride.builder() + .name("main") + .command(command) + .environment(envVars) + .build() + ) + .build() + ) + .build() +``` + +## 2. Task Definition Management + +### Decision: Cache task definitions by container+resource hash + +**Rationale**: ECS task definitions are versioned (family:revision). Creating a new revision for each task would be inefficient. Cache by hash of image + CPU + memory + GPU + disk configuration. + +**Alternatives Considered**: +- Single task definition with overrides: Rejected - Resource requirements (CPU/memory at task level) cannot be overridden +- No caching: Rejected - Would create excessive task definition revisions + +### Task Definition Structure + +```java +RegisterTaskDefinitionRequest.builder() + .family("nf-" + normalizedImageName) + .requiresCompatibilities(Compatibility.MANAGED_INSTANCES) + .networkMode(NetworkMode.AWSVPC) + .cpu(cpuUnits) // Task-level CPU (cannot override) + .memory(memoryMiB) // Task-level memory (cannot override) + .executionRoleArn(executionRole) + .taskRoleArn(taskRole) + .containerDefinitions( + ContainerDefinition.builder() + .name("main") + .image(containerImage) + .command("true") // Overridden at RunTask + .essential(true) + .logConfiguration(cloudWatchLogs) + .resourceRequirements(gpuRequirements) + .build() + ) + .managedInstancesProvider( + ManagedInstancesProvider.builder() + .instanceLaunchTemplate( + InstanceLaunchTemplate.builder() + .storageConfiguration( + StorageConfiguration.builder() + .storageSizeinGiB(diskSizeGiB) + .build() + ) + .build() + ) + .build() + ) + .build() +``` + +## 3. Resource Mapping + +### Decision: Map Nextflow directives to ECS task definition fields + +**CPU Mapping**: +- Nextflow `cpus = 2` → ECS `cpu = "2048"` (1 vCPU = 1024 units) +- ECS Managed Instances supports flexible CPU allocation + +**Memory Mapping**: +- Nextflow `memory = '4 GB'` → ECS `memory = "4096"` (MiB) +- Direct conversion from MemoryUnit + +**GPU Mapping**: +- Nextflow `accelerator 1, type: 'nvidia-tesla-t4'` +- ECS uses `resourceRequirements` with `GPU` type +- Triggers attribute-based instance selection for GPU instances + +**Disk Mapping**: +- Nextflow `disk = '500 GB'` +- ECS `storageConfiguration.storageSizeinGiB = 500` +- Range: 30 GiB - 16,384 GiB + +## 4. Task Status Polling + +### Decision: Use BatchContext pattern for efficient status polling + +**Rationale**: Following AWS Batch executor pattern, aggregate multiple task status queries into single `DescribeTasks` call (up to 100 tasks per call). + +**Task Status Mapping**: + +| ECS Task Status | Nextflow TaskStatus | Action | +|-----------------|---------------------|--------| +| PROVISIONING | SUBMITTED | Continue polling | +| PENDING | SUBMITTED | Continue polling | +| ACTIVATING | SUBMITTED | Continue polling | +| RUNNING | RUNNING | Continue polling | +| DEACTIVATING | RUNNING | Continue polling | +| STOPPING | RUNNING | Continue polling | +| STOPPED | COMPLETED | Extract exit code | + +**Exit Code Extraction**: +```groovy +def exitCode = task.containers()[0].exitCode() +def stoppedReason = task.stoppedReason() +def stopCode = task.stopCode() + +// Handle special cases +if (stopCode == TaskStopCode.SPOT_INTERRUPTION) { + // Trigger spot retry if configured +} +if (exitCode == null && stopCode == TaskStopCode.ESSENTIAL_CONTAINER_EXITED) { + exitCode = 1 // Default to failure +} +``` + +## 5. Spot Instance Retry + +### Decision: Implement spot retry similar to aws.batch.maxSpotAttempts + +**Rationale**: ECS Managed Instances can use spot capacity. When spot interruption occurs, the task should automatically retry up to a configurable limit. + +**Detection**: +```groovy +boolean isSpotInterruption(Task task) { + return task.stopCode() == TaskStopCode.SPOT_INTERRUPTION || + task.stoppedReason()?.contains('spot') || + task.stoppedReason()?.contains('Host EC2') +} +``` + +**Retry Logic**: +- Track `spotAttempts` counter per task handler +- On spot interruption, increment counter and resubmit if under limit +- Log warning on each retry +- Fail task if max attempts exceeded + +## 6. CloudWatch Logs Integration + +### Decision: Use awslogs driver with configurable log group + +**Rationale**: Standard ECS logging pattern. Log group configurable via `aws.ecs.logsGroup`. + +**Log Configuration**: +```java +LogConfiguration.builder() + .logDriver(LogDriver.AWSLOGS) + .options(Map.of( + "awslogs-group", logsGroup, + "awslogs-region", region, + "awslogs-stream-prefix", "nf" + )) + .build() +``` + +**Log Retrieval**: +- Log stream name format: `nf//` +- Use CloudWatch Logs `GetLogEvents` API +- Implement in AwsEcsHelper class + +## 7. Cluster Validation + +### Decision: Validate cluster and capacity provider at startup + +**Rationale**: Fail fast if cluster doesn't exist or lacks Managed Instances capacity provider. + +**Validation Steps**: +1. Call `DescribeClusters` with cluster name/ARN +2. Check cluster status is ACTIVE +3. Verify capacity providers include a Managed Instances provider +4. If validation fails, throw `ProcessUnrecoverableException` with clear message + +## 8. Fusion Integration + +### Decision: Reuse existing AwsFusionEnv implementation + +**Rationale**: Fusion environment setup for S3 is identical between Batch and ECS. Both need: +- AWS credentials in environment +- S3 endpoint configuration +- Fusion-specific environment variables + +**Implementation**: +- Implement `FusionAwareTask` trait in AwsEcsTaskHandler +- Use `FusionScriptLauncher` when Fusion enabled +- Pass Fusion environment variables to container + +## 9. Configuration Namespace + +### Decision: Use `aws.ecs.*` namespace with minimal required settings + +**Rationale**: Minimize user configuration burden by requiring only cluster and executionRole. Network settings (subnets, security groups) are auto-discovered from the default VPC when not specified. + +**Required Configuration** (only 2 settings): + +| Option | Type | Description | +|--------|------|-------------| +| `cluster` | String | ECS cluster name or ARN with Managed Instances capacity provider | +| `executionRole` | String | Task execution IAM role ARN (for image pull, CloudWatch logs) | + +**Optional Configuration** (with defaults/auto-discovery): + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `taskRole` | String | - | Task IAM role ARN (for S3 access from containers) | +| `subnets` | List | Auto-discovered | VPC subnets (discovers from default VPC) | +| `securityGroups` | List | Auto-discovered | Security groups (discovers from default VPC) | +| `logsGroup` | String | `/aws/ecs/nextflow` | CloudWatch log group | +| `maxSpotAttempts` | Integer | 5 | Max spot retry attempts | +| `assignPublicIp` | Boolean | true | Assign public IP (enables internet without NAT) | + +### Network Auto-Discovery + +When `subnets` or `securityGroups` are not specified, the executor auto-discovers them from the default VPC using EC2 API: + +```groovy +// Auto-discover default VPC subnets +List discoverDefaultSubnets() { + def response = ec2Client.describeSubnets( + DescribeSubnetsRequest.builder() + .filters( + Filter.builder() + .name("default-for-az") + .values("true") + .build() + ) + .build() + ) + return response.subnets().collect { it.subnetId() } +} + +// Auto-discover default security group +List discoverDefaultSecurityGroup() { + def response = ec2Client.describeSecurityGroups( + DescribeSecurityGroupsRequest.builder() + .filters( + Filter.builder() + .name("group-name") + .values("default") + .build() + ) + .build() + ) + return response.securityGroups().collect { it.groupId() } +} +``` + +## 10. Extension Point Registration + +### Decision: Use @ServiceName annotation for auto-discovery + +**Rationale**: Following existing executor pattern, the executor class annotated with `@ServiceName('awsecs')` and implementing `ExtensionPoint` is automatically discovered by Nextflow's plugin system. + +**Registration**: +```groovy +@ServiceName('awsecs') +@CompileStatic +class AwsEcsExecutor extends Executor implements ExtensionPoint { + // ... +} +``` + +**Build Configuration** (build.gradle): +```groovy +extensionPoints = [ + 'nextflow.cloud.aws.batch.AwsBatchExecutor', + 'nextflow.cloud.aws.ecs.AwsEcsExecutor', // ADD + // ... +] +``` + +## Summary + +All technical decisions are aligned with: +- Existing AWS Batch executor patterns in nf-amazon plugin +- AWS ECS Managed Instances best practices +- Nextflow executor architecture requirements + +No clarifications remain outstanding. diff --git a/specs/001-ecs-executor/spec.md b/specs/001-ecs-executor/spec.md new file mode 100644 index 0000000000..9e0662282c --- /dev/null +++ b/specs/001-ecs-executor/spec.md @@ -0,0 +1,225 @@ +# Feature Specification: AWS ECS Managed Instances Executor + +**Feature Branch**: `001-ecs-executor` +**Created**: 2025-12-30 +**Status**: Draft +**Input**: User description: "Add an executor to offload task executions via AWS ECS Managed instances, see https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ManagedInstances.html. The executor should make part of the nf-amazon plugin, however it should be independent from the AWS Batch implementation. The executor should allow running tasks as containers and should allow definition the following compute resources: CPUs, memory, GPUs, Disk size and type and the EC2 instance type (optionally). The storage strategy should be based on AWS S3 via Seqera Fusion file system." + +## Clarifications + +### Session 2025-12-30 + +- Q: Should the executor auto-create ECS clusters or require pre-configured infrastructure? → A: Pre-configured cluster required; executor validates cluster exists and has capacity provider; documentation provided for cluster setup. +- Q: How should task failure retries be handled? → A: Executor retries spot interruptions automatically (configurable max attempts, similar to aws.batch.maxSpotAttempts); all other errors delegate to Nextflow's errorStrategy/maxRetries. +- Q: How should tasks exceeding the 14-day ECS Managed Instances lifecycle be handled? → A: Fail task with clear error message; document this limitation prominently in executor documentation. +- Q: What configuration namespace should be used for ECS executor settings? → A: `aws.ecs.*` (e.g., `aws.ecs.cluster`, `aws.ecs.maxSpotAttempts`) to parallel existing `aws.batch` pattern. +- Q: What should the executor name be? → A: `awsecs`, following the `awsbatch` naming pattern. + +## User Scenarios & Testing *(mandatory)* + +### User Story 1 - Run Basic Nextflow Task on ECS Managed Instances (Priority: P1) + +A pipeline developer wants to run a containerized Nextflow task on AWS ECS Managed Instances with basic compute resources (CPUs and memory). The system should automatically provision appropriate EC2 instances, execute the task in the specified container, and make results available via S3/Fusion. + +**Why this priority**: This is the core functionality - without basic task execution, no other features matter. This enables the MVP where users can offload compute to ECS Managed Instances. + +**Independent Test**: Can be fully tested by running a simple Nextflow workflow with `executor = 'awsecs'` and verifying task completes successfully with output files available in the S3 work directory. + +**Acceptance Scenarios**: + +1. **Given** a Nextflow workflow with process specifying `executor = 'awsecs'`, `cpus = 2`, `memory = '4 GB'`, and a container image, **When** the workflow is executed, **Then** the task runs on an ECS Managed Instance with at least the requested resources and completes successfully. + +2. **Given** a Nextflow workflow with ECS executor configured, **When** the task fails (non-zero exit code), **Then** the error is properly reported back to Nextflow with the exit code and error logs accessible. + +3. **Given** multiple concurrent tasks in a workflow, **When** the workflow executes, **Then** tasks are submitted to ECS and run in parallel according to available capacity. + +--- + +### User Story 2 - Configure GPU-Accelerated Tasks (Priority: P2) + +A machine learning pipeline developer wants to run GPU-accelerated tasks (e.g., model training, inference) on ECS Managed Instances with NVIDIA GPUs. They need to specify the number and type of GPUs required. + +**Why this priority**: GPU support is essential for ML/AI workloads which are a growing use case. AWS ECS Managed Instances supports GPU instance types through attribute-based instance selection. + +**Independent Test**: Can be tested by running a workflow with `accelerator` directive and verifying the task runs on a GPU-enabled instance with correct GPU visibility in the container. + +**Acceptance Scenarios**: + +1. **Given** a process with `accelerator 1, type: 'nvidia-tesla-t4'`, **When** the task executes, **Then** ECS provisions a GPU-enabled instance and the container has access to the specified GPU. + +2. **Given** a process requiring GPU but no GPU instances available, **When** the task is submitted, **Then** the system waits for capacity and eventually provisions appropriate resources (or fails with clear error after timeout). + +--- + +### User Story 3 - Configure Custom Storage for Large Data Tasks (Priority: P2) + +A bioinformatics pipeline developer needs to run tasks that process large files requiring significant local disk space. They want to specify disk size to ensure sufficient ephemeral storage for temporary files during task execution. + +**Why this priority**: Many scientific workflows require substantial temporary storage for intermediate files. ECS Managed Instances supports 30 GiB to 16 TiB of EBS storage. + +**Independent Test**: Can be tested by running a workflow with `disk` directive and verifying the task has access to the requested storage capacity. + +**Acceptance Scenarios**: + +1. **Given** a process with `disk '500 GB'`, **When** the task executes, **Then** the ECS task has at least 500 GB of ephemeral storage available. + +2. **Given** a process specifying disk type (e.g., `disk '100 GB' type: 'gp3'`), **When** the task executes, **Then** the underlying EBS volume uses the specified volume type. + +--- + +### User Story 4 - Specify EC2 Instance Type (Priority: P3) + +An advanced user wants to pin tasks to specific EC2 instance types for performance predictability, cost optimization, or specific hardware requirements (e.g., high-memory instances, compute-optimized instances). + +**Why this priority**: While automatic instance selection works for most cases, power users need control over instance types for specific workloads or cost management. + +**Independent Test**: Can be tested by specifying an instance type in configuration and verifying the task runs on that exact instance type. + +**Acceptance Scenarios**: + +1. **Given** a process with `machineType 'c6i.2xlarge'`, **When** the task executes, **Then** ECS provisions that specific instance type. + +2. **Given** a process without explicit instance type, **When** the task executes, **Then** ECS automatically selects a cost-optimized instance matching the CPU/memory requirements. + +--- + +### User Story 5 - Monitor and Debug Task Execution (Priority: P3) + +A pipeline developer needs to monitor task progress and debug failures. They need access to task logs and status information during and after execution. + +**Why this priority**: Observability is critical for production workflows but builds on top of core execution functionality. + +**Independent Test**: Can be tested by running a workflow and verifying logs appear in CloudWatch and are accessible through standard Nextflow mechanisms. + +**Acceptance Scenarios**: + +1. **Given** a running task on ECS, **When** the user checks Nextflow logs, **Then** they see the task status (pending, running, completed/failed). + +2. **Given** a completed or failed task, **When** the user requests logs, **Then** container stdout/stderr logs are available via CloudWatch Logs integration. + +--- + +### Edge Cases + +- What happens when requested resources exceed ECS Managed Instances limits (e.g., >16 vCPUs, >120 GB memory per instance)? +- How does the system handle ECS capacity unavailable scenarios (instance type not available in region)? +- What happens when a task runs longer than 14 days (ECS Managed Instances lifecycle limit)? → Task fails with clear error; limitation documented. +- How does the system handle S3/Fusion connectivity failures during task execution? +- What happens when the container image pull fails (image not found, authentication error)? +- How does the system behave when AWS API rate limits are encountered? + +## Requirements *(mandatory)* + +### Functional Requirements + +- **FR-001**: System MUST provide a new executor named `awsecs` that submits Nextflow tasks to AWS ECS using the Managed Instances capacity provider. + +- **FR-002**: System MUST allow users to specify CPU requirements via the standard `cpus` process directive, mapped to ECS task definition CPU units. + +- **FR-003**: System MUST allow users to specify memory requirements via the standard `memory` process directive, mapped to ECS task definition memory allocation. + +- **FR-004**: System MUST allow users to specify GPU requirements via the standard `accelerator` process directive, triggering attribute-based instance selection for GPU-enabled instances. + +- **FR-005**: System MUST allow users to specify disk size via the standard `disk` process directive, mapped to ECS storage configuration (30 GiB - 16,384 GiB range). + +- **FR-006**: System MUST allow users to optionally specify EC2 instance type for precise control over compute resources. + +- **FR-007**: System MUST use AWS S3 as the work directory with Seqera Fusion filesystem for efficient data access. + +- **FR-007a**: System MUST configure ECS task definitions with Linux capabilities and device mappings required for FUSE driver operation: + - `linuxParameters.capabilities.add` MUST include `SYS_ADMIN` capability + - `linuxParameters.devices` MUST include `/dev/fuse` device mapping + - These settings are required for Fusion filesystem to mount S3 as a FUSE filesystem inside the container. + +- **FR-008**: System MUST automatically create ECS task definitions based on process requirements (container image, resources, environment variables). + +- **FR-009**: System MUST poll and track task status (PENDING, RUNNING, STOPPED) and report back to Nextflow runtime. + +- **FR-010**: System MUST extract and report task exit codes from completed containers. + +- **FR-011**: System MUST support task cancellation when the user terminates the workflow. + +- **FR-012**: System MUST be implemented as part of the nf-amazon plugin but independent from the existing AWS Batch executor code. + +- **FR-013**: System MUST support container image specification via the standard `container` process directive. + +- **FR-014**: System MUST integrate with CloudWatch Logs for task output logging. + +- **FR-015**: System MUST support ECS task role configuration for AWS service access from within tasks. + +- **FR-016**: System MUST validate at startup that the configured ECS cluster exists and has a Managed Instances capacity provider, failing fast with a clear error message if not. + +- **FR-017**: Documentation MUST be provided describing how to set up the required ECS cluster with Managed Instances capacity provider. + +- **FR-018**: System MUST automatically retry tasks that fail due to spot instance interruption, with configurable maximum retry attempts (similar to `aws.batch.maxSpotAttempts`). + +- **FR-019**: System MUST delegate all non-spot failures to Nextflow's standard error handling (`errorStrategy`, `maxRetries` directives). + +- **FR-020**: System MUST report a clear error when a task is terminated due to the 14-day ECS Managed Instances lifecycle limit. + +- **FR-021**: System MUST use configuration namespace `aws.ecs.*` for all ECS executor settings (e.g., `aws.ecs.cluster`, `aws.ecs.maxSpotAttempts`), consistent with the existing `aws.batch.*` pattern. + +### Infrastructure Configuration Requirements + +The executor MUST minimize required user configuration by using sensible defaults and auto-discovery. + +**Required Settings** (no defaults possible): + +- **FR-022**: System MUST require `aws.ecs.cluster` setting specifying the ECS cluster name or ARN with Managed Instances capacity provider. + +- **FR-023**: System MUST require `aws.ecs.executionRole` setting specifying the IAM role ARN for ECS task execution (image pull, CloudWatch logs). + +**Auto-Discovery** (use defaults when not specified): + +- **FR-024**: System MUST auto-discover VPC subnets from the default VPC when `aws.ecs.subnets` is not specified. If no default VPC exists, system MUST fail with a clear error message instructing the user to explicitly configure `aws.ecs.subnets`. + +- **FR-025**: System MUST auto-discover the default security group from the default VPC when `aws.ecs.securityGroups` is not specified. If no default VPC exists, system MUST fail with a clear error message instructing the user to explicitly configure `aws.ecs.securityGroups`. + +**Default Values**: + +- **FR-026**: System MUST use `/aws/ecs/nextflow` as the default CloudWatch log group when `aws.ecs.logsGroup` is not specified. + +- **FR-027**: System MUST default `aws.ecs.assignPublicIp` to `true` to enable internet access without requiring NAT gateway configuration. + +- **FR-028**: System MUST default `aws.ecs.maxSpotAttempts` to `5` for automatic spot interruption retry. + +### Key Entities + +- **ECS Cluster**: The AWS ECS cluster with Managed Instances capacity provider where tasks are executed. + +- **Task Definition**: ECS task definition created from Nextflow process requirements (container, CPU, memory, GPU, storage). + +- **Task**: Running ECS task instance corresponding to a Nextflow task execution. + +- **Capacity Provider**: The Managed Instances capacity provider configuration that handles automatic instance provisioning. + +- **Work Directory**: S3 bucket path used as the Nextflow work directory, accessed via Fusion filesystem. + +## Success Criteria *(mandatory)* + +### Measurable Outcomes + +- **SC-001**: Users can successfully execute Nextflow workflows using ECS Managed Instances executor with equivalent functionality to AWS Batch executor for container-based tasks. + +- **SC-002**: Task submission to first status update takes less than 60 seconds for available capacity scenarios. + +- **SC-003**: 99% of successfully completed tasks report correct exit codes back to Nextflow. + +- **SC-004**: GPU-accelerated tasks can access the requested number of GPUs within the container environment. + +- **SC-005**: Tasks can utilize up to 16 TiB of ephemeral storage when configured via disk directive. + +- **SC-006**: Task logs are available in CloudWatch Logs within 30 seconds of task completion. + +- **SC-007**: The executor operates independently from AWS Batch code with no shared mutable state. + +## Assumptions + +- Users have an AWS account with appropriate IAM permissions for ECS, EC2, S3, and CloudWatch. +- An ECS cluster with Managed Instances capacity provider MUST be pre-configured by the user; the executor validates but does not create infrastructure. +- Wave containers service is enabled (required for Fusion filesystem). +- Fusion filesystem is properly configured and licensed for S3 access. +- Container images are accessible from the ECS execution environment (ECR, Docker Hub, or other registry). +- Network configuration (VPC, subnets, security groups) allows ECS tasks to access S3 and pull container images. +- Task definitions use `MANAGED_INSTANCES` as the required compatibility mode. +- Tasks are expected to complete within the 14-day ECS Managed Instances lifecycle limit. diff --git a/specs/001-ecs-executor/tasks.md b/specs/001-ecs-executor/tasks.md new file mode 100644 index 0000000000..cf5f7b87c0 --- /dev/null +++ b/specs/001-ecs-executor/tasks.md @@ -0,0 +1,230 @@ +# Tasks: AWS ECS Managed Instances Executor + +**Input**: Design documents from `/specs/001-ecs-executor/` +**Prerequisites**: plan.md, spec.md, research.md, data-model.md, contracts/ + +**Organization**: Tasks are grouped by user story to enable independent implementation and testing. + +## Format: `[ID] [P?] [Story] Description` + +- **[P]**: Can run in parallel (different files, no dependencies) +- **[Story]**: Which user story this task belongs to (US1-US5) +- Paths are relative to `plugins/nf-amazon/src/` + +--- + +## Phase 1: Setup (Shared Infrastructure) + +**Purpose**: Package structure and configuration foundation + +- [x] T001 Create ECS executor package directory structure at `main/nextflow/cloud/aws/ecs/` +- [x] T002 Create ECS model subpackage at `main/nextflow/cloud/aws/ecs/model/` +- [x] T003 [P] Create test package at `test/nextflow/cloud/aws/ecs/` +- [x] T004 Register extension point in `plugins/nf-amazon/build.gradle` for AwsEcsExecutor + +--- + +## Phase 2: Foundational (Blocking Prerequisites) + +**Purpose**: Core infrastructure that MUST be complete before ANY user story can be implemented + +**CRITICAL**: No user story work can begin until this phase is complete + +- [x] T005 Create AwsEcsConfig class with @ConfigScope annotation at `main/nextflow/cloud/aws/config/AwsEcsConfig.groovy` +- [x] T006 Add `ecs` field to AwsConfig class at `main/nextflow/cloud/aws/config/AwsConfig.groovy` +- [x] T007 [P] Create AwsEcsOptions wrapper class at `main/nextflow/cloud/aws/ecs/AwsEcsOptions.groovy` +- [x] T008 Create AwsEcsExecutor skeleton with @ServiceName('awsecs') at `main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy` +- [x] T009 Create AwsEcsTaskHandler skeleton extending TaskHandler at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T010 [P] Create AwsEcsConfigTest at `test/nextflow/cloud/aws/ecs/AwsEcsConfigTest.groovy` + +**Checkpoint**: Foundation ready - user story implementation can now begin + +--- + +## Phase 3: User Story 1 - Run Basic Nextflow Task (Priority: P1) MVP + +**Goal**: Execute containerized tasks on ECS Managed Instances with basic CPU/memory resources + +**Independent Test**: Run simple workflow with `executor = 'awsecs'`, verify task completes with output in S3 + +### Implementation for User Story 1 + +- [x] T011 [US1] Implement AwsEcsConfig required fields (cluster, executionRole) with validation at `main/nextflow/cloud/aws/config/AwsEcsConfig.groovy` +- [x] T012 [US1] Implement AwsEcsConfig optional fields with defaults (logsGroup, maxSpotAttempts, assignPublicIp) at `main/nextflow/cloud/aws/config/AwsEcsConfig.groovy` +- [x] T013 [US1] Implement VPC auto-discovery for subnets/securityGroups in AwsEcsExecutor (fail with clear error if no default VPC) at `main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy` +- [x] T014 [US1] Implement AwsEcsExecutor.register() with ECS and EC2 client initialization (EC2 for VPC auto-discovery) at `main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy` +- [x] T015 [US1] Implement AwsEcsExecutor.createTaskHandler() at `main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy` +- [x] T016 [US1] Implement AwsEcsExecutor.createTaskMonitor() with ParallelPollingMonitor at `main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy` +- [x] T017 [P] [US1] Create RegisterTaskDefinitionModel at `main/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModel.groovy` +- [x] T018 [P] [US1] Create ContainerDefinitionModel at `main/nextflow/cloud/aws/ecs/model/ContainerDefinitionModel.groovy` +- [x] T019 [US1] Implement task definition registration with RegisterTaskDefinition API in AwsEcsTaskHandler at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T020 [US1] Implement CPU/memory mapping (cpus directive -> CPU units, memory -> MiB) in AwsEcsTaskHandler at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T021 [US1] Implement AwsEcsTaskHandler.submit() with RunTask API at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T022 [US1] Implement AwsEcsTaskHandler.checkIfRunning() with status polling at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T023 [US1] Implement AwsEcsTaskHandler.checkIfCompleted() with exit code extraction at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T024 [US1] Implement AwsEcsTaskHandler.killTask() with StopTask API at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T025 [US1] Implement FusionAwareTask trait for S3/Fusion integration in AwsEcsTaskHandler at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T025a [US1] Implement Linux capabilities (SYS_ADMIN) and device mappings (/dev/fuse) in task definition for Fusion FUSE driver support at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T026 [US1] Implement task definition caching by container+resource hash in AwsEcsTaskHandler at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T027 [P] [US1] Create AwsEcsExecutorTest with mock ECS client at `test/nextflow/cloud/aws/ecs/AwsEcsExecutorTest.groovy` +- [x] T028 [P] [US1] Create AwsEcsTaskHandlerTest at `test/nextflow/cloud/aws/ecs/AwsEcsTaskHandlerTest.groovy` + +**Checkpoint**: Basic task execution works - MVP complete ✅ + +--- + +## Phase 4: User Story 2 - GPU-Accelerated Tasks (Priority: P2) + +**Goal**: Run tasks on GPU-enabled instances using the accelerator directive + +**Independent Test**: Run workflow with `accelerator 1, type: 'nvidia-tesla-t4'`, verify GPU access in container + +### Implementation for User Story 2 + +- [ ] T029 [US2] Add GPU resourceRequirements to RegisterTaskDefinitionModel at `main/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModel.groovy` +- [ ] T030 [US2] Implement accelerator directive parsing in AwsEcsTaskHandler at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [ ] T031 [US2] Map GPU requirements to ECS resourceRequirements in task definition at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [ ] T032 [US2] Add GPU to task definition cache key computation at `main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy` +- [ ] T033 [P] [US2] Add GPU tests to AwsEcsTaskHandlerTest at `test/nextflow/cloud/aws/ecs/AwsEcsTaskHandlerTest.groovy` + +**Checkpoint**: GPU tasks work independently + +--- + +## Phase 5: User Story 3 - Custom Storage (Priority: P2) + +**Goal**: Configure ephemeral disk storage (30 GiB - 16 TiB) via disk directive + +**Independent Test**: Run workflow with `disk '500 GB'`, verify storage capacity available + +### Implementation for User Story 3 + +- [x] T034 [US3] Add storageConfiguration to RegisterTaskDefinitionModel at `main/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModel.groovy` +- [x] T035 [US3] Implement disk directive parsing with validation (30-16384 GiB) in AwsEcsTaskHandler at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T036 [US3] Map disk size to ephemeralStorage in task definition at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T037 [US3] Add disk size to task definition cache key computation at `main/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModel.groovy` +- [ ] T038 [P] [US3] Add disk storage tests to AwsEcsTaskHandlerTest at `test/nextflow/cloud/aws/ecs/AwsEcsTaskHandlerTest.groovy` + +**Checkpoint**: Custom storage works independently + +--- + +## Phase 6: User Story 4 - EC2 Instance Type (Priority: P3) + +**Goal**: Allow pinning tasks to specific EC2 instance types via machineType directive + +**Independent Test**: Run workflow with `machineType 'c6i.2xlarge'`, verify instance type used + +### Implementation for User Story 4 + +- [ ] T039 [US4] Add instance type constraint to RegisterTaskDefinitionModel at `main/nextflow/cloud/aws/ecs/model/RegisterTaskDefinitionModel.groovy` +- [ ] T040 [US4] Implement machineType directive parsing in AwsEcsTaskHandler at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [ ] T041 [US4] Map machineType to capacity provider attributes in RunTask request at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [ ] T042 [P] [US4] Add instance type tests to AwsEcsTaskHandlerTest at `test/nextflow/cloud/aws/ecs/AwsEcsTaskHandlerTest.groovy` + +**Checkpoint**: Instance type selection works independently + +--- + +## Phase 7: User Story 5 - Monitoring & Debugging (Priority: P3) + +**Goal**: CloudWatch Logs integration and task status observability + +**Independent Test**: Run workflow, verify logs appear in CloudWatch and are accessible via Nextflow + +### Implementation for User Story 5 + +- [ ] T043 [US5] Create AwsEcsHelper class for CloudWatch logs operations at `main/nextflow/cloud/aws/ecs/AwsEcsHelper.groovy` +- [ ] T044 [US5] Implement getTaskLogStream() for log retrieval in AwsEcsHelper at `main/nextflow/cloud/aws/ecs/AwsEcsHelper.groovy` +- [x] T045 [US5] Implement CloudWatch log configuration in task definition (awslogs driver) at `main/nextflow/cloud/aws/ecs/model/ContainerDefinitionModel.groovy` +- [ ] T046 [US5] Implement describeCluster() for cluster validation in AwsEcsHelper at `main/nextflow/cloud/aws/ecs/AwsEcsHelper.groovy` +- [ ] T047 [US5] Add cluster validation at executor startup using AwsEcsHelper at `main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy` +- [x] T048 [US5] Implement spot interruption detection in checkIfCompleted() at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [ ] T049 [US5] Implement spot retry logic with maxSpotAttempts counter at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [ ] T050 [US5] Implement 14-day lifecycle error detection and clear error message at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [ ] T051 [US5] Implement BatchHandler trait for efficient status polling via BatchContext at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [ ] T052 [US5] Implement resource limits validation with clear error for exceeded limits (>16 vCPUs, >120 GB memory) at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [ ] T053 [US5] Implement capacity unavailable error handling (instance type not available) at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T054 [US5] Implement container image pull failure detection and clear error message at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` +- [x] T055 [US5] Implement AWS API rate limit handling with appropriate backoff at `main/nextflow/cloud/aws/ecs/AwsEcsExecutor.groovy` +- [ ] T056 [P] [US5] Add error handling tests to AwsEcsTaskHandlerTest at `test/nextflow/cloud/aws/ecs/AwsEcsTaskHandlerTest.groovy` +- [ ] T057 [P] [US5] Add CloudWatch logs tests to AwsEcsExecutorTest at `test/nextflow/cloud/aws/ecs/AwsEcsExecutorTest.groovy` + +**Checkpoint**: Monitoring and debugging works independently + +--- + +## Phase 8: Polish & Cross-Cutting Concerns + +**Purpose**: Documentation, integration validation, cleanup + +- [x] T058 [P] Add Apache 2.0 license headers to all new source files +- [ ] T059 [P] Update plugins/nf-amazon/VERSION with appropriate version bump +- [ ] T060 Run quickstart.md validation scenarios +- [ ] T061 Code cleanup and Groovy idiom consistency review +- [ ] T062 [P] Add integration test workflow in tests/ directory +- [ ] T063 [P] [Low Priority] Implement disk type support (e.g., `type: 'gp3'`) in storage configuration at `main/nextflow/cloud/aws/ecs/AwsEcsTaskHandler.groovy` + +--- + +## Dependencies & Execution Order + +### Phase Dependencies + +- **Setup (Phase 1)**: No dependencies - can start immediately ✅ COMPLETE +- **Foundational (Phase 2)**: Depends on Setup completion - BLOCKS all user stories ✅ COMPLETE +- **User Stories (Phase 3-7)**: All depend on Foundational phase completion + - User stories can proceed in parallel (if staffed) + - Or sequentially in priority order (P1 -> P2 -> P3) +- **Polish (Phase 8)**: Depends on all desired user stories being complete + +### User Story Dependencies + +- **User Story 1 (P1)**: Can start after Foundational (Phase 2) - No dependencies on other stories ✅ COMPLETE +- **User Story 2 (P2)**: Can start after Foundational - Extends US1 task definition model +- **User Story 3 (P2)**: Can start after Foundational - Extends US1 task definition model ✅ MOSTLY COMPLETE (T038 pending) +- **User Story 4 (P3)**: Can start after Foundational - Extends US1 RunTask handling +- **User Story 5 (P3)**: Can start after Foundational - Adds observability to US1 infrastructure (partial) + +### Within Each User Story + +- Models before services/handlers +- Core implementation before enhancements +- Tests can be written in parallel with implementation + +### Parallel Opportunities + +- All Setup tasks marked [P] can run in parallel +- All Foundational tasks marked [P] can run in parallel +- Once Foundational phase completes, all user stories can start in parallel +- Model classes (T017, T018) can be created in parallel +- Test files can be created in parallel with implementation + +--- + +## Progress Summary + +| Phase | Status | Tasks Complete | Tasks Remaining | +|-------|--------|----------------|-----------------| +| Phase 1: Setup | ✅ COMPLETE | 4/4 | 0 | +| Phase 2: Foundational | ✅ COMPLETE | 6/6 | 0 | +| Phase 3: US1 Basic Tasks | ✅ COMPLETE | 18/18 | 0 | +| Phase 4: US2 GPU | ⬜ NOT STARTED | 0/5 | 5 | +| Phase 5: US3 Storage | 🟡 IN PROGRESS | 4/5 | 1 (T038) | +| Phase 6: US4 Instance Type | ⬜ NOT STARTED | 0/4 | 4 | +| Phase 7: US5 Monitoring | 🟡 IN PROGRESS | 4/15 | 11 | +| Phase 8: Polish | 🟡 IN PROGRESS | 1/6 | 5 | + +**Overall Progress**: 37/63 tasks complete (59%) + +--- + +## Notes + +- [P] tasks = different files, no dependencies +- [Story] label maps task to specific user story for traceability +- Each user story should be independently completable and testable +- Commit after each task or logical group +- Stop at any checkpoint to validate story independently +- Avoid: vague tasks, same file conflicts, cross-story dependencies that break independence +- AwsEcsProxy (throttling) is DEFERRED to optimization phase - not included in these tasks