diff --git a/.gitignore b/.gitignore
index 5f657a012b9..64317deaec1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -49,3 +49,4 @@
 testdocs/scripts
 .env
 .venv
+.aider*
diff --git a/lambdas/pkgpush/CHANGELOG.md b/lambdas/pkgpush/CHANGELOG.md
index 5c103fa7cb4..1251e5c50cf 100644
--- a/lambdas/pkgpush/CHANGELOG.md
+++ b/lambdas/pkgpush/CHANGELOG.md
@@ -16,6 +16,7 @@ where verb is one of
 
 ## Changes
 
+- [Changed] Use per-region scratch buckets ([#3923](https://github.com/quiltdata/quilt/pull/3923))
 - [Changed] Speed-up copying of large files during promotion ([#3884](https://github.com/quiltdata/quilt/pull/3884))
 - [Changed] Bump quilt3 to set max_pool_connections, this improves performance ([#3870](https://github.com/quiltdata/quilt/pull/3870))
 - [Changed] Compute multipart checksums ([#3402](https://github.com/quiltdata/quilt/pull/3402))
diff --git a/lambdas/pkgpush/requirements.txt b/lambdas/pkgpush/requirements.txt
index 1f007ce3c8e..9e79344936c 100644
--- a/lambdas/pkgpush/requirements.txt
+++ b/lambdas/pkgpush/requirements.txt
@@ -61,7 +61,7 @@ python-dateutil==2.8.2
     # via botocore
 pyyaml==6.0.1
     # via quilt3
-quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@47055f7c5c0a93ddddfa5030a73b22a5d42b9c10#subdirectory=py-shared
+quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@7c6edd14fbe8a26613bc26b1bbdc0b956132ef8c#subdirectory=py-shared
     # via t4_lambda_pkgpush (setup.py)
 quilt3 @ git+https://github.com/quiltdata/quilt@5c2b79128fe4d5d1e6093ff6a7d11d09d3315843#subdirectory=api/python
     # via
diff --git a/lambdas/pkgpush/setup.py b/lambdas/pkgpush/setup.py
index 69ef43a0064..602dc32791e 100644
--- a/lambdas/pkgpush/setup.py
+++ b/lambdas/pkgpush/setup.py
@@ -15,7 +15,7 @@
         ),
         (
             "quilt_shared[pydantic,boto,quilt] @ git+https://github.com/quiltdata/quilt@"
-            "47055f7c5c0a93ddddfa5030a73b22a5d42b9c10"
+            "7c6edd14fbe8a26613bc26b1bbdc0b956132ef8c"
             "#subdirectory=py-shared"
         ),
     ],
diff --git a/lambdas/pkgpush/src/t4_lambda_pkgpush/__init__.py b/lambdas/pkgpush/src/t4_lambda_pkgpush/__init__.py
index 5052e854e25..59d3093da65 100644
--- a/lambdas/pkgpush/src/t4_lambda_pkgpush/__init__.py
+++ b/lambdas/pkgpush/src/t4_lambda_pkgpush/__init__.py
@@ -37,8 +37,8 @@
     ChecksumResult,
     CopyResult,
     PackageConstructEntry,
+    PackageConstructParams,
     PackagePromoteParams,
-    PackagePushParams,
     PackagePushResult,
     S3CopyLambdaParams,
     S3HashLambdaParams,
@@ -111,11 +111,16 @@ def invoke_lambda(*, function_name: str, params: pydantic.BaseModel, err_prefix:
     return parsed["result"]
 
 
-def invoke_hash_lambda(pk: PhysicalKey, credentials: AWSCredentials) -> Checksum:
+def invoke_hash_lambda(
+    pk: PhysicalKey,
+    credentials: AWSCredentials,
+    scratch_buckets: T.Dict[str, str],
+) -> Checksum:
     result = invoke_lambda(
         function_name=S3_HASH_LAMBDA,
         params=S3HashLambdaParams(
             credentials=credentials,
+            scratch_buckets=scratch_buckets,
             location=S3ObjectSource.from_pk(pk),
         ),
         err_prefix="S3HashLambda",
@@ -126,11 +131,12 @@ def invoke_hash_lambda(pk: PhysicalKey, credentials: AWSCredentials) -> Checksum
 def calculate_pkg_entry_hash(
     pkg_entry: quilt3.packages.PackageEntry,
     credentials: AWSCredentials,
+    scratch_buckets: T.Dict[str, str],
 ):
-    pkg_entry.hash = invoke_hash_lambda(pkg_entry.physical_key, credentials).dict()
+    pkg_entry.hash = invoke_hash_lambda(pkg_entry.physical_key, credentials, scratch_buckets).dict()
 
 
-def calculate_pkg_hashes(pkg: quilt3.Package):
+def calculate_pkg_hashes(pkg: quilt3.Package, scratch_buckets: T.Dict[str, str]):
     entries = []
     for lk, entry in pkg.walk():
         if entry.hash is not None:
@@ -155,7 +161,7 @@ def calculate_pkg_hashes(pkg: quilt3.Package):
     ) as pool:
         credentials = AWSCredentials.from_boto_session(user_boto_session)
         fs = [
-            pool.submit(calculate_pkg_entry_hash, entry, credentials)
+            pool.submit(calculate_pkg_entry_hash, entry, credentials, scratch_buckets)
             for entry in entries
         ]
         for f in concurrent.futures.as_completed(fs):
@@ -317,7 +323,7 @@ def _get_successor_params(
 
 
 def _push_pkg_to_successor(
-    params: PackagePushParams,
+    params: PackagePromoteParams,
     *,
     src_bucket: str,
     get_pkg: T.Callable[[S3PackageRegistryV1], quilt3.Package],
@@ -436,7 +442,7 @@ def get_pkg(src_registry: S3PackageRegistryV1):
     logger=logger,
 )
 def create_package(req_file: T.IO[bytes]) -> PackagePushResult:
-    params = PackagePushParams.parse_raw(next(req_file))
+    params = PackageConstructParams.parse_raw(next(req_file))
     registry_url = f"s3://{params.bucket}"
     try:
         package_registry = get_registry(registry_url)
@@ -509,7 +515,7 @@ def create_package(req_file: T.IO[bytes]) -> PackagePushResult:
     except quilt3.util.QuiltException as qe:
         raise PkgpushException.from_quilt_exception(qe)
 
-    calculate_pkg_hashes(pkg)
+    calculate_pkg_hashes(pkg, params.scratch_buckets)
     try:
         top_hash = pkg._build(
             name=params.name,
diff --git a/lambdas/pkgpush/test-requirements.in b/lambdas/pkgpush/test-requirements.in
index 2cc2f800ecd..797cfec8179 100644
--- a/lambdas/pkgpush/test-requirements.in
+++ b/lambdas/pkgpush/test-requirements.in
@@ -1,3 +1,4 @@
 -c requirements.txt
 pytest ~= 8.0
+pytest-mock ~= 3.14
 pytest-subtests ~= 0.11
diff --git a/lambdas/pkgpush/test-requirements.txt b/lambdas/pkgpush/test-requirements.txt
index cfeb92190f0..a2a1f43d6c9 100644
--- a/lambdas/pkgpush/test-requirements.txt
+++ b/lambdas/pkgpush/test-requirements.txt
@@ -19,7 +19,10 @@ pluggy==1.4.0
 pytest==8.0.0
     # via
     #   -r test-requirements.in
+    #   pytest-mock
     #   pytest-subtests
+pytest-mock==3.14.0
+    # via -r test-requirements.in
 pytest-subtests==0.11.0
     # via -r test-requirements.in
 tomli==2.0.1
diff --git a/lambdas/pkgpush/tests/conftest.py b/lambdas/pkgpush/tests/conftest.py
index f404fcf2ad5..f8d4306c83a 100644
--- a/lambdas/pkgpush/tests/conftest.py
+++ b/lambdas/pkgpush/tests/conftest.py
@@ -1,25 +1,41 @@
 import os
 
+import pytest
+from botocore.stub import Stubber
+
 
 def pytest_configure(config):
     os.environ.update(
-        AWS_ACCESS_KEY_ID='foo',
-        AWS_SECRET_ACCESS_KEY='bar',
-        AWS_DEFAULT_REGION='us-east-1',
-        SERVICE_BUCKET='service-bucket',
+        AWS_ACCESS_KEY_ID="foo",
+        AWS_SECRET_ACCESS_KEY="bar",
+        AWS_DEFAULT_REGION="us-east-1",
+        SERVICE_BUCKET="service-bucket",
         **dict.fromkeys(
             (
-                'PROMOTE_PKG_MAX_MANIFEST_SIZE',
-                'PROMOTE_PKG_MAX_PKG_SIZE',
-                'PROMOTE_PKG_MAX_FILES',
-                'MAX_BYTES_TO_HASH',
-                'MAX_FILES_TO_HASH',
-                'S3_HASH_LAMBDA_MAX_FILE_SIZE_BYTES',
+                "PROMOTE_PKG_MAX_MANIFEST_SIZE",
+                "PROMOTE_PKG_MAX_PKG_SIZE",
+                "PROMOTE_PKG_MAX_FILES",
+                "MAX_BYTES_TO_HASH",
+                "MAX_FILES_TO_HASH",
+                "S3_HASH_LAMBDA_MAX_FILE_SIZE_BYTES",
             ),
-            str(2 ** 64),  # Value big enough to serve as 'unlimited'.
+            str(2**64),  # Value big enough to serve as 'unlimited'.
         ),
-        S3_HASH_LAMBDA='s3-hash-lambda-name',
-        S3_COPY_LAMBDA='s3-copy-lambda-name',
-        S3_HASH_LAMBDA_CONCURRENCY='40',
-        S3_COPY_LAMBDA_CONCURRENCY='40',
+        S3_HASH_LAMBDA="s3-hash-lambda-name",
+        S3_COPY_LAMBDA="s3-copy-lambda-name",
+        S3_HASH_LAMBDA_CONCURRENCY="40",
+        S3_COPY_LAMBDA_CONCURRENCY="40",
     )
+
+
+@pytest.fixture
+def lambda_stub():
+    import t4_lambda_pkgpush
+
+    stub = Stubber(t4_lambda_pkgpush.lambda_)
+    stub.activate()
+    try:
+        yield stub
+        stub.assert_no_pending_responses()
+    finally:
+        stub.deactivate()
diff --git a/lambdas/pkgpush/tests/test_copy.py b/lambdas/pkgpush/tests/test_copy.py
new file mode 100644
index 00000000000..d8297c129b0
--- /dev/null
+++ b/lambdas/pkgpush/tests/test_copy.py
@@ -0,0 +1,41 @@
+import io
+
+from botocore.stub import Stubber
+
+import t4_lambda_pkgpush
+from quilt3.util import PhysicalKey
+from quilt_shared.aws import AWSCredentials
+from quilt_shared.types import NonEmptyStr
+
+CREDENTIALS = AWSCredentials(
+    key=NonEmptyStr("test_aws_access_key_id"),
+    secret=NonEmptyStr("test_aws_secret_access_key"),
+    token=NonEmptyStr("test_aws_session_token"),
+)
+
+
+def test_invoke_copy_lambda(lambda_stub: Stubber):
+    SRC_BUCKET = "src-bucket"
+    SRC_KEY = "src-key"
+    SRC_VERSION_ID = "src-version-id"
+    DST_BUCKET = "dst-bucket"
+    DST_KEY = "dst-key"
+    DST_VERSION_ID = "dst-version-id"
+
+    lambda_stub.add_response(
+        "invoke",
+        {
+            "Payload": io.BytesIO(
+                b'{"result": {"version": "%s"}}' % DST_VERSION_ID.encode()
+            )
+        },
+    )
+
+    assert (
+        t4_lambda_pkgpush.invoke_copy_lambda(
+            CREDENTIALS,
+            PhysicalKey(SRC_BUCKET, SRC_KEY, SRC_VERSION_ID),
+            PhysicalKey(DST_BUCKET, DST_KEY, None),
+        )
+        == DST_VERSION_ID
+    )
diff --git a/lambdas/pkgpush/tests/test_hash_calc.py b/lambdas/pkgpush/tests/test_hash_calc.py
new file mode 100644
index 00000000000..b251dfc0957
--- /dev/null
+++ b/lambdas/pkgpush/tests/test_hash_calc.py
@@ -0,0 +1,174 @@
+import io
+import json
+
+import boto3
+import pytest
+from botocore.stub import Stubber
+from pytest_mock import MockerFixture
+
+import t4_lambda_pkgpush
+from quilt3.packages import Package, PackageEntry
+from quilt3.util import PhysicalKey
+from quilt_shared.aws import AWSCredentials
+from quilt_shared.pkgpush import Checksum, ChecksumType
+from quilt_shared.types import NonEmptyStr
+
+CREDENTIALS = AWSCredentials(
+    key=NonEmptyStr("test_aws_access_key_id"),
+    secret=NonEmptyStr("test_aws_secret_access_key"),
+    token=NonEmptyStr("test_aws_session_token"),
+)
+
+SCRATCH_BUCKETS = {"us-east-1": "test-scratch-bucket"}
+
+
+@pytest.fixture
+def entry_with_hash() -> PackageEntry:
+    return PackageEntry(
+        PhysicalKey("test-bucket", "with-hash", "with-hash"),
+        42,
+        {"type": "SHA256", "value": "0" * 64},
+        {},
+    )
+
+
+@pytest.fixture
+def entry_without_hash() -> PackageEntry:
+    return PackageEntry(
+        PhysicalKey("test-bucket", "without-hash", "without-hash"),
+        42,
+        None,
+        {},
+    )
+
+
+@pytest.fixture
+def pkg(entry_with_hash: PackageEntry, entry_without_hash: PackageEntry) -> Package:
+    p = Package()
+    p.set("with-hash", entry_with_hash)
+    p.set("without-hash", entry_without_hash)
+    return p
+
+
+def test_calculate_pkg_hashes(
+    pkg: Package, entry_without_hash: PackageEntry, mocker: MockerFixture
+):
+    calculate_pkg_entry_hash_mock = mocker.patch.object(
+        t4_lambda_pkgpush, "calculate_pkg_entry_hash"
+    )
+    session_mock = boto3.Session(**CREDENTIALS.boto_args)
+
+    with t4_lambda_pkgpush.setup_user_boto_session(session_mock):
+        t4_lambda_pkgpush.calculate_pkg_hashes(pkg, SCRATCH_BUCKETS)
+
+    calculate_pkg_entry_hash_mock.assert_called_once_with(
+        entry_without_hash,
+        CREDENTIALS,
+        SCRATCH_BUCKETS,
+    )
+
+
+def test_calculate_pkg_hashes_too_large_file_error(pkg: Package, mocker: MockerFixture):
+    mocker.patch.object(t4_lambda_pkgpush, "S3_HASH_LAMBDA_MAX_FILE_SIZE_BYTES", 1)
+
+    with pytest.raises(t4_lambda_pkgpush.PkgpushException) as excinfo:
+        t4_lambda_pkgpush.calculate_pkg_hashes(pkg, SCRATCH_BUCKETS)
+    assert excinfo.value.name == "FileTooLargeForHashing"
+
+
+def test_calculate_pkg_entry_hash(
+    entry_without_hash: PackageEntry,
+    mocker: MockerFixture,
+):
+    invoke_hash_lambda_mock = mocker.patch(
+        "t4_lambda_pkgpush.invoke_hash_lambda",
+        return_value=Checksum(type=ChecksumType.SHA256_CHUNKED, value="base64hash"),
+    )
+
+    t4_lambda_pkgpush.calculate_pkg_entry_hash(
+        entry_without_hash,
+        CREDENTIALS,
+        SCRATCH_BUCKETS,
+    )
+
+    invoke_hash_lambda_mock.assert_called_once_with(
+        entry_without_hash.physical_key,
+        CREDENTIALS,
+        SCRATCH_BUCKETS,
+    )
+
+    assert entry_without_hash.hash == invoke_hash_lambda_mock.return_value.dict()
+
+
+def test_invoke_hash_lambda(lambda_stub: Stubber):
+    checksum = {"type": "sha2-256-chunked", "value": "base64hash"}
+    pk = PhysicalKey(bucket="bucket", path="path", version_id="version-id")
+
+    lambda_stub.add_response(
+        "invoke",
+        service_response={
+            "Payload": io.BytesIO(
+                b'{"result": {"checksum": %s}}' % json.dumps(checksum).encode()
+            ),
+        },
+        expected_params={
+            "FunctionName": t4_lambda_pkgpush.S3_HASH_LAMBDA,
+            "Payload": json.dumps(
+                {
+                    "credentials": {
+                        "key": CREDENTIALS.key,
+                        "secret": CREDENTIALS.secret,
+                        "token": CREDENTIALS.token,
+                    },
+                    "scratch_buckets": SCRATCH_BUCKETS,
+                    "location": {
+                        "bucket": pk.bucket,
+                        "key": pk.path,
+                        "version": pk.version_id,
+                    },
+                }
+            ),
+        },
+    )
+
+    assert (
+        t4_lambda_pkgpush.invoke_hash_lambda(pk, CREDENTIALS, SCRATCH_BUCKETS)
+        == checksum
+    )
+
+
+def test_invoke_hash_lambda_error(lambda_stub: Stubber):
+    pk = PhysicalKey(bucket="bucket", path="path", version_id="version-id")
+
+    lambda_stub.add_response(
+        "invoke",
+        service_response={
+            "FunctionError": "Unhandled",
+            "Payload": io.BytesIO(
+                b'{"errorMessage":"2024-02-02T14:33:39.754Z e0db9ea8-1329-44d5-a0dc-364ba2749b09'
+                b' Task timed out after 1.00 seconds"}'
+            ),
+        },
+        expected_params={
+            "FunctionName": t4_lambda_pkgpush.S3_HASH_LAMBDA,
+            "Payload": json.dumps(
+                {
+                    "credentials": {
+                        "key": CREDENTIALS.key,
+                        "secret": CREDENTIALS.secret,
+                        "token": CREDENTIALS.token,
+                    },
+                    "scratch_buckets": SCRATCH_BUCKETS,
+                    "location": {
+                        "bucket": pk.bucket,
+                        "key": pk.path,
+                        "version": pk.version_id,
+                    },
+                }
+            ),
+        },
+    )
+
+    with pytest.raises(t4_lambda_pkgpush.PkgpushException) as excinfo:
+        t4_lambda_pkgpush.invoke_hash_lambda(pk, CREDENTIALS, SCRATCH_BUCKETS)
+    assert excinfo.value.name == "S3HashLambdaUnhandledError"
diff --git a/lambdas/pkgpush/tests/test_index.py b/lambdas/pkgpush/tests/test_index.py
index 34a6ec20eb2..7f375999408 100644
--- a/lambdas/pkgpush/tests/test_index.py
+++ b/lambdas/pkgpush/tests/test_index.py
@@ -7,15 +7,14 @@
 from unittest import mock
 
 import boto3
-import pytest
 from botocore.stub import Stubber
 
-import quilt_shared.aws
 import t4_lambda_pkgpush
 from quilt3.backends import get_package_registry
 from quilt3.packages import Package, PackageEntry
 from quilt3.util import PhysicalKey
-from quilt_shared.pkgpush import Checksum, ChecksumType
+from quilt_shared.aws import AWSCredentials
+from quilt_shared.types import NonEmptyStr
 
 
 def hash_data(data):
@@ -25,12 +24,14 @@
 
 calculate_sha256_patcher = functools.partial(mock.patch, 'quilt3.packages.calculate_sha256')
 
-CREDENTIALS = quilt_shared.aws.AWSCredentials(
-    key='test_aws_access_key_id',
-    secret='test_aws_secret_access_key',
-    token='test_aws_session_token',
+CREDENTIALS = AWSCredentials(
+    key=NonEmptyStr("test_aws_access_key_id"),
+    secret=NonEmptyStr("test_aws_secret_access_key"),
+    token=NonEmptyStr("test_aws_session_token"),
 )
 
+SCRATCH_BUCKETS = {"us-east-1": "test-scratch-bucket"}
+
 
 class PackagePromoteTestBase(unittest.TestCase):
     credentials = CREDENTIALS
@@ -140,7 +141,7 @@ def setUp(self):
         self.get_user_boto_session_mock = get_user_boto_session_patcher.start()
         self.addCleanup(get_user_boto_session_patcher.stop)
 
-        def calculate_pkg_hashes_side_effect(pkg):
+        def calculate_pkg_hashes_side_effect(pkg, scratch_buckets):
             for lk, entry in pkg.walk():
                 if entry.hash is None:
                     entry.hash = {
@@ -548,6 +549,7 @@ def setUpClass(cls):
             'bucket': cls.dst_bucket,
             'message': cls.dst_commit_message,
             'user_meta': cls.meta,
+            'scratch_buckets': SCRATCH_BUCKETS,
         }
         cls.gen_params = staticmethod(cls.make_params_factory(cls.params))
         cls.gen_pkg_entry = staticmethod(cls.make_params_factory(cls.package_entry))
@@ -742,6 +744,7 @@ def test_invalid_entries_missing_required_props(self):
                 'name': 'user/atestpackage',
                 'bucket': self.dst_bucket,
                 'message': 'test package',
+                'scratch_buckets': SCRATCH_BUCKETS,
             },
             entry,
         ])
@@ -864,145 +867,3 @@ def test_create_package_no_browser_hash_no_meta(self):
             entry,
         ])
         assert "result" in pkg_response
-
-
-class HashCalculationTest(unittest.TestCase):
-    def setUp(self):
-        self.pkg = Package()
-        self.entry_with_hash = PackageEntry(
-            PhysicalKey('test-bucket', 'with-hash', 'with-hash'),
-            42,
-            {'type': 'SHA256', 'value': '0' * 64},
-            {},
-        )
-        self.entry_without_hash = PackageEntry(
-            PhysicalKey('test-bucket', 'without-hash', 'without-hash'),
-            42,
-            None,
-            {},
-        )
-        self.pkg.set('with-hash', self.entry_with_hash)
-        self.pkg.set('without-hash', self.entry_without_hash)
-
-    def test_calculate_pkg_hashes(self):
-        with mock.patch.object(t4_lambda_pkgpush, 'calculate_pkg_entry_hash') as calculate_pkg_entry_hash_mock:
-            session_mock = boto3.Session(**CREDENTIALS.boto_args)
-            with t4_lambda_pkgpush.setup_user_boto_session(session_mock):
-                t4_lambda_pkgpush.calculate_pkg_hashes(self.pkg)
-
-        calculate_pkg_entry_hash_mock.assert_called_once_with(self.entry_without_hash, CREDENTIALS)
-
-    @mock.patch.object(t4_lambda_pkgpush, 'S3_HASH_LAMBDA_MAX_FILE_SIZE_BYTES', 1)
-    def test_calculate_pkg_hashes_too_large_file_error(self):
-        with pytest.raises(t4_lambda_pkgpush.PkgpushException) as excinfo:
-            t4_lambda_pkgpush.calculate_pkg_hashes(self.pkg)
-        assert excinfo.value.name == "FileTooLargeForHashing"
-
-    def test_calculate_pkg_entry_hash(self):
-        with mock.patch(
-            "t4_lambda_pkgpush.invoke_hash_lambda",
-            return_value=Checksum(type=ChecksumType.SHA256_CHUNKED, value="base64hash"),
-        ) as invoke_hash_lambda_mock:
-            t4_lambda_pkgpush.calculate_pkg_entry_hash(self.entry_without_hash, CREDENTIALS)
-
-        invoke_hash_lambda_mock.assert_called_once_with(self.entry_without_hash.physical_key, CREDENTIALS)
-
-        assert self.entry_without_hash.hash == invoke_hash_lambda_mock.return_value.dict()
-
-    def test_invoke_hash_lambda(self):
-        lambda_client_stubber = Stubber(t4_lambda_pkgpush.lambda_)
-        lambda_client_stubber.activate()
-        self.addCleanup(lambda_client_stubber.deactivate)
-        checksum = {"type": "sha2-256-chunked", "value": "base64hash"}
-        pk = PhysicalKey(bucket="bucket", path="path", version_id="version-id")
-
-        lambda_client_stubber.add_response(
-            'invoke',
-            service_response={
-                'Payload': io.BytesIO(
-                    b'{"result": {"checksum": %s}}' % json.dumps(checksum).encode()
-                ),
-            },
-            expected_params={
-                'FunctionName': t4_lambda_pkgpush.S3_HASH_LAMBDA,
-                'Payload': json.dumps({
-                    "credentials": {
-                        "key": CREDENTIALS.key,
-                        "secret": CREDENTIALS.secret,
-                        "token": CREDENTIALS.token,
-                    },
-                    "location": {
-                        "bucket": pk.bucket,
-                        "key": pk.path,
-                        "version": pk.version_id,
-                    }
-                })
-            },
-        )
-
-        assert t4_lambda_pkgpush.invoke_hash_lambda(pk, CREDENTIALS) == checksum
-        lambda_client_stubber.assert_no_pending_responses()
-
-    def test_invoke_hash_lambda_error(self):
-        lambda_client_stubber = Stubber(t4_lambda_pkgpush.lambda_)
-        lambda_client_stubber.activate()
-        self.addCleanup(lambda_client_stubber.deactivate)
-        pk = PhysicalKey(bucket="bucket", path="path", version_id="version-id")
-
-        lambda_client_stubber.add_response(
-            'invoke',
-            service_response={
-                'FunctionError': 'Unhandled',
-                'Payload': io.BytesIO(
-                    b'{"errorMessage":"2024-02-02T14:33:39.754Z e0db9ea8-1329-44d5-a0dc-364ba2749b09'
-                    b' Task timed out after 1.00 seconds"}'
-                ),
-            },
-            expected_params={
-                'FunctionName': t4_lambda_pkgpush.S3_HASH_LAMBDA,
-                'Payload': json.dumps({
-                    "credentials": {
-                        "key": CREDENTIALS.key,
-                        "secret": CREDENTIALS.secret,
-                        "token": CREDENTIALS.token,
-                    },
-                    "location": {
-                        "bucket": pk.bucket,
-                        "key": pk.path,
-                        "version": pk.version_id,
-                    }
-                })
-            },
-        )
-
-        with pytest.raises(t4_lambda_pkgpush.PkgpushException) as excinfo:
-            t4_lambda_pkgpush.invoke_hash_lambda(pk, CREDENTIALS)
-        assert excinfo.value.name == "S3HashLambdaUnhandledError"
-        lambda_client_stubber.assert_no_pending_responses()
-
-
-def test_invoke_copy_lambda():
-    SRC_BUCKET = "src-bucket"
-    SRC_KEY = "src-key"
-    SRC_VERSION_ID = "src-version-id"
-    DST_BUCKET = "dst-bucket"
-    DST_KEY = "dst-key"
-    DST_VERSION_ID = "dst-version-id"
-
-    stubber = Stubber(t4_lambda_pkgpush.lambda_)
-    stubber.add_response(
-        "invoke",
-        {
-            "Payload": io.BytesIO(b'{"result": {"version": "%s"}}' % DST_VERSION_ID.encode())
-        }
-    )
-    stubber.activate()
-    try:
-        assert t4_lambda_pkgpush.invoke_copy_lambda(
-            CREDENTIALS,
-            PhysicalKey(SRC_BUCKET, SRC_KEY, SRC_VERSION_ID),
-            PhysicalKey(DST_BUCKET, DST_KEY, None),
-        ) == DST_VERSION_ID
-        stubber.assert_no_pending_responses()
-    finally:
-        stubber.deactivate()
diff --git a/lambdas/s3hash/CHANGELOG.md b/lambdas/s3hash/CHANGELOG.md
index ffcf8258709..64cfa5c0fe4 100644
--- a/lambdas/s3hash/CHANGELOG.md
+++ b/lambdas/s3hash/CHANGELOG.md
@@ -16,6 +16,7 @@ where verb is one of
 
 ## Changes
 
+- [Changed] Use per-region scratch buckets ([#3923](https://github.com/quiltdata/quilt/pull/3923))
 - [Changed] Always stream bytes in legacy mode ([#3903](https://github.com/quiltdata/quilt/pull/3903))
 - [Changed] Compute chunked checksums, adhere to the spec ([#3889](https://github.com/quiltdata/quilt/pull/3889))
 - [Added] Lambda handler for file copy ([#3884](https://github.com/quiltdata/quilt/pull/3884))
diff --git a/lambdas/s3hash/pytest.ini b/lambdas/s3hash/pytest.ini
index 8c50524e46f..9337851a9d7 100644
--- a/lambdas/s3hash/pytest.ini
+++ b/lambdas/s3hash/pytest.ini
@@ -3,4 +3,3 @@ asyncio_mode=auto
 env =
     MPU_CONCURRENCY=1000
     CHUNKED_CHECKSUMS=true
-    SERVICE_BUCKET=service-bucket
diff --git a/lambdas/s3hash/requirements.txt b/lambdas/s3hash/requirements.txt
index 082703ea023..4ec3edb6de3 100644
--- a/lambdas/s3hash/requirements.txt
+++ b/lambdas/s3hash/requirements.txt
@@ -83,7 +83,7 @@ python-dateutil==2.8.2
     # via botocore
 pyyaml==6.0.1
     # via quilt3
-quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@47055f7c5c0a93ddddfa5030a73b22a5d42b9c10#subdirectory=py-shared
+quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@7c6edd14fbe8a26613bc26b1bbdc0b956132ef8c#subdirectory=py-shared
     # via t4_lambda_s3hash (setup.py)
 quilt3==5.4.0
     # via quilt-shared
diff --git a/lambdas/s3hash/setup.py b/lambdas/s3hash/setup.py
index 693569d02d0..93d866c4d11 100644
--- a/lambdas/s3hash/setup.py
+++ b/lambdas/s3hash/setup.py
@@ -12,7 +12,7 @@
         "types-aiobotocore[s3] ~= 2.11",
         (
            "quilt_shared[pydantic,boto,quilt] @ git+https://github.com/quiltdata/quilt@"
-            "47055f7c5c0a93ddddfa5030a73b22a5d42b9c10"
+            "7c6edd14fbe8a26613bc26b1bbdc0b956132ef8c"
            "#subdirectory=py-shared"
         ),
     ],
diff --git a/lambdas/s3hash/src/t4_lambda_s3hash/__init__.py b/lambdas/s3hash/src/t4_lambda_s3hash/__init__.py
index 71d24ac89d4..7711564fb7b 100644
--- a/lambdas/s3hash/src/t4_lambda_s3hash/__init__.py
+++ b/lambdas/s3hash/src/t4_lambda_s3hash/__init__.py
@@ -34,9 +34,8 @@
 
 MPU_CONCURRENCY = int(os.environ["MPU_CONCURRENCY"])
 CHUNKED_CHECKSUMS = os.environ["CHUNKED_CHECKSUMS"] == "true"
-SERVICE_BUCKET = os.environ["SERVICE_BUCKET"]
 
-SCRATCH_KEY_SERVICE = "user-requests/checksum-upload-tmp"
+SCRATCH_KEY = "user-requests/checksum-upload-tmp"
 
 # How much seconds before lambda is supposed to timeout we give up.
 SECONDS_TO_CLEANUP = 1
@@ -109,6 +108,33 @@ def get_part_size(file_size: int) -> T.Optional[int]:
     return part_size
 
 
+async def get_bucket_region(bucket: str) -> str:
+    """
+    Lookup the region for a given bucket.
+    """
+    try:
+        resp = await S3.get().head_bucket(Bucket=bucket)
+    except botocore.exceptions.ClientError as e:
+        resp = e.response
+        if resp.get("Error", {}).get("Code") == "404":
+            raise
+
+    assert "ResponseMetadata" in resp
+    return resp["ResponseMetadata"]["HTTPHeaders"]["x-amz-bucket-region"]
+
+
+async def get_mpu_dst_for_location(location: S3ObjectSource, scratch_buckets: T.Dict[str, str]) -> S3ObjectDestination:
+    region = await get_bucket_region(location.bucket)
+    scratch_bucket = scratch_buckets.get(region)
+    if scratch_bucket is None:
+        raise LambdaError(
+            "ScratchBucketNotFound",
+            {"region": region, "bucket": location.bucket, "scratch_buckets": scratch_buckets},
+        )
+
+    return S3ObjectDestination(bucket=scratch_bucket, key=SCRATCH_KEY)
+
+
 async def get_obj_attributes(location: S3ObjectSource) -> T.Optional[GetObjectAttributesOutputTypeDef]:
     try:
         return await S3.get().get_object_attributes(
@@ -259,9 +285,6 @@ async def compute_part_checksums(
     return checksums
 
 
-MPU_DST = S3ObjectDestination(bucket=SERVICE_BUCKET, key=SCRATCH_KEY_SERVICE)
-
-
 class PartUploadResult(pydantic.BaseModel):
     etag: str
     sha256: str  # base64-encoded
@@ -368,7 +391,7 @@ async def compute_checksum_legacy(location: S3ObjectSource) -> Checksum:
     return Checksum.sha256(hashobj.digest())
 
 
-async def compute_checksum(location: S3ObjectSource) -> ChecksumResult:
+async def compute_checksum(location: S3ObjectSource, scratch_buckets: T.Dict[str, str]) -> ChecksumResult:
     obj_attrs = await get_obj_attributes(location)
     if obj_attrs:
         checksum = get_compliant_checksum(obj_attrs)
@@ -389,7 +412,9 @@
 
     part_defs = get_parts_for_size(total_size)
 
-    async with create_mpu(MPU_DST) as mpu:
+    mpu_dst = await get_mpu_dst_for_location(location, scratch_buckets)
+
+    async with create_mpu(mpu_dst) as mpu:
         part_checksums = await compute_part_checksums(
             mpu,
             location,
@@ -407,10 +432,11 @@
 async def lambda_handler(
     *,
     credentials: AWSCredentials,
+    scratch_buckets: T.Dict[str, str],
     location: S3ObjectSource,
 ) -> ChecksumResult:
     async with aio_context(credentials):
-        return await compute_checksum(location)
+        return await compute_checksum(location, scratch_buckets)
 
 
 async def copy(location: S3ObjectSource, target: S3ObjectDestination) -> CopyResult:
@@ -441,6 +467,6 @@ async def lambda_handler_copy(
     credentials: AWSCredentials,
     location: S3ObjectSource,
     target: S3ObjectDestination,
-) -> str:
+) -> CopyResult:
     async with aio_context(credentials):
         return await copy(location, target)
diff --git a/lambdas/s3hash/tests/conftest.py b/lambdas/s3hash/tests/conftest.py
new file mode 100644
index 00000000000..705ac556d72
--- /dev/null
+++ b/lambdas/s3hash/tests/conftest.py
@@ -0,0 +1,32 @@
+import asyncio
+
+import pytest
+from botocore.stub import Stubber
+
+import t4_lambda_s3hash as s3hash
+
+AWS_CREDENTIALS = s3hash.AWSCredentials.parse_obj(
+    {
+        "key": "test-key",
+        "secret": "test-secret",
+        "token": "test-token",
+    }
+)
+
+
+# pytest's async fixtures don't propagate contextvars, so we have to set them manually in a sync fixture
+@pytest.fixture
+def s3_stub():
+    async def _get_s3():
+        async with s3hash.aio_context(AWS_CREDENTIALS):
+            return s3hash.S3.get()
+
+    s3 = asyncio.run(_get_s3())
+    stubber = Stubber(s3)
+    stubber.activate()
+    s3_token = s3hash.S3.set(s3)
+    try:
+        yield stubber
+        stubber.assert_no_pending_responses()
+    finally:
+        s3hash.S3.reset(s3_token)
diff --git a/lambdas/s3hash/tests/test_compute_checksum.py b/lambdas/s3hash/tests/test_compute_checksum.py
index ca0364b7449..6295ed809cc 100644
--- a/lambdas/s3hash/tests/test_compute_checksum.py
+++ b/lambdas/s3hash/tests/test_compute_checksum.py
@@ -1,4 +1,3 @@
-import asyncio
 import base64
 import io
 
@@ -32,14 +31,6 @@ def make_body(contents: bytes) -> StreamingBody:
     )
 
 
-AWS_CREDENTIALS = s3hash.AWSCredentials.parse_obj(
-    {
-        "key": "test-key",
-        "secret": "test-secret",
-        "token": "test-token",
-    }
-)
-
 LOC = s3hash.S3ObjectSource(
     bucket="test-bucket",
     key="test-key",
@@ -52,30 +43,21 @@ def make_body(contents: bytes) -> StreamingBody:
     "ObjectAttributes": ["ETag", "Checksum", "ObjectParts", "ObjectSize"],
 }
 
+REGION = "test-region"
+SCRATCH_BUCKET = "test-scratch-bucket"
+SCRATCH_BUCKETS = {REGION: SCRATCH_BUCKET}
+
+MPU_DST = s3hash.S3ObjectDestination(
+    bucket=SCRATCH_BUCKET,
+    key=s3hash.SCRATCH_KEY,
+)
+
 EXPECTED_MPU_PARAMS = {
-    **s3hash.MPU_DST.boto_args,
+    **MPU_DST.boto_args,
     "ChecksumAlgorithm": "SHA256",
 }
 
 
-# pytest's async fixtures don't propagate contextvars, so we have to set them manually in a sync fixture
-@pytest.fixture
-def s3_stub():
-    async def _get_s3():
-        async with s3hash.aio_context(AWS_CREDENTIALS):
-            return s3hash.S3.get()
-
-    s3 = asyncio.run(_get_s3())
-    stubber = Stubber(s3)
-    stubber.activate()
-    s3_token = s3hash.S3.set(s3)
-    try:
-        yield stubber
-        stubber.assert_no_pending_responses()
-    finally:
-        s3hash.S3.reset(s3_token)
-
-
 async def test_compliant(s3_stub: Stubber):
     checksum = "MOFJVevxNSJm3C/4Bn5oEEYH51CrudOzZYK4r5Cfy1g="
     checksum_hash = "WZ1xAz1wCsiSoOSPphsSXS9ZlBu0XaGQlETUPG7gurI="
@@ -89,7 +71,7 @@
         EXPECTED_GETATTR_PARAMS,
     )
 
-    res = await s3hash.compute_checksum(LOC)
+    res = await s3hash.compute_checksum(LOC, SCRATCH_BUCKETS)
 
     assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.sha256_chunked(base64.b64decode(checksum_hash)))
 
@@ -117,7 +99,7 @@ async def test_empty(chunked: bool, expected: s3hash.Checksum, s3_stub: Stubber,
         EXPECTED_GETATTR_PARAMS,
     )
 
-    res = await s3hash.compute_checksum(LOC)
+    res = await s3hash.compute_checksum(LOC, SCRATCH_BUCKETS)
 
     assert res == s3hash.ChecksumResult(checksum=expected)
 
@@ -147,7 +129,7 @@ async def test_empty_no_access(chunked: bool, expected: s3hash.Checksum, s3_stub
         LOC.boto_args,
     )
 
-    res = await s3hash.compute_checksum(LOC)
+    res = await s3hash.compute_checksum(LOC, SCRATCH_BUCKETS)
 
     assert res == s3hash.ChecksumResult(checksum=expected)
 
@@ -176,12 +158,24 @@ async def test_legacy(s3_stub: Stubber, mocker: MockerFixture):
 
     mocker.patch("t4_lambda_s3hash.CHUNKED_CHECKSUMS", False)
 
-    res = await s3hash.compute_checksum(LOC)
+    res = await s3hash.compute_checksum(LOC, SCRATCH_BUCKETS)
 
     checksum_hex = bytes.fromhex("d9d865cc54ec60678f1b119084ad79ae7f9357d1c4519c6457de3314b7fbba8a")
     assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.sha256(checksum_hex))
 
 
+def stub_bucket_region(s3_stub: Stubber):
+    s3_stub.add_response(
+        "head_bucket",
+        expected_params={"Bucket": LOC.bucket},
+        service_response={
+            "ResponseMetadata": {
+                "HTTPHeaders": {"x-amz-bucket-region": REGION},
+            },
+        },
+    )
+
+
 async def test_mpu_fail(s3_stub: Stubber):
     ETAG = "test-etag"
     SIZE = 1048576
@@ -191,6 +185,8 @@
         EXPECTED_GETATTR_PARAMS,
     )
 
+    stub_bucket_region(s3_stub)
+
     s3_stub.add_client_error(
         "create_multipart_upload",
         "TestError",
@@ -198,12 +194,12 @@
     )
 
     with pytest.raises(s3hash.LambdaError) as excinfo:
-        await s3hash.compute_checksum(LOC)
+        await s3hash.compute_checksum(LOC, SCRATCH_BUCKETS)
 
     assert excinfo.value.dict() == {
         "name": "MPUError",
         "context": {
-            "dst": s3hash.MPU_DST.dict(),
+            "dst": MPU_DST.dict(),
             "error": "An error occurred (TestError) when calling the CreateMultipartUpload operation: ",
         },
     }
@@ -219,6 +215,8 @@
         EXPECTED_GETATTR_PARAMS,
     )
 
+    stub_bucket_region(s3_stub)
+
     MPU_ID = "test-upload-id"
     s3_stub.add_response(
         "create_multipart_upload",
@@ -237,7 +235,7 @@
             },
         },
         {
-            **s3hash.MPU_DST.boto_args,
+            **MPU_DST.boto_args,
             "UploadId": MPU_ID,
             "PartNumber": 1,
             "CopySource": LOC.boto_args,
@@ -248,10 +246,10 @@
     s3_stub.add_response(
         "abort_multipart_upload",
         {},
-        {**s3hash.MPU_DST.boto_args, "UploadId": MPU_ID},
+        {**MPU_DST.boto_args, "UploadId": MPU_ID},
     )
 
-    res = await s3hash.compute_checksum(LOC)
+    res = await s3hash.compute_checksum(LOC, SCRATCH_BUCKETS)
 
     assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.sha256_chunked(CHECKSUM_HASH))
 
@@ -265,6 +263,8 @@
         EXPECTED_GETATTR_PARAMS,
     )
 
+    stub_bucket_region(s3_stub)
+
     MPU_ID = "test-upload-id"
     s3_stub.add_response(
         "create_multipart_upload",
@@ -284,7 +284,7 @@
             },
         },
         {
-            **s3hash.MPU_DST.boto_args,
+            **MPU_DST.boto_args,
             "UploadId": MPU_ID,
             "PartNumber": 1,
             "CopySourceRange": "bytes=0-8388607",
@@ -301,7 +301,7 @@
             },
         },
         {
-            **s3hash.MPU_DST.boto_args,
+            **MPU_DST.boto_args,
             "UploadId": MPU_ID,
             "PartNumber": 2,
             "CopySourceRange": "bytes=8388608-8388608",
@@ -313,10 +313,10 @@
     s3_stub.add_response(
         "abort_multipart_upload",
         {},
-        {**s3hash.MPU_DST.boto_args, "UploadId": MPU_ID},
+        {**MPU_DST.boto_args, "UploadId": MPU_ID},
     )
 
-    res = await s3hash.compute_checksum(LOC)
+    res = await s3hash.compute_checksum(LOC, SCRATCH_BUCKETS)
 
     assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.sha256_chunked(CHECKSUM_TOP))
 
diff --git a/lambdas/s3hash/tests/test_mpu_dst.py b/lambdas/s3hash/tests/test_mpu_dst.py
new file mode 100644
index 00000000000..5971f1ca4a9
--- /dev/null
+++ b/lambdas/s3hash/tests/test_mpu_dst.py
@@ -0,0 +1,87 @@
+import pytest
+from botocore.stub import Stubber
+
+import t4_lambda_s3hash as s3hash
+
+
+async def test_get_bucket_region_valid(s3_stub: Stubber):
+    expected_region = "us-west-2"
+    bucket_name = "test-bucket"
+    s3_stub.add_response(
+        method="head_bucket",
+        expected_params={"Bucket": bucket_name},
+        service_response={
+            "ResponseMetadata": {
+                "HTTPHeaders": {"x-amz-bucket-region": expected_region},
+            },
+        },
+    )
+    region = await s3hash.get_bucket_region(bucket_name)
+    assert region == expected_region
+
+
+async def test_get_bucket_region_exist_error(s3_stub: Stubber):
+    bucket_name = "existent-bucket"
+    expected_region = "us-west-2"
+    s3_stub.add_client_error(
+        method="head_bucket",
+        service_error_code="403",
+        expected_params={"Bucket": bucket_name},
+        service_message="Not Found",  # we only care about the error code, not the message
+        response_meta={
+            "HTTPHeaders": {"x-amz-bucket-region": expected_region},
+        },
+    )
+    region = await s3hash.get_bucket_region(bucket_name)
+    assert region == expected_region
+
+
+async def test_get_bucket_region_nonexist_error(s3_stub: Stubber):
+    bucket_name = "non-existent-bucket"
+    s3_stub.add_client_error(
+        method="head_bucket",
+        service_error_code="404",
+        expected_params={"Bucket": bucket_name},
+        service_message="Not Found",
+    )
+    with pytest.raises(Exception) as exc_info:
+        await s3hash.get_bucket_region(bucket_name)
+    assert "Not Found" in str(exc_info.value)
+
+
+async def test_get_mpu_dst_for_location_valid(s3_stub: Stubber):
+    src_loc = s3hash.S3ObjectSource(bucket="source-bucket", key="source-key", version="source-version")
+    scratch_buckets = {"us-west-2": "scratch-bucket-us-west-2"}
+    expected_dst = s3hash.S3ObjectDestination(bucket="scratch-bucket-us-west-2", key=s3hash.SCRATCH_KEY)
+
+    s3_stub.add_response(
+        method="head_bucket",
+        expected_params={"Bucket": src_loc.bucket},
+        service_response={
+            "ResponseMetadata": {
+                "HTTPHeaders": {"x-amz-bucket-region": "us-west-2"},
+            },
+        },
+    )
+
+    dst = await s3hash.get_mpu_dst_for_location(src_loc, scratch_buckets)
+    assert dst == expected_dst
+
+
+async def test_get_mpu_dst_for_location_no_scratch_bucket(s3_stub: Stubber):
+    src_loc = s3hash.S3ObjectSource(bucket="source-bucket", key="source-key", version="source-version")
+    scratch_buckets = {"us-east-1": "scratch-bucket-us-east-1"}
+
+    s3_stub.add_response(
+        method="head_bucket",
+        expected_params={"Bucket": src_loc.bucket},
+        service_response={
+            "ResponseMetadata": {
+                "HTTPHeaders": {"x-amz-bucket-region": "us-west-2"},
+            },
+        },
+    )
+
+    with pytest.raises(s3hash.LambdaError) as exc_info:
+        await s3hash.get_mpu_dst_for_location(src_loc, scratch_buckets)
+    assert "ScratchBucketNotFound" in str(exc_info.value)
diff --git a/lambdas/s3hash/tests/test_wrapper.py b/lambdas/s3hash/tests/test_wrapper.py
index 02aed282547..92cce9dd160 100644
--- a/lambdas/s3hash/tests/test_wrapper.py
+++ b/lambdas/s3hash/tests/test_wrapper.py
@@ -55,7 +55,7 @@ async def sleep(*_):
 
     # pylint: disable-next=missing-kwoa
     res = s3hash.lambda_handler(
-        {"credentials": AWS_CREDENTIALS, "location": S3_SRC},
+        {"credentials": AWS_CREDENTIALS, "location": S3_SRC, "scratch_buckets": {}},
         FakeContext(1001),
     )
 
@@ -82,7 +82,7 @@ def dict(self):
 
     # pylint: disable-next=missing-kwoa
     res = s3hash.lambda_handler(
-        {"credentials": AWS_CREDENTIALS, "location": S3_SRC},
+        {"credentials": AWS_CREDENTIALS, "location": S3_SRC, "scratch_buckets": {"region": "bucket"}},
         FakeContext(2000),
     )
 
@@ -94,6 +94,7 @@ def dict(self):
             key=S3_SRC["key"],
             version=S3_SRC["version"],
         ),
+        {"region": "bucket"},
     )
 
     aio_context_mock.assert_called_once_with(s3hash.AWSCredentials.parse_obj(AWS_CREDENTIALS))