Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Put object options #4219

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
5 changes: 3 additions & 2 deletions api/python/quilt3/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)


def copy(src, dest):
def copy(src, dest, put_options=None):
"""
Copies ``src`` object from QUILT to ``dest``.

Expand All @@ -31,8 +31,9 @@
Parameters:
src (str): a path to retrieve
dest (str): a path to write to
put_options (dict): optional arguments to pass to the PutObject operation
"""
copy_file(PhysicalKey.from_url(fix_url(src)), PhysicalKey.from_url(fix_url(dest)))
copy_file(PhysicalKey.from_url(fix_url(src)), PhysicalKey.from_url(fix_url(dest)), put_options=put_options)

Check warning on line 36 in api/python/quilt3/api.py

View check run for this annotation

Codecov / codecov/patch/informational

api/python/quilt3/api.py#L36

Added line #L36 was not covered by tests


@ApiTelemetry("api.delete_package")
Expand Down
10 changes: 6 additions & 4 deletions api/python/quilt3/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ def search(self, query: T.Union[str, dict], limit: int = 10) -> T.List[dict]:
"""
return search_api(query, index=f"{self._pk.bucket},{self._pk.bucket}_packages", limit=limit)["hits"]["hits"]

def put_file(self, key, path):
def put_file(self, key, path, put_options=None):
"""
Stores file at path to key in bucket.

Args:
key(str): key in bucket to store file at
path(str): string representing local path to file
put_options(dict): optional arguments to pass to the PutObject operation

Returns:
None
Expand All @@ -71,15 +72,16 @@ def put_file(self, key, path):
* if copy fails
"""
dest = self._pk.join(key)
copy_file(PhysicalKey.from_url(fix_url(path)), dest)
copy_file(PhysicalKey.from_url(fix_url(path)), dest, put_options=put_options)

def put_dir(self, key, directory):
def put_dir(self, key, directory, put_options=None):
"""
Stores all files in the `directory` under the prefix `key`.

Args:
key(str): prefix to store files under in bucket
directory(str): path to directory to grab files from
put_options(dict): optional arguments to pass to the PutObject operation

Returns:
None
Expand All @@ -97,7 +99,7 @@ def put_dir(self, key, directory):

src = PhysicalKey.from_path(str(src_path) + '/')
dest = self._pk.join(key)
copy_file(src, dest)
copy_file(src, dest, put_options=put_options)

def keys(self):
"""
Expand Down
33 changes: 17 additions & 16 deletions api/python/quilt3/data_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@
ctx.done(PhysicalKey.from_path(dest_path), None)


def _upload_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket: str, dest_key: str):
def _upload_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket: str, dest_key: str, put_options=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't see where this argument is used.
also, this case is somewhat less straightforward, because there may be (and usually are) operations other than PutObject: create / update MPU, upload part -- what's the plan regarding those?

s3_client = ctx.s3_client_provider.standard_client

if not is_mpu(size):
Expand Down Expand Up @@ -580,15 +580,16 @@
return None


def _upload_or_reuse_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket: str, dest_path: str):
def _upload_or_reuse_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket: str,
dest_path: str, put_options=None):
drernie marked this conversation as resolved.
Show resolved Hide resolved
result = _reuse_remote_file(ctx, size, src_path, dest_bucket, dest_path)
if result is not None:
dest_version_id, checksum = result
ctx.progress(size)
ctx.done(PhysicalKey(dest_bucket, dest_path, dest_version_id), checksum)
return # Optimization succeeded.
# If the optimization didn't happen, do the normal upload.
_upload_file(ctx, size, src_path, dest_bucket, dest_path)
_upload_file(ctx, size, src_path, dest_bucket, dest_path, put_options)


def _copy_file_list_last_retry(retry_state):
Expand All @@ -602,7 +603,8 @@
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_not_result(all),
retry_error_callback=_copy_file_list_last_retry)
def _copy_file_list_internal(file_list, results, message, callback, exceptions_to_ignore=(ClientError,)):
def _copy_file_list_internal(file_list, results, message, callback,
exceptions_to_ignore=(ClientError,), put_options=None):
"""
Takes a list of tuples (src, dest, size) and copies the data in parallel.
`results` is the list where results will be stored.
Expand Down Expand Up @@ -668,13 +670,13 @@
else:
if dest.version_id:
raise ValueError("Cannot set VersionId on destination")
_upload_or_reuse_file(ctx, size, src.path, dest.bucket, dest.path)
_upload_or_reuse_file(ctx, size, src.path, dest.bucket, dest.path, put_options)
else:
if dest.is_local():
_download_file(ctx, size, src.bucket, src.path, src.version_id, dest.path)
else:
_copy_remote_file(ctx, size, src.bucket, src.path, src.version_id,
dest.bucket, dest.path)
dest.bucket, dest.path, extra_args=put_options)

