Lambdas: chunked checksums, adhere to the spec (#3889)
nl0 authored Feb 23, 2024
1 parent 57095a7 commit a2c5260
Showing 10 changed files with 128 additions and 55 deletions.
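In brief: the MULTIPART_CHECKSUMS flag becomes CHUNKED_CHECKSUMS, the legacy SP/MP checksum types give way to SHA256 and SHA256_CHUNKED, the chunked value drops the `-<part count>` suffix in favor of a plain base64-encoded hash of the part hashes, empty objects short-circuit to a fixed checksum, and both lambdas bump their pinned quilt-shared revision.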
2 changes: 1 addition & 1 deletion lambdas/pkgpush/requirements.txt
@@ -61,7 +61,7 @@ python-dateutil==2.8.2
# via botocore
pyyaml==6.0.1
# via quilt3
quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@7a82dfcd869035c5e71a5cc7cd912af35d72515c#subdirectory=py-shared
quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@47055f7c5c0a93ddddfa5030a73b22a5d42b9c10#subdirectory=py-shared
# via t4_lambda_pkgpush (setup.py)
quilt3 @ git+https://github.com/quiltdata/quilt@5c2b79128fe4d5d1e6093ff6a7d11d09d3315843#subdirectory=api/python
# via
2 changes: 1 addition & 1 deletion lambdas/pkgpush/setup.py
@@ -15,7 +15,7 @@
),
(
"quilt_shared[pydantic,boto,quilt] @ git+https://github.com/quiltdata/quilt@"
"7a82dfcd869035c5e71a5cc7cd912af35d72515c"
"47055f7c5c0a93ddddfa5030a73b22a5d42b9c10"
"#subdirectory=py-shared"
),
],
4 changes: 2 additions & 2 deletions lambdas/pkgpush/tests/test_index.py
@@ -901,7 +901,7 @@ def test_calculate_pkg_hashes_too_large_file_error(self):
def test_calculate_pkg_entry_hash(self):
with mock.patch(
"t4_lambda_pkgpush.invoke_hash_lambda",
return_value=Checksum(type=ChecksumType.SP, value='0' * 64),
return_value=Checksum(type=ChecksumType.SHA256_CHUNKED, value="base64hash"),
) as invoke_hash_lambda_mock:
t4_lambda_pkgpush.calculate_pkg_entry_hash(self.entry_without_hash, CREDENTIALS)

@@ -913,7 +913,7 @@ def test_invoke_hash_lambda(self):
lambda_client_stubber = Stubber(t4_lambda_pkgpush.lambda_)
lambda_client_stubber.activate()
self.addCleanup(lambda_client_stubber.deactivate)
checksum = {"type": "SHA256", "value": "0" * 64}
checksum = {"type": "sha2-256-chunked", "value": "base64hash"}
pk = PhysicalKey(bucket="bucket", path="path", version_id="version-id")

lambda_client_stubber.add_response(
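The updated stub pins down the wire format between pkgpush and the hash lambda: the checksum now travels as `{"type": "sha2-256-chunked", "value": "<base64>"}`. A minimal sketch of a model that would round-trip that payload — the actual `Checksum`/`ChecksumType` live in quilt_shared, so the class shapes and the legacy enum spelling here are assumptions:

```python
from enum import Enum

from pydantic import BaseModel


class ChecksumType(str, Enum):
    SHA256 = "sha2-256"  # assumed spelling for the plain variant
    SHA256_CHUNKED = "sha2-256-chunked"  # matches the stubbed payload above


class Checksum(BaseModel):
    type: ChecksumType
    value: str  # base64-encoded digest for the chunked variant


# Round-trip the payload from test_invoke_hash_lambda:
payload = {"type": "sha2-256-chunked", "value": "base64hash"}
assert Checksum.parse_obj(payload).type is ChecksumType.SHA256_CHUNKED
```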
1 change: 1 addition & 0 deletions lambdas/s3hash/CHANGELOG.md
@@ -16,6 +16,7 @@ where verb is one of

## Changes

- [Changed] Compute chunked checksums, adhere to the spec ([#3889](https://github.com/quiltdata/quilt/pull/3889))
- [Added] Lambda handler for file copy ([#3884](https://github.com/quiltdata/quilt/pull/3884))
- [Changed] Compute multipart checksums ([#3402](https://github.com/quiltdata/quilt/pull/3402))
- [Added] Bootstrap the change log ([#3402](https://github.com/quiltdata/quilt/pull/3402))
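The spec itself is not part of this diff, but the code below implies the scheme: split the object into fixed-size chunks, SHA-256 each chunk, then publish the SHA-256 of the concatenated chunk digests, base64-encoded, with no part-count suffix. A self-contained sketch under that reading (real part sizing comes from `get_parts_for_size`, which is not shown; a fixed 8 MiB chunk is assumed here):

```python
import base64
import hashlib
from typing import BinaryIO

CHUNK_SIZE = 8 * 1024 * 1024  # assumed: the 8 MiB boto3 default noted in __init__.py


def chunked_sha256(f: BinaryIO) -> str:
    """Sketch of the chunked scheme: SHA-256 over the concatenation of
    per-chunk SHA-256 digests, base64-encoded."""
    digests = []
    while chunk := f.read(CHUNK_SIZE):
        digests.append(hashlib.sha256(chunk).digest())
    # Zero chunks degenerate to SHA-256 of b"", matching Checksum.empty() below.
    top = hashlib.sha256(b"".join(digests)).digest()
    return base64.b64encode(top).decode()
```

Note that even a single-chunk object gets its digest hashed once more, which is why `get_compliant_checksum` now re-wraps a single-part S3 checksum through `Checksum.for_parts` rather than returning it verbatim.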
2 changes: 1 addition & 1 deletion lambdas/s3hash/pytest.ini
@@ -2,5 +2,5 @@
asyncio_mode=auto
env =
MPU_CONCURRENCY=1000
MULTIPART_CHECKSUMS=true
CHUNKED_CHECKSUMS=true
SERVICE_BUCKET=service-bucket
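Note the rename is unconditional: `__init__.py` reads `os.environ["CHUNKED_CHECKSUMS"]` with no fallback, so a deployment still setting only MULTIPART_CHECKSUMS would fail at import time.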
2 changes: 1 addition & 1 deletion lambdas/s3hash/requirements.txt
@@ -83,7 +83,7 @@ python-dateutil==2.8.2
# via botocore
pyyaml==6.0.1
# via quilt3
quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@7a82dfcd869035c5e71a5cc7cd912af35d72515c#subdirectory=py-shared
quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@47055f7c5c0a93ddddfa5030a73b22a5d42b9c10#subdirectory=py-shared
# via t4_lambda_s3hash (setup.py)
quilt3==5.4.0
# via quilt-shared
2 changes: 1 addition & 1 deletion lambdas/s3hash/setup.py
@@ -12,7 +12,7 @@
"types-aiobotocore[s3] ~= 2.11",
(
"quilt_shared[pydantic,boto,quilt] @ git+https://github.com/quiltdata/quilt@"
"7a82dfcd869035c5e71a5cc7cd912af35d72515c"
"47055f7c5c0a93ddddfa5030a73b22a5d42b9c10"
"#subdirectory=py-shared"
),
],
49 changes: 26 additions & 23 deletions lambdas/s3hash/src/t4_lambda_s3hash/__init__.py
@@ -33,7 +33,7 @@
logger.setLevel(os.environ.get("QUILT_LOG_LEVEL", "WARNING"))

MPU_CONCURRENCY = int(os.environ["MPU_CONCURRENCY"])
MULTIPART_CHECKSUMS = os.environ["MULTIPART_CHECKSUMS"] == "true"
CHUNKED_CHECKSUMS = os.environ["CHUNKED_CHECKSUMS"] == "true"
SERVICE_BUCKET = os.environ["SERVICE_BUCKET"]

SCRATCH_KEY_SERVICE = "user-requests/checksum-upload-tmp"
@@ -66,21 +66,22 @@ async def aio_context(credentials: AWSCredentials):

class Checksum(ChecksumBase):
@classmethod
def singlepart(cls, value: bytes):
return cls(value=value.hex(), type=ChecksumType.SP)
def sha256(cls, value: bytes):
return cls(value=value.hex(), type=ChecksumType.SHA256)

@classmethod
def multipart(cls, parts: T.Sequence[bytes]):
hash_list = hash_parts(parts)
b64 = base64.b64encode(hash_list).decode()
value = f"{b64}-{len(parts)}"
return cls(value=value, type=ChecksumType.MP)
def sha256_chunked(cls, value: bytes):
return cls(value=base64.b64encode(value).decode(), type=ChecksumType.SHA256_CHUNKED)

@classmethod
def for_parts(cls, checksums: T.Sequence[bytes], defs: T.Sequence[PartDef]):
if defs == PARTS_SINGLE:
return cls.singlepart(checksums[0])
return cls.multipart(checksums)
def for_parts(cls, checksums: T.Sequence[bytes]):
return cls.sha256_chunked(hash_parts(checksums))

_EMPTY_HASH = hashlib.sha256().digest()

@classmethod
def empty(cls):
return cls.sha256_chunked(cls._EMPTY_HASH) if CHUNKED_CHECKSUMS else cls.sha256(cls._EMPTY_HASH)


# 8 MiB -- boto3 default:
@@ -124,12 +125,12 @@ async def get_obj_attributes(location: S3ObjectSource) -> T.Optional[GetObjectAt

def get_compliant_checksum(attrs: GetObjectAttributesOutputTypeDef) -> T.Optional[Checksum]:
checksum_value = attrs.get("Checksum", {}).get("ChecksumSHA256")
if checksum_value is None:
if checksum_value is None or attrs["ObjectSize"] == 0:
return None

part_size = get_part_size(attrs["ObjectSize"])
object_parts = attrs.get("ObjectParts")
if not MULTIPART_CHECKSUMS or part_size is None:
if not CHUNKED_CHECKSUMS or part_size is None:
if object_parts is not None:
assert "TotalPartsCount" in object_parts
if object_parts["TotalPartsCount"] != 1:
@@ -138,7 +139,9 @@ def get_compliant_checksum(attrs: GetObjectAttributesOutputTypeDef) -> T.Optiona
assert "ChecksumSHA256" in object_parts["Parts"][0]
checksum_value = object_parts["Parts"][0]["ChecksumSHA256"]

return Checksum.singlepart(base64.b64decode(checksum_value))
checksum_bytes = base64.b64decode(checksum_value)

return Checksum.for_parts([checksum_bytes]) if CHUNKED_CHECKSUMS else Checksum.sha256(checksum_bytes)

if object_parts is None:
return None
@@ -148,10 +151,7 @@ def get_compliant_checksum(attrs: GetObjectAttributesOutputTypeDef) -> T.Optiona
# Make sure we have _all_ parts.
assert len(object_parts["Parts"]) == num_parts
if all(part.get("Size") == part_size for part in object_parts["Parts"][:-1]):
return Checksum(
type=ChecksumType.MP,
value=f"{checksum_value}-{num_parts}",
)
return Checksum.sha256_chunked(base64.b64decode(checksum_value))

return None

@@ -365,7 +365,7 @@ async def compute_checksum_legacy(location: S3ObjectSource) -> Checksum:
async for chunk in stream.content.iter_any():
hashobj.update(chunk)

return Checksum.singlepart(hashobj.digest())
return Checksum.sha256(hashobj.digest())


async def compute_checksum(location: S3ObjectSource) -> ChecksumResult:
@@ -380,11 +380,14 @@ async def compute_checksum(location: S3ObjectSource) -> ChecksumResult:
resp = await S3.get().head_object(**location.boto_args)
etag, total_size = resp["ETag"], resp["ContentLength"]

if not MULTIPART_CHECKSUMS and total_size > MAX_PART_SIZE:
if total_size == 0:
return ChecksumResult(checksum=Checksum.empty())

if not CHUNKED_CHECKSUMS and total_size > MAX_PART_SIZE:
checksum = await compute_checksum_legacy(location)
return ChecksumResult(checksum=checksum)

part_defs = get_parts_for_size(total_size) if MULTIPART_CHECKSUMS else PARTS_SINGLE
part_defs = get_parts_for_size(total_size) if CHUNKED_CHECKSUMS else PARTS_SINGLE

async with create_mpu(MPU_DST) as mpu:
part_checksums = await compute_part_checksums(
@@ -394,7 +397,7 @@ async def compute_checksum(location: S3ObjectSource) -> ChecksumResult:
part_defs,
)

checksum = Checksum.for_parts(part_checksums, part_defs)
checksum = Checksum.for_parts(part_checksums) if CHUNKED_CHECKSUMS else Checksum.sha256(part_checksums[0])
return ChecksumResult(checksum=checksum)


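`hash_parts` is referenced but not defined in the visible hunks. Judging from the removed `multipart` constructor and from `CHECKSUM_TOP = s3hash.hash_parts([CHECKSUM_1, CHECKSUM_2])` in the tests below, a plausible implementation is the digest of the concatenated part digests — a sketch, not the module's actual code:

```python
import hashlib
from typing import Sequence


def hash_parts(parts: Sequence[bytes]) -> bytes:
    # Assumed behavior: one SHA-256 over the raw part digests, in order.
    return hashlib.sha256(b"".join(parts)).digest()
```

Under that assumption `Checksum.for_parts([d])` equals `sha256_chunked(sha256(d))`, consistent with the single-part expectations in test_compliant and test_mpu_single.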
75 changes: 68 additions & 7 deletions lambdas/s3hash/tests/test_compute_checksum.py
@@ -78,19 +78,78 @@ async def _get_s3():

async def test_compliant(s3_stub: Stubber):
checksum = "MOFJVevxNSJm3C/4Bn5oEEYH51CrudOzZYK4r5Cfy1g="
checksum_hash = "WZ1xAz1wCsiSoOSPphsSXS9ZlBu0XaGQlETUPG7gurI="

s3_stub.add_response(
"get_object_attributes",
{
"Checksum": {"ChecksumSHA256": checksum},
"ObjectSize": 1048576,
"ObjectSize": 1048576, # below the threshold
},
EXPECTED_GETATTR_PARAMS,
)

res = await s3hash.compute_checksum(LOC)

assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.singlepart(base64.b64decode(checksum)))
assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.sha256_chunked(base64.b64decode(checksum_hash)))


SHA256_EMPTY = bytes.fromhex("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")


@pytest.mark.parametrize(
"chunked, expected",
[
(True, s3hash.Checksum.sha256_chunked(SHA256_EMPTY)),
(False, s3hash.Checksum.sha256(SHA256_EMPTY)),
],
)
async def test_empty(chunked: bool, expected: s3hash.Checksum, s3_stub: Stubber, mocker: MockerFixture):
mocker.patch("t4_lambda_s3hash.CHUNKED_CHECKSUMS", chunked)

s3_stub.add_response(
"get_object_attributes",
{
"Checksum": {"ChecksumSHA256": "doesnt matter"},
"ObjectSize": 0,
"ETag": "any",
},
EXPECTED_GETATTR_PARAMS,
)

res = await s3hash.compute_checksum(LOC)

assert res == s3hash.ChecksumResult(checksum=expected)


@pytest.mark.parametrize(
"chunked, expected",
[
(True, s3hash.Checksum.sha256_chunked(SHA256_EMPTY)),
(False, s3hash.Checksum.sha256(SHA256_EMPTY)),
],
)
async def test_empty_no_access(chunked: bool, expected: s3hash.Checksum, s3_stub: Stubber, mocker: MockerFixture):
mocker.patch("t4_lambda_s3hash.CHUNKED_CHECKSUMS", chunked)

s3_stub.add_client_error(
"get_object_attributes",
service_error_code="AccessDenied",
expected_params=EXPECTED_GETATTR_PARAMS,
)

s3_stub.add_response(
"head_object",
{
"ETag": '"test-etag"',
"ContentLength": 0,
},
LOC.boto_args,
)

res = await s3hash.compute_checksum(LOC)

assert res == s3hash.ChecksumResult(checksum=expected)


async def test_legacy(s3_stub: Stubber, mocker: MockerFixture):
Expand All @@ -115,12 +174,12 @@ async def test_legacy(s3_stub: Stubber, mocker: MockerFixture):
LOC.boto_args,
)

mocker.patch("t4_lambda_s3hash.MULTIPART_CHECKSUMS", False)
mocker.patch("t4_lambda_s3hash.CHUNKED_CHECKSUMS", False)

res = await s3hash.compute_checksum(LOC)

checksum_hex = bytes.fromhex("d9d865cc54ec60678f1b119084ad79ae7f9357d1c4519c6457de3314b7fbba8a")
assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.singlepart(checksum_hex))
assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.sha256(checksum_hex))


async def test_mpu_fail(s3_stub: Stubber):
@@ -168,12 +227,13 @@ async def test_mpu_single(s3_stub: Stubber):
)

CHECKSUM = bytes.fromhex("d9d865cc54ec60678f1b119084ad79ae7f9357d1c4519c6457de3314b7fbba8a")
CHECKSUM_HASH = bytes.fromhex("7eb12f7f901586f5c53fc5d8aaccd4a18177aa122c0bd166133372f42bc23880")
s3_stub.add_response(
"upload_part_copy",
{
"CopyPartResult": {
"ChecksumSHA256": base64.b64encode(CHECKSUM).decode(),
"ETag": ETAG + "-1",
"ETag": PART_ETAG,
},
},
{
@@ -193,7 +253,7 @@

res = await s3hash.compute_checksum(LOC)

assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.singlepart(CHECKSUM))
assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.sha256_chunked(CHECKSUM_HASH))


async def test_mpu_multi(s3_stub: Stubber):
Expand All @@ -214,6 +274,7 @@ async def test_mpu_multi(s3_stub: Stubber):

CHECKSUM_1 = bytes.fromhex("d9d865cc54ec60678f1b119084ad79ae7f9357d1c4519c6457de3314b7fbba8a")
CHECKSUM_2 = bytes.fromhex("a9d865cc54ec60678f1b119084ad79ae7f9357d1c4519c6457de3314b7fbba8a")
CHECKSUM_TOP = s3hash.hash_parts([CHECKSUM_1, CHECKSUM_2])
s3_stub.add_response(
"upload_part_copy",
{
@@ -257,7 +318,7 @@

res = await s3hash.compute_checksum(LOC)

assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.multipart([CHECKSUM_1, CHECKSUM_2]))
assert res == s3hash.ChecksumResult(checksum=s3hash.Checksum.sha256_chunked(CHECKSUM_TOP))


async def test_mpu_multi_complete(s3_stub: Stubber):
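As a cross-check of the constants above — assuming `hash_parts` concatenates and re-hashes as sketched earlier — the expected values should be reproducible offline:

```python
import base64
import hashlib

# test_mpu_multi: top-level digest over the two part digests.
CHECKSUM_1 = bytes.fromhex("d9d865cc54ec60678f1b119084ad79ae7f9357d1c4519c6457de3314b7fbba8a")
CHECKSUM_2 = bytes.fromhex("a9d865cc54ec60678f1b119084ad79ae7f9357d1c4519c6457de3314b7fbba8a")
checksum_top = hashlib.sha256(CHECKSUM_1 + CHECKSUM_2).digest()

# test_compliant: checksum_hash should be the digest of the S3-reported digest.
checksum = "MOFJVevxNSJm3C/4Bn5oEEYH51CrudOzZYK4r5Cfy1g="
derived = base64.b64encode(hashlib.sha256(base64.b64decode(checksum)).digest()).decode()
# expected, if the assumption holds: "WZ1xAz1wCsiSoOSPphsSXS9ZlBu0XaGQlETUPG7gurI="
```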
(Diff for one more changed file was not loaded on this page.)
