Skip to content

Commit

Permalink
test_bucket_put_dir
Browse files Browse the repository at this point in the history
  • Loading branch information
drernie committed Dec 27, 2024
1 parent e4f6d96 commit bb636cb
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions api/python/tests/integration/test_put_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,62 @@
from quilt3 import Bucket

DATA_DIR = pathlib.Path(__file__).parent / 'data'
TEST_BUCKET_NAME = "test-kms-policies"
TEST_BUCKET = f"s3://{TEST_BUCKET_NAME}"
TEST_BUCKET = "test-kms-policies"
S3_BUCKET = Bucket(f"s3://{TEST_BUCKET}")
TEST_FILE = "foo.txt"
TEST_SRC = f"{DATA_DIR / TEST_FILE}"

print(f"TEST_BUCKET: {TEST_BUCKET}")
print(f"TEST_BUCKET: {S3_BUCKET}")
print(f"TEST_SRC: {TEST_SRC}")


def dest_dir(test_name):
return f"test/{test_name}"


def dest_key(test_name):
return f"test/{test_name}/{TEST_FILE}"
return f"{dest_dir(test_name)}/{TEST_FILE}"


def test_bucket_put_file():
dest = dest_key("test_bucket_put_file")
print(f"DEST: {dest}")
bucket = Bucket(TEST_BUCKET)

with raises(ClientError,
match="An error occurred \(AccessDenied\) when calling "
match=r"An error occurred \(AccessDenied\) when calling "
"the PutObject operation:.*"):
bucket.put_file(dest, TEST_SRC)
S3_BUCKET.put_file(dest, TEST_SRC)

bucket.put_file(dest, TEST_SRC,
put_options={"ServerSideEncryption": "aws:kms"})
S3_BUCKET.put_file(dest, TEST_SRC, put_options={"ServerSideEncryption": "aws:kms"})

# Use boto3 to verify the object was uploaded with the correct encryption
s3 = boto3.client('s3')
response = s3.head_object(Bucket=TEST_BUCKET_NAME, Key=dest)
response = s3.head_object(Bucket=TEST_BUCKET, Key=dest)
assert response['ServerSideEncryption'] == 'aws:kms'

# Use boto3 to clean up
s3.delete_object(Bucket=TEST_BUCKET_NAME, Key=dest)
s3.delete_object(Bucket=TEST_BUCKET, Key=dest)


def test_bucket_put_dir():
dest = dest_dir("test_bucket_put_dir")
print(f"DEST: {dest}")

with raises(ClientError,
match=r"An error occurred \(AccessDenied\) when calling "
"the PutObject operation:.*"):
S3_BUCKET.put_dir(dest, DATA_DIR)

S3_BUCKET.put_dir(dest, DATA_DIR, put_options={"ServerSideEncryption": "aws:kms"})

# Use boto3 to verify the object was uploaded with the correct encryption
s3 = boto3.client('s3')
response = s3.head_object(Bucket=TEST_BUCKET,
Key=f"{dest}/{TEST_FILE}")
assert response['ServerSideEncryption'] == 'aws:kms'

# Use boto3 to remove directory and its contents
response = s3.list_objects_v2(Bucket=TEST_BUCKET, Prefix=dest)
for obj in response.get('Contents', []):
s3.delete_object(Bucket=TEST_BUCKET, Key=obj['Key'])
s3.delete_object(Bucket=TEST_BUCKET, Key=dest)

0 comments on commit bb636cb

Please sign in to comment.