Skip to content

Commit

Permalink
lambdas: cross-region chunksums support (use per-region scratch buckets) (#3923)
Browse files Browse the repository at this point in the history

Co-authored-by: Sergey Fedoseev <[email protected]>
  • Loading branch information
nl0 and sir-sigurd authored Apr 8, 2024
1 parent 8b1881b commit a1cf658
Show file tree
Hide file tree
Showing 20 changed files with 480 additions and 230 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ testdocs/scripts
.env

.venv
.aider*
1 change: 1 addition & 0 deletions lambdas/pkgpush/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ where verb is one of

## Changes

- [Changed] Use per-region scratch buckets ([#3923](https://github.com/quiltdata/quilt/pull/3923))
- [Changed] Speed-up copying of large files during promotion ([#3884](https://github.com/quiltdata/quilt/pull/3884))
- [Changed] Bump quilt3 to set max_pool_connections, this improves performance ([#3870](https://github.com/quiltdata/quilt/pull/3870))
- [Changed] Compute multipart checksums ([#3402](https://github.com/quiltdata/quilt/pull/3402))
Expand Down
2 changes: 1 addition & 1 deletion lambdas/pkgpush/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ python-dateutil==2.8.2
# via botocore
pyyaml==6.0.1
# via quilt3
quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@47055f7c5c0a93ddddfa5030a73b22a5d42b9c10#subdirectory=py-shared
quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@7c6edd14fbe8a26613bc26b1bbdc0b956132ef8c#subdirectory=py-shared
# via t4_lambda_pkgpush (setup.py)
quilt3 @ git+https://github.com/quiltdata/quilt@5c2b79128fe4d5d1e6093ff6a7d11d09d3315843#subdirectory=api/python
# via
Expand Down
2 changes: 1 addition & 1 deletion lambdas/pkgpush/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
),
(
"quilt_shared[pydantic,boto,quilt] @ git+https://github.com/quiltdata/quilt@"
"47055f7c5c0a93ddddfa5030a73b22a5d42b9c10"
"7c6edd14fbe8a26613bc26b1bbdc0b956132ef8c"
"#subdirectory=py-shared"
),
],
Expand Down
22 changes: 14 additions & 8 deletions lambdas/pkgpush/src/t4_lambda_pkgpush/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
ChecksumResult,
CopyResult,
PackageConstructEntry,
PackageConstructParams,
PackagePromoteParams,
PackagePushParams,
PackagePushResult,
S3CopyLambdaParams,
S3HashLambdaParams,
Expand Down Expand Up @@ -111,11 +111,16 @@ def invoke_lambda(*, function_name: str, params: pydantic.BaseModel, err_prefix:
return parsed["result"]


def invoke_hash_lambda(pk: PhysicalKey, credentials: AWSCredentials) -> Checksum:
def invoke_hash_lambda(
pk: PhysicalKey,
credentials: AWSCredentials,
scratch_buckets: T.Dict[str, str],
) -> Checksum:
result = invoke_lambda(
function_name=S3_HASH_LAMBDA,
params=S3HashLambdaParams(
credentials=credentials,
scratch_buckets=scratch_buckets,
location=S3ObjectSource.from_pk(pk),
),
err_prefix="S3HashLambda",
Expand All @@ -126,11 +131,12 @@ def invoke_hash_lambda(pk: PhysicalKey, credentials: AWSCredentials) -> Checksum
def calculate_pkg_entry_hash(
    pkg_entry: quilt3.packages.PackageEntry,
    credentials: AWSCredentials,
    scratch_buckets: T.Dict[str, str],
):
    """Compute the entry's checksum via the S3 hash lambda and store it on the entry.

    Args:
        pkg_entry: package entry to hash; its ``hash`` attribute is set in place.
        credentials: user AWS credentials forwarded to the hash lambda.
        scratch_buckets: region -> scratch-bucket mapping used by the lambda
            for multipart/cross-region checksum work.
    """
    # NOTE: the diff view showed both the pre- and post-change assignment;
    # only the scratch_buckets-aware call is kept.
    pkg_entry.hash = invoke_hash_lambda(pkg_entry.physical_key, credentials, scratch_buckets).dict()


def calculate_pkg_hashes(pkg: quilt3.Package):
def calculate_pkg_hashes(pkg: quilt3.Package, scratch_buckets: T.Dict[str, str]):
entries = []
for lk, entry in pkg.walk():
if entry.hash is not None:
Expand All @@ -155,7 +161,7 @@ def calculate_pkg_hashes(pkg: quilt3.Package):
) as pool:
credentials = AWSCredentials.from_boto_session(user_boto_session)
fs = [
pool.submit(calculate_pkg_entry_hash, entry, credentials)
pool.submit(calculate_pkg_entry_hash, entry, credentials, scratch_buckets)
for entry in entries
]
for f in concurrent.futures.as_completed(fs):
Expand Down Expand Up @@ -317,7 +323,7 @@ def _get_successor_params(


def _push_pkg_to_successor(
params: PackagePushParams,
params: PackagePromoteParams,
*,
src_bucket: str,
get_pkg: T.Callable[[S3PackageRegistryV1], quilt3.Package],
Expand Down Expand Up @@ -436,7 +442,7 @@ def get_pkg(src_registry: S3PackageRegistryV1):
logger=logger,
)
def create_package(req_file: T.IO[bytes]) -> PackagePushResult:
params = PackagePushParams.parse_raw(next(req_file))
params = PackageConstructParams.parse_raw(next(req_file))
registry_url = f"s3://{params.bucket}"
try:
package_registry = get_registry(registry_url)
Expand Down Expand Up @@ -509,7 +515,7 @@ def create_package(req_file: T.IO[bytes]) -> PackagePushResult:
except quilt3.util.QuiltException as qe:
raise PkgpushException.from_quilt_exception(qe)

calculate_pkg_hashes(pkg)
calculate_pkg_hashes(pkg, params.scratch_buckets)
try:
top_hash = pkg._build(
name=params.name,
Expand Down
1 change: 1 addition & 0 deletions lambdas/pkgpush/test-requirements.in
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-c requirements.txt
pytest ~= 8.0
pytest-mock ~= 3.14
pytest-subtests ~= 0.11
3 changes: 3 additions & 0 deletions lambdas/pkgpush/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ pluggy==1.4.0
pytest==8.0.0
# via
# -r test-requirements.in
# pytest-mock
# pytest-subtests
pytest-mock==3.14.0
# via -r test-requirements.in
pytest-subtests==0.11.0
# via -r test-requirements.in
tomli==2.0.1
Expand Down
46 changes: 31 additions & 15 deletions lambdas/pkgpush/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,41 @@
import os

import pytest
from botocore.stub import Stubber


def pytest_configure(config):
    """Pytest hook: seed required env vars before test modules are imported.

    The ``config`` argument is the pytest config object and is unused here.
    All values are fakes; the size/count limits are set high enough to be
    effectively unlimited so they never interfere with tests.
    """
    # NOTE: the rendered diff interleaved old (single-quote) and new
    # (double-quote) lines; this is the post-change version only.
    os.environ.update(
        AWS_ACCESS_KEY_ID="foo",
        AWS_SECRET_ACCESS_KEY="bar",
        AWS_DEFAULT_REGION="us-east-1",
        SERVICE_BUCKET="service-bucket",
        **dict.fromkeys(
            (
                "PROMOTE_PKG_MAX_MANIFEST_SIZE",
                "PROMOTE_PKG_MAX_PKG_SIZE",
                "PROMOTE_PKG_MAX_FILES",
                "MAX_BYTES_TO_HASH",
                "MAX_FILES_TO_HASH",
                "S3_HASH_LAMBDA_MAX_FILE_SIZE_BYTES",
            ),
            str(2**64),  # Value big enough to serve as 'unlimited'.
        ),
        S3_HASH_LAMBDA="s3-hash-lambda-name",
        S3_COPY_LAMBDA="s3-copy-lambda-name",
        S3_HASH_LAMBDA_CONCURRENCY="40",
        S3_COPY_LAMBDA_CONCURRENCY="40",
    )


@pytest.fixture
def lambda_stub():
    """Yield an activated Stubber around the pkgpush lambda client.

    On teardown, verifies every queued response was consumed, then
    deactivates the stub.
    """
    import t4_lambda_pkgpush

    stubber = Stubber(t4_lambda_pkgpush.lambda_)
    stubber.activate()
    try:
        yield stubber
        # The test must have consumed every response it queued.
        stubber.assert_no_pending_responses()
    finally:
        stubber.deactivate()
41 changes: 41 additions & 0 deletions lambdas/pkgpush/tests/test_copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import io

from botocore.stub import Stubber

import t4_lambda_pkgpush
from quilt3.util import PhysicalKey
from quilt_shared.aws import AWSCredentials
from quilt_shared.types import NonEmptyStr

# Static fake user credentials passed to invoke_copy_lambda in these tests.
CREDENTIALS = AWSCredentials(
    key=NonEmptyStr("test_aws_access_key_id"),
    secret=NonEmptyStr("test_aws_secret_access_key"),
    token=NonEmptyStr("test_aws_session_token"),
)


def test_invoke_copy_lambda(lambda_stub: Stubber):
    """invoke_copy_lambda returns the destination version id reported by the lambda."""
    src = PhysicalKey("src-bucket", "src-key", "src-version-id")
    dst = PhysicalKey("dst-bucket", "dst-key", None)
    expected_version = "dst-version-id"

    # Stub the lambda invocation to report the copied object's version.
    payload = b'{"result": {"version": "%s"}}' % expected_version.encode()
    lambda_stub.add_response("invoke", {"Payload": io.BytesIO(payload)})

    result = t4_lambda_pkgpush.invoke_copy_lambda(CREDENTIALS, src, dst)
    assert result == expected_version
174 changes: 174 additions & 0 deletions lambdas/pkgpush/tests/test_hash_calc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import io
import json

import boto3
import pytest
from botocore.stub import Stubber
from pytest_mock import MockerFixture

import t4_lambda_pkgpush
from quilt3.packages import Package, PackageEntry
from quilt3.util import PhysicalKey
from quilt_shared.aws import AWSCredentials
from quilt_shared.pkgpush import Checksum, ChecksumType
from quilt_shared.types import NonEmptyStr

# Static fake user credentials forwarded to the hash lambda in these tests.
CREDENTIALS = AWSCredentials(
    key=NonEmptyStr("test_aws_access_key_id"),
    secret=NonEmptyStr("test_aws_secret_access_key"),
    token=NonEmptyStr("test_aws_session_token"),
)

# Region -> scratch-bucket mapping passed through to the hash lambda.
SCRATCH_BUCKETS = {"us-east-1": "test-scratch-bucket"}


@pytest.fixture
def entry_with_hash() -> PackageEntry:
    """Package entry that already carries a SHA256 hash."""
    pk = PhysicalKey("test-bucket", "with-hash", "with-hash")
    return PackageEntry(pk, 42, {"type": "SHA256", "value": "0" * 64}, {})


@pytest.fixture
def entry_without_hash() -> PackageEntry:
    """Package entry whose hash has not been computed yet."""
    pk = PhysicalKey("test-bucket", "without-hash", "without-hash")
    return PackageEntry(pk, 42, None, {})


@pytest.fixture
def pkg(entry_with_hash: PackageEntry, entry_without_hash: PackageEntry) -> Package:
    """Package containing one hashed and one unhashed entry."""
    package = Package()
    for logical_key, entry in (
        ("with-hash", entry_with_hash),
        ("without-hash", entry_without_hash),
    ):
        package.set(logical_key, entry)
    return package


def test_calculate_pkg_hashes(
    pkg: Package, entry_without_hash: PackageEntry, mocker: MockerFixture
):
    """Only the entry lacking a hash is dispatched for hashing."""
    hash_mock = mocker.patch.object(t4_lambda_pkgpush, "calculate_pkg_entry_hash")
    session = boto3.Session(**CREDENTIALS.boto_args)

    with t4_lambda_pkgpush.setup_user_boto_session(session):
        t4_lambda_pkgpush.calculate_pkg_hashes(pkg, SCRATCH_BUCKETS)

    # The entry that already has a hash must not be re-hashed.
    hash_mock.assert_called_once_with(entry_without_hash, CREDENTIALS, SCRATCH_BUCKETS)


def test_calculate_pkg_hashes_too_large_file_error(pkg: Package, mocker: MockerFixture):
    """Entries above the hash-lambda size limit raise FileTooLargeForHashing."""
    # Shrink the limit to 1 byte so every entry (size 42) exceeds it.
    mocker.patch.object(t4_lambda_pkgpush, "S3_HASH_LAMBDA_MAX_FILE_SIZE_BYTES", 1)

    with pytest.raises(t4_lambda_pkgpush.PkgpushException) as excinfo:
        t4_lambda_pkgpush.calculate_pkg_hashes(pkg, SCRATCH_BUCKETS)
    assert excinfo.value.name == "FileTooLargeForHashing"


def test_calculate_pkg_entry_hash(
    entry_without_hash: PackageEntry,
    mocker: MockerFixture,
):
    """The entry's hash is set from the checksum returned by the hash lambda."""
    fake_checksum = Checksum(type=ChecksumType.SHA256_CHUNKED, value="base64hash")
    invoke_mock = mocker.patch(
        "t4_lambda_pkgpush.invoke_hash_lambda",
        return_value=fake_checksum,
    )

    t4_lambda_pkgpush.calculate_pkg_entry_hash(
        entry_without_hash, CREDENTIALS, SCRATCH_BUCKETS
    )

    invoke_mock.assert_called_once_with(
        entry_without_hash.physical_key, CREDENTIALS, SCRATCH_BUCKETS
    )
    assert entry_without_hash.hash == fake_checksum.dict()


def test_invoke_hash_lambda(lambda_stub: Stubber):
    """The hash lambda gets credentials, scratch buckets and location; its
    checksum is returned verbatim."""
    expected_checksum = {"type": "sha2-256-chunked", "value": "base64hash"}
    pk = PhysicalKey(bucket="bucket", path="path", version_id="version-id")

    expected_request = {
        "credentials": {
            "key": CREDENTIALS.key,
            "secret": CREDENTIALS.secret,
            "token": CREDENTIALS.token,
        },
        "scratch_buckets": SCRATCH_BUCKETS,
        "location": {
            "bucket": pk.bucket,
            "key": pk.path,
            "version": pk.version_id,
        },
    }
    lambda_stub.add_response(
        "invoke",
        service_response={
            "Payload": io.BytesIO(
                b'{"result": {"checksum": %s}}' % json.dumps(expected_checksum).encode()
            ),
        },
        expected_params={
            "FunctionName": t4_lambda_pkgpush.S3_HASH_LAMBDA,
            "Payload": json.dumps(expected_request),
        },
    )

    result = t4_lambda_pkgpush.invoke_hash_lambda(pk, CREDENTIALS, SCRATCH_BUCKETS)
    assert result == expected_checksum


def test_invoke_hash_lambda_error(lambda_stub: Stubber):
    """An unhandled lambda error surfaces as PkgpushException."""
    pk = PhysicalKey(bucket="bucket", path="path", version_id="version-id")

    expected_request = {
        "credentials": {
            "key": CREDENTIALS.key,
            "secret": CREDENTIALS.secret,
            "token": CREDENTIALS.token,
        },
        "scratch_buckets": SCRATCH_BUCKETS,
        "location": {
            "bucket": pk.bucket,
            "key": pk.path,
            "version": pk.version_id,
        },
    }
    # Payload mirrors a real lambda timeout error body.
    error_payload = (
        b'{"errorMessage":"2024-02-02T14:33:39.754Z e0db9ea8-1329-44d5-a0dc-364ba2749b09'
        b' Task timed out after 1.00 seconds"}'
    )
    lambda_stub.add_response(
        "invoke",
        service_response={
            "FunctionError": "Unhandled",
            "Payload": io.BytesIO(error_payload),
        },
        expected_params={
            "FunctionName": t4_lambda_pkgpush.S3_HASH_LAMBDA,
            "Payload": json.dumps(expected_request),
        },
    )

    with pytest.raises(t4_lambda_pkgpush.PkgpushException) as excinfo:
        t4_lambda_pkgpush.invoke_hash_lambda(pk, CREDENTIALS, SCRATCH_BUCKETS)
    assert excinfo.value.name == "S3HashLambdaUnhandledError"
Loading

0 comments on commit a1cf658

Please sign in to comment.