try:
for idx, (args, result) in enumerate(zip(file_list, results)):
Expand Down Expand Up @@ -855,7 +857,7 @@
s3_client.delete_object(Bucket=src.bucket, Key=src.path)


def copy_file_list(file_list, message=None, callback=None):
def copy_file_list(file_list, message=None, callback=None, put_options=None):
"""
Takes a list of tuples (src, dest, size) and copies them in parallel.
URLs must be regular files, not directories.
Expand All @@ -865,10 +867,10 @@
if _looks_like_dir(src) or _looks_like_dir(dest):
raise ValueError("Directories are not allowed")

return _copy_file_list_internal(file_list, [None] * len(file_list), message, callback)
    return _copy_file_list_internal(file_list, [None] * len(file_list), message, callback, put_options=put_options)


def copy_file(src: PhysicalKey, dest: PhysicalKey, size=None, message=None, callback=None):
def copy_file(src: PhysicalKey, dest: PhysicalKey, size=None, message=None, callback=None, put_options=None):
"""
Copies a single file or directory.
If src is a file, dest can be a file or a directory.
Expand Down Expand Up @@ -900,10 +902,10 @@
src = PhysicalKey(src.bucket, src.path, version_id)
url_list.append((src, dest, size))

    _copy_file_list_internal(url_list, [None] * len(url_list), message, callback, put_options=put_options)
_copy_file_list_internal(url_list, [None] * len(url_list), message, callback, put_options=None)


def put_bytes(data: bytes, dest: PhysicalKey):
def put_bytes(data: bytes, dest: PhysicalKey, put_options=None):
if _looks_like_dir(dest):
raise ValueError("Invalid path: %r" % dest.path)

Expand All @@ -915,11 +917,10 @@
if dest.version_id is not None:
raise ValueError("Cannot set VersionId on destination")
s3_client = S3ClientProvider().standard_client
s3_client.put_object(
Bucket=dest.bucket,
Key=dest.path,
Body=data,
)
s3_params = dict(Bucket=dest.bucket, Key=dest.path, Body=data)
if put_options:
s3_params.update(put_options)

Check warning on line 922 in api/python/quilt3/data_transfer.py

View check run for this annotation

Codecov / codecov/patch/informational

api/python/quilt3/data_transfer.py#L922

Added line #L922 was not covered by tests
s3_client.put_object(**s3_params)


def _local_get_bytes(pk: PhysicalKey):
Expand Down
19 changes: 11 additions & 8 deletions api/python/quilt3/packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,14 @@ def deserialize(self, func=None, **format_opts):

return formats[0].deserialize(data, self._meta, pkey_ext, **format_opts)

