Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add download_presigned_url method #25

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ verify_ssl = true
name = "pypi"

[packages]
fqdn = "==1.5.1"
aiobotocore = "==2.13.2"
aiohttp = "==3.10.5"
fqdn = "==1.5.1"

[dev-packages]
pytest = "==8.3.2"
Expand Down
4 changes: 2 additions & 2 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 69 additions & 0 deletions cdmtaskservice/s3/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
python versions.
"""

import aiohttp
from hashlib import md5
from pathlib import Path
from typing import Any
import zlib

# Probably not necessary, but could look into aiofiles for some of these methods
# Potential future (minor) performance improvement, but means more installs on remote clusters

_CHUNK_SIZE_64KB = 2 ** 16


Expand Down Expand Up @@ -48,6 +53,7 @@ def calculate_etag(infile: Path, partsize: int) -> str:
raise ValueError("partsize must be > 0")
md5_digests = []
with open(infile.expanduser(), 'rb') as f:
# this could theoretically be a 5GB read. May need to read smaller chunks?
while chunk := f.read(partsize):
md5_digests.append(md5(chunk).digest())
if len(md5_digests) == 0:
Expand All @@ -71,3 +77,66 @@ def crc32(infile: Path) -> bytes:
def _check_file(infile: Path):
    # Reject falsy paths and anything that is not an existing regular file.
    # `bool(infile)` short-circuits so a None path is never dereferenced.
    is_usable = bool(infile) and infile.expanduser().is_file()
    if not is_usable:
        raise ValueError("infile must be exist and be a file")


async def download_presigned_url(
    session: aiohttp.ClientSession,
    url: str,
    etag: str,
    partsize: int,
    outputpath: Path):
    """
    Download a presigned url from S3 and verify the E-tag.

    session - the http session.
    url - the presigned url.
    etag - the etag to check the download against.
    partsize - the partsize used when uploading the file to S3.
    outputpath - where to store the file. If the file exists, it will be overwritten.

    Raises TransferError if S3 returns an error response and FileCorruptionError if the
    downloaded file's etag does not match the expected etag.
    """
    _not_falsy(session, "session")
    _require_string(url, "url")
    _require_string(etag, "etag")
    _not_falsy(outputpath, "outputpath")
    async with session.get(url) as resp:
        if 199 < resp.status < 300:  # redirects are handled automatically
            try:
                with open(outputpath, "wb") as f:
                    async for chunk in resp.content.iter_chunked(_CHUNK_SIZE_64KB):
                        f.write(chunk)
            except Exception:
                # don't leave a partially downloaded file behind if the stream dies mid-way
                outputpath.unlink(missing_ok=True)
                raise
        else:
            # assume the error output isn't too huge
            err = await resp.read()
            raise TransferError(f"GET URL: {url.split('?')[0]} {resp.status}\nError:\n{err}")
    try:
        got_etag = calculate_etag(outputpath, partsize)
    except ValueError:
        outputpath.unlink(missing_ok=True)
        raise
    if etag != got_etag:
        outputpath.unlink(missing_ok=True)
        raise FileCorruptionError(
            f"Etag check failed for url {url.split('?')[0]}. Expected {etag}, got {got_etag}")


# These arg checkers are duplicated in other places, but we want to minimize the number of files
# we have to transfer to the remote cluster and they're simple enough that duplication isn't
# a huge problem


def _require_string(string: str, name: str):
    # Treat None, empty, and whitespace-only values as missing.
    stripped = string.strip() if string else ""
    if not stripped:
        raise ValueError(f"{name} is required")
    return stripped


def _not_falsy(obj: Any, name: str):
    # Raise a standard "required" error for any falsy argument.
    if obj:
        return
    raise ValueError(f"{name} is required")


class TransferError(Exception):
    """ Raised when a transfer to or from S3 does not complete successfully. """


class FileCorruptionError(Exception):
    """ Raised when a transferred file is found to be corrupt. """
20 changes: 10 additions & 10 deletions test/s3/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,28 +89,28 @@ async def test_get_object_meta_multipart(minio):
await minio.clean() # couldn't get this to work as a fixture
await minio.create_bucket("test-bucket")
await minio.upload_file(
"test-bucket/big_test_file", b"abcdefghij" * 6000000, 3, b"bigolfile")
"test-bucket/big_test_file", b"abcdefghij" * 600000, 3, b"bigolfile")

s3c = await _client(minio)
objm = await s3c.get_object_meta(S3Paths(["test-bucket/big_test_file"]))
assert len(objm) == 1
_check_obj_meta(
objm[0],
"test-bucket/big_test_file",
"e0fcd4584a5157e2d465bf0217ab8268-4",
180000009,
60000000,
"b8185adaf462a5ac2ca9db335b290d23-4",
18000009,
6000000,
True,
4,
60000000,
6000000,
)

@pytest.mark.asyncio
async def test_get_object_meta_mix(minio):
await minio.clean() # couldn't get this to work as a fixture
await minio.create_bucket("nice-bucket")
await minio.upload_file(
"nice-bucket/big_test_file", b"abcdefghij" * 6000000, 4, b"bigolfile")
"nice-bucket/big_test_file", b"abcdefghij" * 600000, 4, b"bigolfile")
await minio.upload_file("nice-bucket/test_file", b"abcdefghij")

s3c = await _client(minio)
Expand All @@ -120,12 +120,12 @@ async def test_get_object_meta_mix(minio):
_check_obj_meta(
objm[0],
"nice-bucket/big_test_file",
"2c0fa9e12a28c40de69cab92da528adf-5",
240000009,
60000000,
"9728af2f2c566b2b944b96203769175d-5",
24000009,
6000000,
True,
5,
60000000,
6000000,
)
_check_obj_meta(
objm[1],
Expand Down
159 changes: 153 additions & 6 deletions test/s3/remote_test.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,36 @@
import aiohttp
import os
from pathlib import Path
import pytest
import shutil
import tempfile

from pytest import raises

from conftest import assert_exception_correct
from cdmtaskservice.s3.remote import calculate_etag, crc32
from conftest import assert_exception_correct, minio # @UnusedImport
from cdmtaskservice.s3.client import S3Client
from cdmtaskservice.s3.paths import S3Paths
from cdmtaskservice.s3.remote import (
calculate_etag,
crc32,
download_presigned_url,
FileCorruptionError,
TransferError,
)
import config

TESTDATA = Path(os.path.normpath((Path(__file__) / ".." / ".." / "testdata")))


@pytest.fixture(scope="module")
def temp_dir():
    # Module-scoped scratch directory; retained on disk when TEMP_DIR_KEEP is set
    # so test output can be inspected after a failure.
    scratch = Path(tempfile.mkdtemp(prefix="remote_test", dir=config.TEMP_DIR))
    yield scratch
    if not config.TEMP_DIR_KEEP:
        shutil.rmtree(scratch)


def test_calculate_etag():
testset = [
(TESTDATA / "random_bytes_1kB", 1024, "b10278db14633f102103c5e9d75c0af0"),
Expand All @@ -31,7 +54,7 @@ def test_calculate_etag_fail():
(TESTDATA / "random_bytes_1kB", -10000, ValueError("partsize must be > 0")),
]
for infile, size, expected in testset:
with raises(Exception) as got:
with pytest.raises(Exception) as got:
calculate_etag(infile, size)
assert_exception_correct(got.value, expected)

Expand All @@ -54,6 +77,130 @@ def test_crc32_fail():
(TESTDATA, ValueError("infile must be exist and be a file")),
]
for infile, expected in testset:
with raises(Exception) as got:
with pytest.raises(Exception) as got:
crc32(infile)
assert_exception_correct(got.value, expected)


@pytest.mark.asyncio
async def test_download_presigned_url(minio, temp_dir):
    # Happy path: single-part object downloads and passes the etag check.
    await minio.clean()  # couldn't get this to work as a fixture
    await minio.create_bucket("test-bucket")
    await minio.upload_file("test-bucket/myfile", b"abcdefghij")

    client = await _client(minio)
    presigned = (await client.presign_get_urls(S3Paths(["test-bucket/myfile"])))[0]
    target = temp_dir / "temp1.txt"
    async with aiohttp.ClientSession() as session:
        await download_presigned_url(
            session, presigned, "a925576942e94b2ef57a066101b48876", 10, target)
    assert target.read_text() == "abcdefghij"


@pytest.mark.asyncio
async def test_download_presigned_url_multipart(minio, temp_dir):
    # Multipart object: etag has a part-count suffix and must be recomputed per part.
    await minio.clean()  # couldn't get this to work as a fixture
    await minio.create_bucket("nice-bucket")
    await minio.upload_file(
        "nice-bucket/big_test_file", b"abcdefghij" * 600000, 4, b"bigolfile")

    client = await _client(minio)
    presigned = (await client.presign_get_urls(S3Paths(["nice-bucket/big_test_file"])))[0]
    target = temp_dir / "temp2.txt"
    async with aiohttp.ClientSession() as session:
        await download_presigned_url(
            session, presigned, "9728af2f2c566b2b944b96203769175d-5", 6000000, target)
    assert target.read_text() == "abcdefghij" * 600000 * 4 + "bigolfile"


@pytest.mark.asyncio
async def test_download_presigned_url_fail_no_session(minio, temp_dir):
    # A None session must be rejected before any file is created.
    target = temp_dir / "fail.txt"
    expected_etag = "a925576942e94b2ef57a066101b48876"
    client = await _client(minio)
    presigned = (await client.presign_get_urls(S3Paths(["test-bucket/myfile"])))[0]
    with pytest.raises(Exception) as got:
        await download_presigned_url(None, presigned, expected_etag, 10, target)
    assert_exception_correct(got.value, ValueError("session is required"))
    assert not target.exists()


@pytest.mark.asyncio
async def test_download_presigned_url_fail_bad_args(minio, temp_dir):
    # Argument validation and etag-mismatch failures, all against one small object.
    await minio.clean()  # couldn't get this to work as a fixture
    await minio.create_bucket("test-bucket")
    await minio.upload_file("test-bucket/myfile", b"abcdefghij")
    good_etag = "a925576942e94b2ef57a066101b48876"
    partsize = 10
    out = temp_dir / "fail.txt"

    client = await _client(minio)
    url = (await client.presign_get_urls(S3Paths(["test-bucket/myfile"])))[0]
    mismatch = f"Etag check failed for url http://localhost:{minio.port}/test-bucket/myfile. "
    await _download_presigned_url_fail(None, good_etag, partsize, out, ValueError("url is required"))
    await _download_presigned_url_fail(" \t ", good_etag, partsize, out, ValueError("url is required"))
    await _download_presigned_url_fail(url, None, partsize, out, ValueError("etag is required"))
    await _download_presigned_url_fail(url, " \t ", partsize, out, ValueError("etag is required"))
    await _download_presigned_url_fail(url, "foo", partsize, out, FileCorruptionError(
        mismatch + "Expected foo, got a925576942e94b2ef57a066101b48876"))
    await _download_presigned_url_fail(url, good_etag, 0, out, ValueError("partsize must be > 0"))
    await _download_presigned_url_fail(url, good_etag, 3, out, FileCorruptionError(
        mismatch
        + "Expected a925576942e94b2ef57a066101b48876, got 1543089f5b20740cc5713f0437fcea8c-4"))
    await _download_presigned_url_fail(url, good_etag, partsize, None, ValueError("outputpath is required"))


async def _download_presigned_url_fail(url, etag, partsize, output, expected):
    # Run the download expecting `expected`, and check no output file is left behind.
    async with aiohttp.ClientSession() as session:
        with pytest.raises(Exception) as got:
            await download_presigned_url(session, url, etag, partsize, output)
    assert_exception_correct(got.value, expected)
    if output:
        assert not output.exists()


@pytest.mark.asyncio
async def test_download_presigned_url_fail_bad_sig(minio, temp_dir):
    # Mangling the key after presigning invalidates the signature -> 403 from S3.
    starts_with = f"GET URL: http://localhost:{minio.port}/test-bucket/myfilex 403\nError:\n"
    contains = ("<Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we "
                + "calculated does not match the signature you provided. Check your key and "
                + "signing method.</Message><Key>myfilex</Key><BucketName>test-bucket</BucketName>"
                + "<Resource>/test-bucket/myfilex</Resource>"
                )
    client = await _client(minio)
    presigned = (await client.presign_get_urls(S3Paths(["test-bucket/myfile"])))[0]
    url = presigned.replace("myfile", "myfilex")
    await _download_presigned_url_fail_s3_error(minio, temp_dir, url, starts_with, contains)


@pytest.mark.asyncio
async def test_download_presigned_url_fail_nofile(minio, temp_dir):
    # Presigning a nonexistent key succeeds, but the GET returns a 404.
    starts_with = f"GET URL: http://localhost:{minio.port}/test-bucket/myfilex 404\nError:\n"
    contains = ("<Error><Code>NoSuchKey</Code><Message>The specified key does not exist."
                + "</Message><Key>myfilex</Key><BucketName>test-bucket</BucketName>"
                + "<Resource>/test-bucket/myfilex</Resource>"
                )
    client = await _client(minio)
    url = (await client.presign_get_urls(S3Paths(["test-bucket/myfilex"])))[0]
    await _download_presigned_url_fail_s3_error(minio, temp_dir, url, starts_with, contains)


async def _download_presigned_url_fail_s3_error(minio, temp_dir, url, starts_with, contains):
    """
    Expect a TransferError from an S3 error response and check its message.

    The error body from S3/Minio contains a request-specific id, so the message is
    checked with a prefix and a substring rather than full equality.
    """
    await minio.clean()  # couldn't get this to work as a fixture
    await minio.create_bucket("test-bucket")
    await minio.upload_file("test-bucket/myfile", b"abcdefghij")
    et = "a925576942e94b2ef57a066101b48876"
    output = temp_dir / "fail_s3.txt"
    with pytest.raises(Exception) as got:
        async with aiohttp.ClientSession() as sess:
            await download_presigned_url(sess, url, et, 10, output)
    errmsg = str(got.value)
    assert errmsg.startswith(starts_with)
    assert contains in errmsg
    # exact type check on purpose - a subclass would indicate a different failure mode
    assert type(got.value) is TransferError
    assert not output.exists()


async def _client(minio):
    # Build an S3 client pointed at the test Minio server's host and credentials.
    return await S3Client.create(minio.host, minio.access_key, minio.secret_key)
Loading