From 2b8f4044651f2dbe75f9d02e425baa8b933e0823 Mon Sep 17 00:00:00 2001
From: DailyDreaming
Date: Mon, 19 Aug 2024 11:57:59 -0700
Subject: [PATCH 1/2] Add head s3 bucket function to lib.

---
 src/toil/jobStores/aws/jobStore.py | 150 ++++++++++++++---------------
 src/toil/lib/aws/s3.py             |  31 ++++++
 2 files changed, 104 insertions(+), 77 deletions(-)
 create mode 100644 src/toil/lib/aws/s3.py

diff --git a/src/toil/jobStores/aws/jobStore.py b/src/toil/jobStores/aws/jobStore.py
index 74958a2d77..dbdf8517e9 100644
--- a/src/toil/jobStores/aws/jobStore.py
+++ b/src/toil/jobStores/aws/jobStore.py
@@ -62,6 +62,7 @@
     uploadFromPath,
 )
 from toil.jobStores.utils import ReadablePipe, ReadableTransformingPipe, WritablePipe
+from toil.lib.aws.s3 import head_s3_bucket
 from toil.lib.aws import build_tag_dict_from_env
 from toil.lib.aws.session import establish_boto3_session
 from toil.lib.aws.utils import (
@@ -821,85 +822,80 @@ def bucket_retry_predicate(error):
             return False
 
         bucketExisted = True
-        for attempt in retry_s3(predicate=bucket_retry_predicate):
-            with attempt:
-                try:
-                    # the head_bucket() call makes sure that the bucket exists and the user can access it
-                    self.s3_client.head_bucket(Bucket=bucket_name)
-
-                    bucket = self.s3_resource.Bucket(bucket_name)
-                except ClientError as e:
-                    error_http_status = get_error_status(e)
-                    if error_http_status == 404:
-                        bucketExisted = False
-                        logger.debug("Bucket '%s' does not exist.", bucket_name)
-                        if create:
-                            bucket = create_s3_bucket(
-                                self.s3_resource, bucket_name, self.region
-                            )
-                            # Wait until the bucket exists before checking the region and adding tags
-                            bucket.wait_until_exists()
-
-                            # It is possible for create_bucket to return but
-                            # for an immediate request for the bucket region to
-                            # produce an S3ResponseError with code
-                            # NoSuchBucket. We let that kick us back up to the
-                            # main retry loop.
-                            assert (
-                                get_bucket_region(bucket_name) == self.region
-                            ), f"bucket_name: {bucket_name}, {get_bucket_region(bucket_name)} != {self.region}"
-
-                            tags = build_tag_dict_from_env()
-
-                            if tags:
-                                flat_tags = flatten_tags(tags)
-                                bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
-                                bucket_tagging.put(Tagging={'TagSet': flat_tags})
-
-                            # Configure bucket so that we can make objects in
-                            # it public, which was the historical default.
-                            enable_public_objects(bucket_name)
-                        elif block:
-                            raise
-                        else:
-                            return None
-                    elif error_http_status == 301:
-                        # This is raised if the user attempts to get a bucket in a region outside
-                        # the specified one, if the specified one is not `us-east-1`. The us-east-1
-                        # server allows a user to use buckets from any region.
-                        raise BucketLocationConflictException(get_bucket_region(bucket_name))
-                    else:
-                        raise
-                else:
-                    bucketRegion = get_bucket_region(bucket_name)
-                    if bucketRegion != self.region:
-                        raise BucketLocationConflictException(bucketRegion)
-
-                if versioning and not bucketExisted:
-                    # only call this method on bucket creation
-                    bucket.Versioning().enable()
-                    # Now wait until versioning is actually on. Some uploads
-                    # would come back with no versions; maybe they were
-                    # happening too fast and this setting isn't sufficiently
-                    # consistent?
-                    time.sleep(1)
-                    while not self._getBucketVersioning(bucket_name):
-                        logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
-                        time.sleep(1)
-                elif check_versioning_consistency:
-                    # now test for versioning consistency
-                    # we should never see any of these errors since 'versioning' should always be true
-                    bucket_versioning = self._getBucketVersioning(bucket_name)
-                    if bucket_versioning != versioning:
-                        assert False, 'Cannot modify versioning on existing bucket'
-                    elif bucket_versioning is None:
-                        assert False, 'Cannot use a bucket with versioning suspended'
-                if bucketExisted:
-                    logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
+        try:
+            # the head_bucket() call makes sure that the bucket exists and the user can access it
+            head_s3_bucket(bucket_name)
+
+            bucket = self.s3_resource.Bucket(bucket_name)
+        except ClientError as e:
+            error_http_status = get_error_status(e)
+            if error_http_status == 404:
+                bucketExisted = False
+                logger.debug("Bucket '%s' does not exist.", bucket_name)
+                if create:
+                    bucket = create_s3_bucket(self.s3_resource, bucket_name, self.region)
+                    # Wait until the bucket exists before checking the region and adding tags
+                    bucket.wait_until_exists()
+
+                    # It is possible for create_bucket to return but
+                    # for an immediate request for the bucket region to
+                    # produce an S3ResponseError with code
+                    # NoSuchBucket. We let that kick us back up to the
+                    # main retry loop.
+                    assert (get_bucket_region(bucket_name) == self.region),\
+                        f"bucket_name: {bucket_name}, {get_bucket_region(bucket_name)} != {self.region}"
+
+                    tags = build_tag_dict_from_env()
+
+                    if tags:
+                        flat_tags = flatten_tags(tags)
+                        bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
+                        bucket_tagging.put(Tagging={'TagSet': flat_tags})
+
+                    # Configure bucket so that we can make objects in
+                    # it public, which was the historical default.
+                    enable_public_objects(bucket_name)
+                elif block:
+                    raise
+                else:
+                    return None
+            elif error_http_status == 301:
+                # This is raised if the user attempts to get a bucket in a region outside
+                # the specified one, if the specified one is not `us-east-1`. The us-east-1
+                # server allows a user to use buckets from any region.
+                raise BucketLocationConflictException(get_bucket_region(bucket_name))
+            else:
+                raise
+        else:
+            bucketRegion = get_bucket_region(bucket_name)
+            if bucketRegion != self.region:
+                raise BucketLocationConflictException(bucketRegion)
+
+        if versioning and not bucketExisted:
+            # only call this method on bucket creation
+            bucket.Versioning().enable()
+            # Now wait until versioning is actually on. Some uploads
+            # would come back with no versions; maybe they were
+            # happening too fast and this setting isn't sufficiently
+            # consistent?
+            time.sleep(1)
+            while not self._getBucketVersioning(bucket_name):
+                logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
+                time.sleep(1)
+        elif check_versioning_consistency:
+            # now test for versioning consistency
+            # we should never see any of these errors since 'versioning' should always be true
+            bucket_versioning = self._getBucketVersioning(bucket_name)
+            if bucket_versioning != versioning:
+                assert False, 'Cannot modify versioning on existing bucket'
+            elif bucket_versioning is None:
+                assert False, 'Cannot use a bucket with versioning suspended'
+        if bucketExisted:
+            logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
+        else:
+            logger.debug(f"Created new job store bucket '{bucket_name}' with versioning state {versioning}.")
 
-                return bucket
+        return bucket
 
     def _bindDomain(self, domain_name: str, create: bool = False, block: bool = True) -> None:
         """
diff --git a/src/toil/lib/aws/s3.py b/src/toil/lib/aws/s3.py
new file mode 100644
index 0000000000..fab8c05945
--- /dev/null
+++ b/src/toil/lib/aws/s3.py
@@ -0,0 +1,31 @@
+# Copyright (C) 2015-2024 Regents of the University of California
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import Dict, Any, Optional
+from toil.lib.aws import session, AWSServerErrors
+from toil.lib.retry import retry
+
+logger = logging.getLogger(__name__)
+
+
+@retry(errors=[AWSServerErrors])
+def head_s3_bucket(bucket: str, region: Optional[str] = None):
+    """
+    Attempt to HEAD an s3 bucket and return its response.
+
+    :param bucket: AWS bucket name
+    :param region: Region that we want to look for the bucket in
+    """
+    s3_client = session.client("s3", region_name=region)
+    return s3_client.head_bucket(Bucket=bucket)

From dbe3fabc875fdcc0adbc2de010fec6c62ed54dec Mon Sep 17 00:00:00 2001
From: DailyDreaming
Date: Mon, 19 Aug 2024 11:59:31 -0700
Subject: [PATCH 2/2] Shorten comment.

---
 src/toil/jobStores/aws/jobStore.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/toil/jobStores/aws/jobStore.py b/src/toil/jobStores/aws/jobStore.py
index dbdf8517e9..3a7ec297ea 100644
--- a/src/toil/jobStores/aws/jobStore.py
+++ b/src/toil/jobStores/aws/jobStore.py
@@ -823,7 +823,7 @@ def bucket_retry_predicate(error):
 
         bucketExisted = True
         try:
-            # the head_bucket() call makes sure that the bucket exists and the user can access it
+            # make sure bucket exists and user can access it
             head_s3_bucket(bucket_name)
 
             bucket = self.s3_resource.Bucket(bucket_name)
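
For reference, a minimal sketch of how the new head_s3_bucket() helper might be called from other code. This is illustrative only: the bucket name and region below are made-up examples, and the 403/404 handling simply reflects how boto3's head_bucket reports errors, not anything added by this patch.

    from botocore.exceptions import ClientError

    from toil.lib.aws.s3 import head_s3_bucket

    bucket_name = "example-toil-jobstore-bucket"  # hypothetical bucket name
    try:
        # HEAD the bucket; transient AWS server errors are retried by the @retry decorator.
        response = head_s3_bucket(bucket_name, region="us-west-2")  # region is an example value
        # boto3 returns a dict on success; 200 means the bucket exists and is accessible.
        print(response["ResponseMetadata"]["HTTPStatusCode"])
    except ClientError as e:
        # head_bucket raises ClientError, e.g. "404" if the bucket does not exist
        # or "403" if it exists but the caller cannot access it.
        print(e.response["Error"]["Code"])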