def fetch(self, dest=None):
def fetch(self, dest=None, put_options=None):
"""
Gets objects from entry and saves them to dest.

Args:
dest: where to put the files
Defaults to the entry name
put_options: optional arguments to pass to the PutObject operation

Returns:
None
Expand All @@ -368,7 +369,7 @@ def fetch(self, dest=None):
else:
dest = PhysicalKey.from_url(fix_url(dest))

copy_file(self.physical_key, dest)
copy_file(self.physical_key, dest, put_options=put_options)

# return a package reroot package physical keys after the copy operation succeeds
# see GH#388 for context
Expand Down Expand Up @@ -700,13 +701,14 @@ def __getitem__(self, logical_key):
return pkg

@ApiTelemetry("package.fetch")
def fetch(self, dest='./'):
def fetch(self, dest='./', put_options=None):
"""
Copy all descendants to `dest`. Descendants are written under their logical
names _relative_ to self.

Args:
dest: where to put the files (locally)
put_options: optional arguments to pass to the PutObject operation

Returns:
A new Package object with entries from self, but with physical keys
Expand All @@ -727,7 +729,7 @@ def fetch(self, dest='./'):
new_entry = entry.with_physical_key(new_physical_key)
pkg._set(logical_key, new_entry)

copy_file_list(file_list, message="Copying objects")
copy_file_list(file_list, message="Copying objects", put_options=put_options)

return pkg

Expand Down Expand Up @@ -1355,7 +1357,7 @@ def _get_top_hash_parts(cls, meta, entries):
@_fix_docstring(workflow=_WORKFLOW_PARAM_DOCSTRING)
def push(
self, name, registry=None, dest=None, message=None, selector_fn=None, *,
workflow=..., force: bool = False, dedupe: bool = False
workflow=..., force: bool = False, dedupe: bool = False, put_options=None
):
"""
Copies objects to path, then creates a new package that points to those objects.
Expand Down Expand Up @@ -1398,19 +1400,20 @@ def push(
%(workflow)s
force: skip the top hash check and overwrite any existing package
dedupe: don't push if the top hash matches the existing package top hash; return the current package
put_options: optional arguments to pass to the PutObject operation

Returns:
A new package that points to the copied objects.
"""
return self._push(
name, registry, dest, message, selector_fn, workflow=workflow,
print_info=True, force=force, dedupe=dedupe
print_info=True, force=force, dedupe=dedupe, put_options=put_options
)

def _push(
self, name, registry=None, dest=None, message=None, selector_fn=None, *,
workflow, print_info, force: bool, dedupe: bool,
copy_file_list_fn: T.Optional[CopyFileListFn] = None,
copy_file_list_fn: T.Optional[CopyFileListFn] = None, put_options=None
):
if selector_fn is None:
def selector_fn(*args):
Expand Down Expand Up @@ -1531,7 +1534,7 @@ def check_hash_conficts(latest_hash):
entries.append((logical_key, entry))
file_list.append((physical_key, new_physical_key, entry.size))

results = copy_file_list_fn(file_list, message="Copying objects")
results = copy_file_list_fn(file_list, message="Copying objects", put_options=put_options)

for (logical_key, entry), (versioned_key, checksum) in zip(entries, results):
# Create a new package entry pointing to the new remote key.
Expand Down
5 changes: 3 additions & 2 deletions api/python/tests/integration/test_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
LOCAL_REGISTRY = Path('local_registry') # Set by QuiltTestCase


def _mock_copy_file_list(file_list, callback=None, message=None):
def _mock_copy_file_list(file_list, callback=None, message=None, **kwargs):
return [(key, None) for _, key, _ in file_list]


Expand Down Expand Up @@ -449,7 +449,8 @@ def test_fetch_default_dest(tmpdir):
filepath = os.path.join(os.path.dirname(__file__), 'data', 'foo.txt')
copy_mock.assert_called_once_with(
PhysicalKey.from_path(filepath),
PhysicalKey.from_path('foo.txt')
PhysicalKey.from_path('foo.txt'),
put_options=None
drernie marked this conversation as resolved.
Show resolved Hide resolved
)

@patch('quilt3.workflows.validate', mock.MagicMock(return_value=None))
Expand Down
23 changes: 15 additions & 8 deletions api/python/tests/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,30 +172,37 @@ def test_bucket_select(self):

