Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move s3 HEAD bucket to lib. #5067

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
150 changes: 73 additions & 77 deletions src/toil/jobStores/aws/jobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
uploadFromPath,
)
from toil.jobStores.utils import ReadablePipe, ReadableTransformingPipe, WritablePipe
from toil.lib.aws.s3 import head_s3_bucket
from toil.lib.aws import build_tag_dict_from_env
from toil.lib.aws.session import establish_boto3_session
from toil.lib.aws.utils import (
Expand Down Expand Up @@ -821,85 +822,80 @@ def bucket_retry_predicate(error):
return False

bucketExisted = True
for attempt in retry_s3(predicate=bucket_retry_predicate):
with attempt:
try:
# the head_bucket() call makes sure that the bucket exists and the user can access it
self.s3_client.head_bucket(Bucket=bucket_name)

bucket = self.s3_resource.Bucket(bucket_name)
except ClientError as e:
error_http_status = get_error_status(e)
if error_http_status == 404:
bucketExisted = False
logger.debug("Bucket '%s' does not exist.", bucket_name)
if create:
bucket = create_s3_bucket(
self.s3_resource, bucket_name, self.region
)
# Wait until the bucket exists before checking the region and adding tags
bucket.wait_until_exists()

# It is possible for create_bucket to return but
# for an immediate request for the bucket region to
# produce an S3ResponseError with code
# NoSuchBucket. We let that kick us back up to the
# main retry loop.
assert (
get_bucket_region(bucket_name) == self.region
), f"bucket_name: {bucket_name}, {get_bucket_region(bucket_name)} != {self.region}"

tags = build_tag_dict_from_env()

if tags:
flat_tags = flatten_tags(tags)
bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
bucket_tagging.put(Tagging={'TagSet': flat_tags})

# Configure bucket so that we can make objects in
# it public, which was the historical default.
enable_public_objects(bucket_name)
elif block:
raise
else:
return None
elif error_http_status == 301:
# This is raised if the user attempts to get a bucket in a region outside
# the specified one, if the specified one is not `us-east-1`. The us-east-1
# server allows a user to use buckets from any region.
raise BucketLocationConflictException(get_bucket_region(bucket_name))
else:
raise
else:
bucketRegion = get_bucket_region(bucket_name)
if bucketRegion != self.region:
raise BucketLocationConflictException(bucketRegion)

if versioning and not bucketExisted:
# only call this method on bucket creation
bucket.Versioning().enable()
# Now wait until versioning is actually on. Some uploads
# would come back with no versions; maybe they were
# happening too fast and this setting isn't sufficiently
# consistent?
time.sleep(1)
while not self._getBucketVersioning(bucket_name):
logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
time.sleep(1)
elif check_versioning_consistency:
# now test for versioning consistency
# we should never see any of these errors since 'versioning' should always be true
bucket_versioning = self._getBucketVersioning(bucket_name)
if bucket_versioning != versioning:
assert False, 'Cannot modify versioning on existing bucket'
elif bucket_versioning is None:
assert False, 'Cannot use a bucket with versioning suspended'
if bucketExisted:
logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
try:
# make sure bucket exists and user can access it
head_s3_bucket(Bucket=bucket_name)

bucket = self.s3_resource.Bucket(bucket_name)
except ClientError as e:
error_http_status = get_error_status(e)
if error_http_status == 404:
bucketExisted = False
logger.debug("Bucket '%s' does not exist.", bucket_name)
if create:
bucket = create_s3_bucket(self.s3_resource, bucket_name, self.region)
# Wait until the bucket exists before checking the region and adding tags
bucket.wait_until_exists()

# It is possible for create_bucket to return but
# for an immediate request for the bucket region to
# produce an S3ResponseError with code
# NoSuchBucket. We let that kick us back up to the
# main retry loop.
assert (get_bucket_region(bucket_name) == self.region),\
f"bucket_name: {bucket_name}, {get_bucket_region(bucket_name)} != {self.region}"

tags = build_tag_dict_from_env()

if tags:
flat_tags = flatten_tags(tags)
bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
bucket_tagging.put(Tagging={'TagSet': flat_tags})

# Configure bucket so that we can make objects in
# it public, which was the historical default.
enable_public_objects(bucket_name)
elif block:
raise
else:
logger.debug(f"Created new job store bucket '{bucket_name}' with versioning state {versioning}.")
return None
elif error_http_status == 301:
# This is raised if the user attempts to get a bucket in a region outside
# the specified one, if the specified one is not `us-east-1`. The us-east-1
# server allows a user to use buckets from any region.
raise BucketLocationConflictException(get_bucket_region(bucket_name))
else:
raise
else:
bucketRegion = get_bucket_region(bucket_name)
if bucketRegion != self.region:
raise BucketLocationConflictException(bucketRegion)

if versioning and not bucketExisted:
# only call this method on bucket creation
bucket.Versioning().enable()
# Now wait until versioning is actually on. Some uploads
# would come back with no versions; maybe they were
# happening too fast and this setting isn't sufficiently
# consistent?
time.sleep(1)
while not self._getBucketVersioning(bucket_name):
logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
time.sleep(1)
elif check_versioning_consistency:
# now test for versioning consistency
# we should never see any of these errors since 'versioning' should always be true
bucket_versioning = self._getBucketVersioning(bucket_name)
if bucket_versioning != versioning:
assert False, 'Cannot modify versioning on existing bucket'
elif bucket_versioning is None:
assert False, 'Cannot use a bucket with versioning suspended'
if bucketExisted:
logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
else:
logger.debug(f"Created new job store bucket '{bucket_name}' with versioning state {versioning}.")

return bucket
return bucket

def _bindDomain(self, domain_name: str, create: bool = False, block: bool = True) -> None:
"""
Expand Down
31 changes: 31 additions & 0 deletions src/toil/lib/aws/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (C) 2015-2024 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, Any, Optional
from toil.lib.aws import session, AWSServerErrors
from toil.lib.retry import retry

logger = logging.getLogger(__name__)


@retry(errors=[AWSServerErrors])
def head_s3_bucket(bucket: str, region: Optional[str] = None):
"""
Attempt to HEAD an s3 bucket and return its response.

:param bucket: AWS bucket name
:param region: Region that we want to look for the bucket in
"""
s3_client = session.client("s3", region_name=region)
return s3_client.head_bucket(Bucket=bucket)