Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move s3 del obj to lib. #5069

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2b8f404
Add head s3 bucket function to lib.
DailyDreaming Aug 19, 2024
dbe3fab
Shorten comment.
DailyDreaming Aug 19, 2024
08b8f1b
Delete s3 object.
DailyDreaming Aug 19, 2024
087f725
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Aug 21, 2024
cfaaa1a
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Aug 22, 2024
9855322
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Aug 22, 2024
a25d022
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Aug 22, 2024
887608b
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Aug 27, 2024
a9c88fa
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Aug 27, 2024
c28efa6
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Sep 3, 2024
cb23358
Rebase w/conflict resolution.
DailyDreaming Oct 7, 2024
3cca97a
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Oct 8, 2024
6bbb2fe
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Oct 9, 2024
809438a
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Oct 9, 2024
4f21185
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Oct 10, 2024
ed14bcc
Add missing head bucket.
DailyDreaming Oct 11, 2024
dc33884
Type.
DailyDreaming Oct 14, 2024
3553c61
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Oct 14, 2024
0886ac1
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Oct 21, 2024
5591597
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Oct 22, 2024
0fe82dc
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Oct 22, 2024
2fb71e3
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Oct 22, 2024
ae0c5ff
Merge master into issues/5068-mv-s3-del-obj-to-lib
github-actions[bot] Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 90 additions & 96 deletions src/toil/jobStores/aws/jobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
uploadFromPath,
)
from toil.jobStores.utils import ReadablePipe, ReadableTransformingPipe, WritablePipe
from toil.lib.aws.s3 import head_s3_bucket, delete_s3_object
from toil.lib.aws import build_tag_dict_from_env
from toil.lib.aws.session import establish_boto3_session
from toil.lib.aws.utils import (
Expand Down Expand Up @@ -504,15 +505,12 @@ def delete_job(self, job_id):
for item in items:
item: "ItemTypeDef"
version = get_item_from_attributes(attributes=item["Attributes"], name="version")
for attempt in retry_s3():
with attempt:
if version:
self.s3_client.delete_object(Bucket=self.files_bucket.name,
Key=compat_bytes(item["Name"]),
VersionId=version)
else:
self.s3_client.delete_object(Bucket=self.files_bucket.name,
Key=compat_bytes(item["Name"]))
delete_s3_object(
bucket=self.files_bucket.name,
key=compat_bytes(item["Name"]),
version=version,
region=self.outer.region
)

def get_empty_file_store_id(self, jobStoreID=None, cleanup=False, basename=None) -> FileID:
info = self.FileInfo.create(jobStoreID if cleanup else None)
Expand Down Expand Up @@ -821,85 +819,80 @@ def bucket_retry_predicate(error):
return False

bucketExisted = True
for attempt in retry_s3(predicate=bucket_retry_predicate):
with attempt:
try:
# the head_bucket() call makes sure that the bucket exists and the user can access it
self.s3_client.head_bucket(Bucket=bucket_name)

bucket = self.s3_resource.Bucket(bucket_name)
except ClientError as e:
error_http_status = get_error_status(e)
if error_http_status == 404:
bucketExisted = False
logger.debug("Bucket '%s' does not exist.", bucket_name)
if create:
bucket = create_s3_bucket(
self.s3_resource, bucket_name, self.region
)
# Wait until the bucket exists before checking the region and adding tags
bucket.wait_until_exists()

# It is possible for create_bucket to return but
# for an immediate request for the bucket region to
# produce an S3ResponseError with code
# NoSuchBucket. We let that kick us back up to the
# main retry loop.
assert (
get_bucket_region(bucket_name) == self.region
), f"bucket_name: {bucket_name}, {get_bucket_region(bucket_name)} != {self.region}"

tags = build_tag_dict_from_env()

if tags:
flat_tags = flatten_tags(tags)
bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
bucket_tagging.put(Tagging={'TagSet': flat_tags})

# Configure bucket so that we can make objects in
# it public, which was the historical default.
enable_public_objects(bucket_name)
elif block:
raise
else:
return None
elif error_http_status == 301:
# This is raised if the user attempts to get a bucket in a region outside
# the specified one, if the specified one is not `us-east-1`. The us-east-1
# server allows a user to use buckets from any region.
raise BucketLocationConflictException(get_bucket_region(bucket_name))
else:
raise
else:
bucketRegion = get_bucket_region(bucket_name)
if bucketRegion != self.region:
raise BucketLocationConflictException(bucketRegion)

if versioning and not bucketExisted:
# only call this method on bucket creation
bucket.Versioning().enable()
# Now wait until versioning is actually on. Some uploads
# would come back with no versions; maybe they were
# happening too fast and this setting isn't sufficiently
# consistent?
time.sleep(1)
while not self._getBucketVersioning(bucket_name):
logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
time.sleep(1)
elif check_versioning_consistency:
# now test for versioning consistency
# we should never see any of these errors since 'versioning' should always be true
bucket_versioning = self._getBucketVersioning(bucket_name)
if bucket_versioning != versioning:
assert False, 'Cannot modify versioning on existing bucket'
elif bucket_versioning is None:
assert False, 'Cannot use a bucket with versioning suspended'
if bucketExisted:
logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
try:
# make sure bucket exists and user can access it
head_s3_bucket(Bucket=bucket_name)

bucket = self.s3_resource.Bucket(bucket_name)
except ClientError as e:
error_http_status = get_error_status(e)
if error_http_status == 404:
bucketExisted = False
logger.debug("Bucket '%s' does not exist.", bucket_name)
if create:
bucket = create_s3_bucket(self.s3_resource, bucket_name, self.region)
# Wait until the bucket exists before checking the region and adding tags
bucket.wait_until_exists()

# It is possible for create_bucket to return but
# for an immediate request for the bucket region to
# produce an S3ResponseError with code
# NoSuchBucket. We let that kick us back up to the
# main retry loop.
assert (get_bucket_region(bucket_name) == self.region),\
f"bucket_name: {bucket_name}, {get_bucket_region(bucket_name)} != {self.region}"

tags = build_tag_dict_from_env()

if tags:
flat_tags = flatten_tags(tags)
bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
bucket_tagging.put(Tagging={'TagSet': flat_tags})

# Configure bucket so that we can make objects in
# it public, which was the historical default.
enable_public_objects(bucket_name)
elif block:
raise
else:
logger.debug(f"Created new job store bucket '{bucket_name}' with versioning state {versioning}.")
return None
elif error_http_status == 301:
# This is raised if the user attempts to get a bucket in a region outside
# the specified one, if the specified one is not `us-east-1`. The us-east-1
# server allows a user to use buckets from any region.
raise BucketLocationConflictException(get_bucket_region(bucket_name))
else:
raise
else:
bucketRegion = get_bucket_region(bucket_name)
if bucketRegion != self.region:
raise BucketLocationConflictException(bucketRegion)

if versioning and not bucketExisted:
# only call this method on bucket creation
bucket.Versioning().enable()
# Now wait until versioning is actually on. Some uploads
# would come back with no versions; maybe they were
# happening too fast and this setting isn't sufficiently
# consistent?
time.sleep(1)
while not self._getBucketVersioning(bucket_name):
logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
time.sleep(1)
elif check_versioning_consistency:
# now test for versioning consistency
# we should never see any of these errors since 'versioning' should always be true
bucket_versioning = self._getBucketVersioning(bucket_name)
if bucket_versioning != versioning:
assert False, 'Cannot modify versioning on existing bucket'
elif bucket_versioning is None:
assert False, 'Cannot use a bucket with versioning suspended'
if bucketExisted:
logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
else:
logger.debug(f"Created new job store bucket '{bucket_name}' with versioning state {versioning}.")

return bucket
return bucket

def _bindDomain(self, domain_name: str, create: bool = False, block: bool = True) -> None:
"""
Expand Down Expand Up @@ -1186,11 +1179,12 @@ def save(self):
Expected=expected)
# clean up the old version of the file if necessary and safe
if self.previousVersion and (self.previousVersion != self.version):
for attempt in retry_s3():
with attempt:
self.outer.s3_client.delete_object(Bucket=self.outer.files_bucket.name,
Key=compat_bytes(self.fileID),
VersionId=self.previousVersion)
delete_s3_object(
bucket=self.outer.files_bucket.name,
key=compat_bytes(self.fileID),
version=self.previousVersion,
region=self.outer.region
)
self._previousVersion = self._version
if numNewContentChunks < self._numContentChunks:
residualChunks = range(numNewContentChunks, self._numContentChunks)
Expand Down Expand Up @@ -1657,11 +1651,11 @@ def delete(self):
ItemName=compat_bytes(self.fileID),
Expected=expected)
if self.previousVersion:
for attempt in retry_s3():
with attempt:
store.s3_client.delete_object(Bucket=store.files_bucket.name,
Key=compat_bytes(self.fileID),
VersionId=self.previousVersion)
delete_s3_object(
bucket=store.files_bucket.name,
key=compat_bytes(self.fileID),
version=self.previousVersion
)

def getSize(self):
"""
Expand Down
51 changes: 51 additions & 0 deletions src/toil/lib/aws/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright (C) 2015-2024 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, Any, Optional
from toil.lib.aws import session, AWSServerErrors
from toil.lib.retry import retry

logger = logging.getLogger(__name__)


@retry(errors=[AWSServerErrors])
def head_s3_object(bucket: str, key: str, header: Dict[str, Any], region: Optional[str] = None):
"""
Attempt to HEAD an s3 object and return its response.

:param bucket: AWS bucket name
:param key: AWS Key name for the s3 object
:param header: Headers to include (mostly for encryption).
See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/head_object.html
:param region: Region that we want to look for the bucket in
"""
s3_client = session.client("s3", region_name=region)
return s3_client.head_object(Bucket=bucket, Key=key, **header)


@retry(errors=[AWSServerErrors])
def delete_s3_object(bucket: str, key: str, version: Optional[str], region: Optional[str] = None):
"""
Attempt to DELETE an s3 object and return its response.

:param bucket: AWS bucket name
:param key: AWS Key name for the s3 object
:param version: The object's version ID, if it exists
:param region: Region that we want to look for the bucket in
"""
s3_client = session.client("s3", region_name=region)
if version:
return s3_client.delete_object(Bucket=bucket, Key=key, VersionId=version)
else:
return s3_client.delete_object(Bucket=bucket, Key=key)
Loading