def test_bucket_put_file(self):
with patch("quilt3.bucket.copy_file") as copy_mock:
opts = {'SSECustomerKey': 'FakeKey'}
bucket = Bucket('s3://test-bucket')
bucket.put_file(key='README.md', path='./README') # put local file to bucket
bucket.put_file(key='README.md', path='./README', put_options=opts)
# put local file to bucket
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the comment should be either above or on the line it refers to


copy_mock.assert_called_once_with(
PhysicalKey.from_path('README'), PhysicalKey.from_url('s3://test-bucket/README.md'))
PhysicalKey.from_path('README'),
PhysicalKey.from_url('s3://test-bucket/README.md'), put_options=opts)
drernie marked this conversation as resolved.
Show resolved Hide resolved

def test_bucket_put_dir(self):
path = pathlib.Path(__file__).parent / 'data'
bucket = Bucket('s3://test-bucket')
opts = {'SSECustomerKey': 'FakeKey'}

with patch("quilt3.bucket.copy_file") as copy_mock:
bucket.put_dir('test', path)
bucket.put_dir('test', path, opts)
copy_mock.assert_called_once_with(
PhysicalKey.from_path(str(path) + '/'), PhysicalKey.from_url('s3://test-bucket/test/'))
PhysicalKey.from_path(str(path) + '/'),
PhysicalKey.from_url('s3://test-bucket/test/'), put_options=opts)

with patch("quilt3.bucket.copy_file") as copy_mock:
bucket.put_dir('test/', path)
bucket.put_dir('test/', path, opts)
copy_mock.assert_called_once_with(
PhysicalKey.from_path(str(path) + '/'), PhysicalKey.from_url('s3://test-bucket/test/'))
PhysicalKey.from_path(str(path) + '/'),
PhysicalKey.from_url('s3://test-bucket/test/'), put_options=opts)

with patch("quilt3.bucket.copy_file") as copy_mock:
bucket.put_dir('', path)
bucket.put_dir('', path, opts)
copy_mock.assert_called_once_with(
PhysicalKey.from_path(str(path) + '/'), PhysicalKey.from_url('s3://test-bucket/'))
PhysicalKey.from_path(str(path) + '/'),
PhysicalKey.from_url('s3://test-bucket/'), put_options=opts)

def test_remote_delete(self):
self.s3_stubber.add_response(
Expand Down
5 changes: 5 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ Entries inside each section should be ordered by type:

## CLI
!-->
# unreleased - YYYY-MM-DD

## Python API

* [Added] Extend public methods that write to S3 to take optional `put_options` argument to pass to PutObject: `quilt3.Package.push`, `quilt3.Package.fetch`, `quilt3.PackageEntry.fetch`, `quilt3.Bucket.put_file`, `quilt3.Bucket.put_dir` ([#4219](https://github.com/quiltdata/quilt/pull/4219))

# 6.1.0 - 2024-10-14

## Python API
Expand Down
6 changes: 4 additions & 2 deletions docs/api-reference/Bucket.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ __Returns__
search results


## Bucket.put\_file(self, key, path) {#Bucket.put\_file}
## Bucket.put\_file(self, key, path, put\_options=None) {#Bucket.put\_file}

Stores file at path to key in bucket.

__Arguments__

* __key(str)__: key in bucket to store file at
* __path(str)__: string representing local path to file
* __put_options(dict)__: optional arguments to pass to the PutObject operation

__Returns__

Expand All @@ -52,14 +53,15 @@ __Raises__
* if copy fails


## Bucket.put\_dir(self, key, directory) {#Bucket.put\_dir}
## Bucket.put\_dir(self, key, directory, put\_options=None) {#Bucket.put\_dir}

Stores all files in the `directory` under the prefix `key`.

__Arguments__

* __key(str)__: prefix to store files under in bucket
* __directory(str)__: path to directory to grab files from
* __put_options(dict)__: optional arguments to pass to the PutObject operation

__Returns__

Expand Down
Loading