[Jobs] Allow specifying an intermediate bucket for file upload #4257

Status: Open. zpoint wants to merge 51 commits into master.
Changes shown are from 11 of the 51 commits.

Commits (51)
bbfe91a
debug
zpoint Nov 3, 2024
043dd28
support workdir_bucket_name config on yaml file
zpoint Nov 4, 2024
bc767c8
change the match statement to if else due to mypy limit
zpoint Nov 4, 2024
8200400
pass mypy
zpoint Nov 4, 2024
0e395ca
yapf format fix
zpoint Nov 4, 2024
2b1a063
reformat
zpoint Nov 4, 2024
fa9cc69
remove debug line
zpoint Nov 4, 2024
4c98e11
all dir to same bucket
zpoint Nov 6, 2024
144a4a3
private member function
zpoint Nov 6, 2024
42de23a
fix mypy
zpoint Nov 6, 2024
888c0fa
support sub dir config to separate to different directory
zpoint Nov 7, 2024
3b1adcc
rename and add smoke test
zpoint Nov 8, 2024
06c7891
bucketname
zpoint Nov 9, 2024
3aaf0f1
support sub dir mount
zpoint Nov 11, 2024
0116fa0
private member for _bucket_sub_path and smoke test fix
zpoint Nov 12, 2024
0750900
support copy mount for sub dir
zpoint Nov 12, 2024
51dfcd3
support gcs, s3 delete folder
zpoint Nov 12, 2024
6ba05cc
doc
zpoint Nov 12, 2024
1751fab
r2 remove_objects_from_sub_path
zpoint Nov 12, 2024
3bb60c8
support azure remove directory and cos remove
zpoint Nov 13, 2024
415a089
doc string for remove_objects_from_sub_path
zpoint Nov 13, 2024
f0f1fe1
fix sky jobs subdir issue
zpoint Nov 13, 2024
1a62d06
test case update
zpoint Nov 13, 2024
79ea48a
rename to _bucket_sub_path
zpoint Nov 14, 2024
8fd2c8c
change the config schema
zpoint Nov 14, 2024
7d57aef
setter
zpoint Nov 14, 2024
5da18de
bug fix and test update
zpoint Nov 14, 2024
7c82fd7
delete bucket depends on user config or sky generated
zpoint Nov 15, 2024
cb2a574
add test case
zpoint Nov 15, 2024
df60317
smoke test bug fix
zpoint Nov 15, 2024
a2efd8c
robust smoke test
zpoint Nov 18, 2024
c43c705
fix comment
zpoint Nov 26, 2024
cc8a8a6
bug fix
zpoint Nov 26, 2024
b939f7f
set the storage manually
zpoint Nov 27, 2024
fb0edf5
better structure
zpoint Nov 27, 2024
b640be6
merge master
zpoint Nov 27, 2024
e4619cb
fix mypy
zpoint Nov 27, 2024
abe0d99
Update docs/source/reference/config.rst
zpoint Dec 2, 2024
1669fb8
Update docs/source/reference/config.rst
zpoint Dec 2, 2024
0893794
Merge branch 'master' into dev/zeping/specific_bucket_for_sky_jobs
zpoint Dec 2, 2024
5d7ea0f
limit creation for bucket and delete sub dir only
zpoint Dec 2, 2024
fc2d48e
resolve comment
zpoint Dec 2, 2024
e032fad
Update docs/source/reference/config.rst
zpoint Dec 14, 2024
284b46d
Update sky/utils/controller_utils.py
zpoint Dec 14, 2024
86ffe01
resolve PR comment
zpoint Dec 14, 2024
0613905
bug fix
zpoint Dec 14, 2024
bfd9fd5
merge master
zpoint Dec 14, 2024
d1ae190
bug fix
zpoint Dec 14, 2024
469cede
fix test case
zpoint Dec 14, 2024
faedd28
bug fix
zpoint Dec 14, 2024
8576e14
fix
zpoint Dec 14, 2024
121 changes: 90 additions & 31 deletions sky/data/storage.py
@@ -189,6 +189,29 @@ def get_endpoint_url(cls, store: 'AbstractStore', path: str) -> str:
return bucket_endpoint_url


class StorePrefix(enum.Enum):
"""Enum for the prefix of different stores."""
S3 = 's3://'
GCS = 'gs://'
AZURE = 'https://'
R2 = 'r2://'
IBM = 'cos://'

def to_store_type(self) -> StoreType:
if self == StorePrefix.S3:
return StoreType.S3
elif self == StorePrefix.GCS:
return StoreType.GCS
elif self == StorePrefix.AZURE:
return StoreType.AZURE
elif self == StorePrefix.R2:
return StoreType.R2
elif self == StorePrefix.IBM:
return StoreType.IBM
else:
raise ValueError(f'Unknown store prefix: {self}')
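For context, a minimal sketch of how this enum could be used to route a source URI to its store type; detect_store_type is a hypothetical helper written for illustration and is not part of this diff:

from typing import Optional

def detect_store_type(source: str) -> Optional[StoreType]:
    # Try each known URI prefix and map the first match to its StoreType.
    for prefix in StorePrefix:
        if source.startswith(prefix.value):
            return prefix.to_store_type()
    return None  # Local path or unrecognized scheme.

# detect_store_type('s3://my-bucket/data') -> StoreType.S3
# detect_store_type('/home/user/project') -> None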


class StorageMode(enum.Enum):
MOUNT = 'MOUNT'
COPY = 'COPY'
@@ -232,7 +255,8 @@ def __init__(self,
source: Optional[SourceType],
region: Optional[str] = None,
is_sky_managed: Optional[bool] = None,
sync_on_reconstruction: Optional[bool] = True):
sync_on_reconstruction: Optional[bool] = True,
sub_dir: Optional[str] = None):
"""Initialize AbstractStore

Args:
@@ -246,6 +270,11 @@ def __init__(self,
there. This is set to false when the Storage object is created not
for direct use, e.g. for 'sky storage delete', or the storage is
being re-used, e.g., for `sky start` on a stopped cluster.
sub_dir: str; The prefix of the directory to be created in the
store, e.g. if sub_dir=my-dir, the files will be uploaded to
s3://<bucket>/my-dir/.
This only works if source is a local directory.
# TODO(zpoint): Add support for non-local source.

Raises:
StorageBucketCreateError: If bucket creation fails
@@ -257,6 +286,8 @@ def __init__(self,
self.region = region
self.is_sky_managed = is_sky_managed
self.sync_on_reconstruction = sync_on_reconstruction

self.sub_dir = sub_dir
# Whether sky is responsible for the lifecycle of the Store.
self._validate()
self.initialize()
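As a concrete illustration of the new parameter (all values hypothetical), a store constructed with a sub_dir uploads under that prefix instead of the bucket root:

# Sketch: files from the local source sync to s3://my-bucket/job-42/.
store = S3Store(name='my-bucket',
                source='~/my-project',
                sub_dir='job-42')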
@@ -458,7 +489,8 @@ def __init__(self,
stores: Optional[Dict[StoreType, AbstractStore]] = None,
persistent: Optional[bool] = True,
mode: StorageMode = StorageMode.MOUNT,
sync_on_reconstruction: bool = True) -> None:
sync_on_reconstruction: bool = True,
sub_dir: Optional[str] = None) -> None:
"""Initializes a Storage object.

Three fields are required: the name of the storage, the source
@@ -496,13 +528,16 @@ def __init__(self,
there. This is set to false when the Storage object is created not
for direct use, e.g. for 'sky storage delete', or the storage is
being re-used, e.g., for `sky start` on a stopped cluster.
sub_dir: Optional[str]; The subdirectory to use for the
storage object.
"""
self.name: str
self.source = source
self.persistent = persistent
self.mode = mode
assert mode in StorageMode
self.sync_on_reconstruction = sync_on_reconstruction
self.sub_dir = sub_dir

# TODO(romilb, zhwu): This is a workaround to support storage deletion
# for spot. Once sky storage supports forced management for external
@@ -815,7 +850,9 @@ def _add_store_from_metadata(
'to be reconstructed while the corresponding '
'bucket was externally deleted.')
continue

# This can't be retrieved from metadata, since it is set every time
# we create a new storage object.
store.sub_dir = self.sub_dir
self._add_store(store, is_reconstructed=True)

@classmethod
@@ -871,6 +908,7 @@ def add_store(self,
f'storage account {storage_account_name!r}.')
else:
logger.info(f'Storage type {store_type} already exists.')

return self.stores[store_type]

store_cls: Type[AbstractStore]
@@ -895,7 +933,8 @@
name=self.name,
source=self.source,
region=region,
sync_on_reconstruction=self.sync_on_reconstruction)
sync_on_reconstruction=self.sync_on_reconstruction,
sub_dir=self.sub_dir)
except exceptions.StorageBucketCreateError:
# Creation failed, so this must be sky managed store. Add failure
# to state.
@@ -1024,6 +1063,7 @@ def from_yaml_config(cls, config: Dict[str, Any]) -> 'Storage':
store = config.pop('store', None)
mode_str = config.pop('mode', None)
force_delete = config.pop('_force_delete', None)
sub_dir = config.pop('sub_dir', None)
if force_delete is None:
force_delete = False

@@ -1043,7 +1083,8 @@ def from_yaml_config(cls, config: Dict[str, Any]) -> 'Storage':
storage_obj = cls(name=name,
source=source,
persistent=persistent,
mode=mode)
mode=mode,
sub_dir=sub_dir)
if store is not None:
storage_obj.add_store(StoreType(store.upper()))
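A minimal sketch of the YAML-config path this hunk enables (field values are hypothetical):

storage_obj = Storage.from_yaml_config({
    'name': 'my-bucket',
    'source': '~/my-project',
    'persistent': True,
    'mode': 'MOUNT',
    'sub_dir': 'job-42',  # New key popped above and forwarded to cls(...).
})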

@@ -1089,11 +1130,12 @@ def __init__(self,
source: str,
region: Optional[str] = 'us-east-2',
is_sky_managed: Optional[bool] = None,
sync_on_reconstruction: bool = True):
sync_on_reconstruction: bool = True,
sub_dir: Optional[str] = None):
self.client: 'boto3.client.Client'
self.bucket: 'StorageHandle'
super().__init__(name, source, region, is_sky_managed,
sync_on_reconstruction)
sync_on_reconstruction, sub_dir)

def _validate(self):
if self.source is not None and isinstance(self.source, str):
@@ -1291,9 +1333,10 @@ def get_file_sync_command(base_dir_path, file_names):
for file_name in file_names
])
base_dir_path = shlex.quote(base_dir_path)
sub_dir = f'/{self.sub_dir}' if self.sub_dir else ''
sync_command = ('aws s3 sync --no-follow-symlinks --exclude="*" '
f'{includes} {base_dir_path} '
f's3://{self.name}')
f's3://{self.name}{sub_dir}')
return sync_command
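For concreteness, a sketch of how the destination expands (values hypothetical); leaving sub_dir unset preserves the old bucket-root behavior:

name, sub_dir_cfg = 'my-bucket', 'job-42'  # Hypothetical values.
sub_dir = f'/{sub_dir_cfg}' if sub_dir_cfg else ''
dest = f's3://{name}{sub_dir}'
# dest == 's3://my-bucket/job-42'; with sub_dir_cfg = None, 's3://my-bucket'.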

def get_dir_sync_command(src_dir_path, dest_dir_name):
@@ -1305,9 +1348,11 @@ def get_dir_sync_command(src_dir_path, dest_dir_name):
for file_name in excluded_list
])
src_dir_path = shlex.quote(src_dir_path)
sub_dir = f'/{self.sub_dir}' if self.sub_dir else ''
sync_command = (f'aws s3 sync --no-follow-symlinks {excludes} '
f'{src_dir_path} '
f's3://{self.name}/{dest_dir_name}')
f's3://{self.name}{sub_dir}/{dest_dir_name}')
return sync_command

# Generate message for upload
@@ -1521,11 +1566,12 @@ def __init__(self,
source: str,
region: Optional[str] = 'us-central1',
is_sky_managed: Optional[bool] = None,
sync_on_reconstruction: Optional[bool] = True):
sync_on_reconstruction: Optional[bool] = True,
sub_dir: Optional[str] = None):
self.client: 'storage.Client'
self.bucket: StorageHandle
super().__init__(name, source, region, is_sky_managed,
sync_on_reconstruction)
sync_on_reconstruction, sub_dir)

def _validate(self):
if self.source is not None and isinstance(self.source, str):
@@ -1759,9 +1805,10 @@ def get_file_sync_command(base_dir_path, file_names):
sync_format = '|'.join(file_names)
gsutil_alias, alias_gen = data_utils.get_gsutil_command()
base_dir_path = shlex.quote(base_dir_path)
sub_dir = f'/{self.sub_dir}' if self.sub_dir else ''
sync_command = (f'{alias_gen}; {gsutil_alias} '
f'rsync -e -x \'^(?!{sync_format}$).*\' '
f'{base_dir_path} gs://{self.name}')
f'{base_dir_path} gs://{self.name}{sub_dir}')
return sync_command

def get_dir_sync_command(src_dir_path, dest_dir_name):
@@ -1771,9 +1818,10 @@ def get_dir_sync_command(src_dir_path, dest_dir_name):
excludes = '|'.join(excluded_list)
gsutil_alias, alias_gen = data_utils.get_gsutil_command()
src_dir_path = shlex.quote(src_dir_path)
sub_dir = f'/{self.sub_dir}' if self.sub_dir else ''
sync_command = (f'{alias_gen}; {gsutil_alias} '
f'rsync -e -r -x \'({excludes})\' {src_dir_path} '
f'gs://{self.name}/{dest_dir_name}')
f'gs://{self.name}{sub_dir}/{dest_dir_name}')
return sync_command

# Generate message for upload
@@ -2003,7 +2051,8 @@ def __init__(self,
storage_account_name: str = '',
region: Optional[str] = 'eastus',
is_sky_managed: Optional[bool] = None,
sync_on_reconstruction: bool = True):
sync_on_reconstruction: bool = True,
sub_dir: Optional[str] = None):
self.storage_client: 'storage.Client'
self.resource_client: 'storage.Client'
self.container_name: str
@@ -2015,7 +2064,7 @@ def __init__(self,
if region is None:
region = 'eastus'
super().__init__(name, source, region, is_sky_managed,
sync_on_reconstruction)
sync_on_reconstruction, sub_dir)

@classmethod
def from_metadata(cls, metadata: AbstractStore.StoreMetadata,
@@ -2483,13 +2532,15 @@ def get_file_sync_command(base_dir_path, file_names) -> str:
includes_list = ';'.join(file_names)
includes = f'--include-pattern "{includes_list}"'
base_dir_path = shlex.quote(base_dir_path)
container_path = (f'{self.container_name}/{self.sub_dir}'
if self.sub_dir else self.container_name)
sync_command = (f'az storage blob sync '
f'--account-name {self.storage_account_name} '
f'--account-key {self.storage_account_key} '
f'{includes} '
'--delete-destination false '
f'--source {base_dir_path} '
f'--container {self.container_name}')
f'--container {container_path}')
return sync_command
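Azure folds the sub-path into the --container argument rather than the destination URI; a sketch with hypothetical values:

container_name, sub_dir = 'my-container', 'job-42'  # Hypothetical values.
container_path = (f'{container_name}/{sub_dir}'
                  if sub_dir else container_name)
# az storage blob sync ... --container my-container/job-42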

def get_dir_sync_command(src_dir_path, dest_dir_name) -> str:
@@ -2500,8 +2551,9 @@ def get_dir_sync_command(src_dir_path, dest_dir_name) -> str:
[file_name.rstrip('*') for file_name in excluded_list])
excludes = f'--exclude-path "{excludes_list}"'
src_dir_path = shlex.quote(src_dir_path)
container_path = (f'{self.container_name}/{dest_dir_name}'
if dest_dir_name else self.container_name)
container_path = (
f'{self.container_name}/{self.sub_dir}/{dest_dir_name}'
if self.sub_dir else f'{self.container_name}/{dest_dir_name}')
sync_command = (f'az storage blob sync '
f'--account-name {self.storage_account_name} '
f'--account-key {self.storage_account_key} '
@@ -2746,11 +2798,12 @@ def __init__(self,
source: str,
region: Optional[str] = 'auto',
is_sky_managed: Optional[bool] = None,
sync_on_reconstruction: Optional[bool] = True):
sync_on_reconstruction: Optional[bool] = True,
sub_dir: Optional[str] = None):
self.client: 'boto3.client.Client'
self.bucket: 'StorageHandle'
super().__init__(name, source, region, is_sky_managed,
sync_on_reconstruction)
sync_on_reconstruction, sub_dir)

def _validate(self):
if self.source is not None and isinstance(self.source, str):
@@ -2889,11 +2942,12 @@ def get_file_sync_command(base_dir_path, file_names):
])
endpoint_url = cloudflare.create_endpoint()
base_dir_path = shlex.quote(base_dir_path)
sub_dir = f'/{self.sub_dir}' if self.sub_dir else ''
sync_command = ('AWS_SHARED_CREDENTIALS_FILE='
f'{cloudflare.R2_CREDENTIALS_PATH} '
'aws s3 sync --no-follow-symlinks --exclude="*" '
f'{includes} {base_dir_path} '
f's3://{self.name} '
f's3://{self.name}{sub_dir} '
f'--endpoint {endpoint_url} '
f'--profile={cloudflare.R2_PROFILE_NAME}')
return sync_command
@@ -2908,11 +2962,12 @@ def get_dir_sync_command(src_dir_path, dest_dir_name):
])
endpoint_url = cloudflare.create_endpoint()
src_dir_path = shlex.quote(src_dir_path)
sub_dir = f'/{self.sub_dir}' if self.sub_dir else ''
sync_command = ('AWS_SHARED_CREDENTIALS_FILE='
f'{cloudflare.R2_CREDENTIALS_PATH} '
f'aws s3 sync --no-follow-symlinks {excludes} '
f'{src_dir_path} '
f's3://{self.name}/{dest_dir_name} '
f's3://{self.name}{sub_dir}/{dest_dir_name} '
f'--endpoint {endpoint_url} '
f'--profile={cloudflare.R2_PROFILE_NAME}')
return sync_command
@@ -3131,11 +3186,12 @@ def __init__(self,
source: str,
region: Optional[str] = 'us-east',
is_sky_managed: Optional[bool] = None,
sync_on_reconstruction: bool = True):
sync_on_reconstruction: bool = True,
sub_dir: Optional[str] = None):
self.client: 'storage.Client'
self.bucket: 'StorageHandle'
super().__init__(name, source, region, is_sky_managed,
sync_on_reconstruction)
sync_on_reconstruction, sub_dir)
self.bucket_rclone_profile = \
Rclone.generate_rclone_bucket_profile_name(
self.name, Rclone.RcloneClouds.IBM)
@@ -3324,10 +3380,11 @@ def get_dir_sync_command(src_dir_path, dest_dir_name) -> str:
# .git directory is excluded from the sync
# wrapping src_dir_path with "" to support path with spaces
src_dir_path = shlex.quote(src_dir_path)
sync_command = (
'rclone copy --exclude ".git/*" '
f'{src_dir_path} '
f'{self.bucket_rclone_profile}:{self.name}/{dest_dir_name}')
sub_dir = f'/{self.sub_dir}' if self.sub_dir else ''
sync_command = ('rclone copy --exclude ".git/*" '
f'{src_dir_path} '
f'{self.bucket_rclone_profile}:{self.name}{sub_dir}'
f'/{dest_dir_name}')
return sync_command

def get_file_sync_command(base_dir_path, file_names) -> str:
@@ -3353,9 +3410,11 @@ def get_file_sync_command(base_dir_path, file_names) -> str:
for file_name in file_names
])
base_dir_path = shlex.quote(base_dir_path)
sync_command = ('rclone copy '
f'{includes} {base_dir_path} '
f'{self.bucket_rclone_profile}:{self.name}')
sub_dir = f'/{self.sub_dir}' if self.sub_dir else ''
sync_command = (
'rclone copy '
f'{includes} {base_dir_path} '
f'{self.bucket_rclone_profile}:{self.name}{sub_dir}')
return sync_command

# Generate message for upload
4 changes: 1 addition & 3 deletions sky/skylet/constants.py
@@ -241,9 +241,7 @@
# Used for translate local file mounts to cloud storage. Please refer to
# sky/execution.py::_maybe_translate_local_file_mounts_and_sync_up for
# more details.
WORKDIR_BUCKET_NAME = 'skypilot-workdir-{username}-{id}'
FILE_MOUNTS_BUCKET_NAME = 'skypilot-filemounts-folder-{username}-{id}'
FILE_MOUNTS_FILE_ONLY_BUCKET_NAME = 'skypilot-filemounts-files-{username}-{id}'
FILE_MOUNTS_BUCKET_NAME = 'skypilot-filemounts-{username}-{id}'
FILE_MOUNTS_LOCAL_TMP_DIR = 'skypilot-filemounts-files-{id}'
FILE_MOUNTS_REMOTE_TMP_DIR = '/tmp/sky-{}-filemounts-files'
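A sketch of how the consolidated bucket-name template is rendered (username and id values hypothetical):

bucket_name = FILE_MOUNTS_BUCKET_NAME.format(username='alice', id='ab12')
# bucket_name == 'skypilot-filemounts-alice-ab12'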
