Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Jobs] Allowing to specify intermediate bucket for file upload #4257

Open
wants to merge 53 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
bbfe91a
debug
zpoint Nov 3, 2024
043dd28
support workdir_bucket_name config on yaml file
zpoint Nov 4, 2024
bc767c8
change the match statement to if else due to mypy limit
zpoint Nov 4, 2024
8200400
pass mypy
zpoint Nov 4, 2024
0e395ca
yapf format fix
zpoint Nov 4, 2024
2b1a063
reformat
zpoint Nov 4, 2024
fa9cc69
remove debug line
zpoint Nov 4, 2024
4c98e11
all dir to same bucket
zpoint Nov 6, 2024
144a4a3
private member function
zpoint Nov 6, 2024
42de23a
fix mypy
zpoint Nov 6, 2024
888c0fa
support sub dir config to separate to different directory
zpoint Nov 7, 2024
3b1adcc
rename and add smoke test
zpoint Nov 8, 2024
06c7891
bucketname
zpoint Nov 9, 2024
3aaf0f1
support sub dir mount
zpoint Nov 11, 2024
0116fa0
private member for _bucket_sub_path and smoke test fix
zpoint Nov 12, 2024
0750900
support copy mount for sub dir
zpoint Nov 12, 2024
51dfcd3
support gcs, s3 delete folder
zpoint Nov 12, 2024
6ba05cc
doc
zpoint Nov 12, 2024
1751fab
r2 remove_objects_from_sub_path
zpoint Nov 12, 2024
3bb60c8
support azure remove directory and cos remove
zpoint Nov 13, 2024
415a089
doc string for remove_objects_from_sub_path
zpoint Nov 13, 2024
f0f1fe1
fix sky jobs subdir issue
zpoint Nov 13, 2024
1a62d06
test case update
zpoint Nov 13, 2024
79ea48a
rename to _bucket_sub_path
zpoint Nov 14, 2024
8fd2c8c
change the config schema
zpoint Nov 14, 2024
7d57aef
setter
zpoint Nov 14, 2024
5da18de
bug fix and test update
zpoint Nov 14, 2024
7c82fd7
delete bucket depends on user config or sky generated
zpoint Nov 15, 2024
cb2a574
add test case
zpoint Nov 15, 2024
df60317
smoke test bug fix
zpoint Nov 15, 2024
a2efd8c
robust smoke test
zpoint Nov 18, 2024
c43c705
fix comment
zpoint Nov 26, 2024
cc8a8a6
bug fix
zpoint Nov 26, 2024
b939f7f
set the storage manually
zpoint Nov 27, 2024
fb0edf5
better structure
zpoint Nov 27, 2024
b640be6
merge master
zpoint Nov 27, 2024
e4619cb
fix mypy
zpoint Nov 27, 2024
abe0d99
Update docs/source/reference/config.rst
zpoint Dec 2, 2024
1669fb8
Update docs/source/reference/config.rst
zpoint Dec 2, 2024
0893794
Merge branch 'master' into dev/zeping/specific_bucket_for_sky_jobs
zpoint Dec 2, 2024
5d7ea0f
limit creation for bucket and delete sub dir only
zpoint Dec 2, 2024
fc2d48e
resolve comment
zpoint Dec 2, 2024
e032fad
Update docs/source/reference/config.rst
zpoint Dec 14, 2024
284b46d
Update sky/utils/controller_utils.py
zpoint Dec 14, 2024
86ffe01
resolve PR comment
zpoint Dec 14, 2024
0613905
bug fix
zpoint Dec 14, 2024
bfd9fd5
merge master
zpoint Dec 14, 2024
d1ae190
bug fix
zpoint Dec 14, 2024
469cede
fix test case
zpoint Dec 14, 2024
faedd28
bug fix
zpoint Dec 14, 2024
8576e14
fix
zpoint Dec 14, 2024
94a0d43
fix test case
zpoint Dec 16, 2024
d4e28dc
bug fix
zpoint Dec 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions sky/data/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,29 @@ def get_endpoint_url(cls, store: 'AbstractStore', path: str) -> str:
return bucket_endpoint_url


class StorePrefix(enum.Enum):
"""Enum for the prefix of different stores."""
S3 = 's3://'
GCS = 'gs://'
AZURE = 'https://'
R2 = 'r2://'
IBM = 'cos://'

def to_store_type(self) -> StoreType:
if self == StorePrefix.S3:
return StoreType.S3
elif self == StorePrefix.GCS:
return StoreType.GCS
elif self == StorePrefix.AZURE:
return StoreType.AZURE
elif self == StorePrefix.R2:
return StoreType.R2
elif self == StorePrefix.IBM:
return StoreType.IBM
else:
raise ValueError(f'Unknown store prefix: {self}')
zpoint marked this conversation as resolved.
Show resolved Hide resolved


class StorageMode(enum.Enum):
MOUNT = 'MOUNT'
COPY = 'COPY'
Expand Down
4 changes: 1 addition & 3 deletions sky/skylet/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,7 @@
# Used for translate local file mounts to cloud storage. Please refer to
# sky/execution.py::_maybe_translate_local_file_mounts_and_sync_up for
# more details.
WORKDIR_BUCKET_NAME = 'skypilot-workdir-{username}-{id}'
FILE_MOUNTS_BUCKET_NAME = 'skypilot-filemounts-folder-{username}-{id}'
FILE_MOUNTS_FILE_ONLY_BUCKET_NAME = 'skypilot-filemounts-files-{username}-{id}'
FILE_MOUNTS_BUCKET_NAME = 'skypilot-filemounts-{username}-{id}'
FILE_MOUNTS_LOCAL_TMP_DIR = 'skypilot-filemounts-files-{id}'
FILE_MOUNTS_REMOTE_TMP_DIR = '/tmp/sky-{}-filemounts-files'

Expand Down
41 changes: 29 additions & 12 deletions sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
zpoint marked this conversation as resolved.
Show resolved Hide resolved
import tempfile
import typing
from typing import Any, Dict, Iterable, List, Optional, Set
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

import colorama

Expand Down Expand Up @@ -676,6 +676,21 @@ def replace_skypilot_config_path_in_file_mounts(
f'with the real path in file mounts: {file_mounts}')


def _get_bucket_name_and_store_type_from_job_config(
) -> Tuple[Optional[str], Optional[str]]:
bucket_wth_prefix = skypilot_config.get_nested(('jobs', 'bucket'), None)
if bucket_wth_prefix is None:
return None, None

for prefix in storage_lib.StorePrefix:
if bucket_wth_prefix.startswith(prefix.value):
bucket_name = bucket_wth_prefix[len(prefix.value):]
store = prefix.to_store_type().value
return bucket_name, store

raise ValueError(f'Invalid bucket name with prefix: {bucket_wth_prefix}')


def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
path: str) -> None:
"""Translates local->VM mounts into Storage->VM, then syncs up any Storage.
Expand Down Expand Up @@ -717,11 +732,16 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
ux_utils.spinner_message(
f'Translating {msg} to SkyPilot Storage...'))

# Get the bucket name for the workdir and file mounts,
# we stores all these files in same bucket from config.
zpoint marked this conversation as resolved.
Show resolved Hide resolved
bucket_name, store = _get_bucket_name_and_store_type_from_job_config()
if bucket_name is None:
bucket_name = constants.FILE_MOUNTS_BUCKET_NAME.format(
username=common_utils.get_cleaned_username(), id=run_id)

# Step 1: Translate the workdir to SkyPilot storage.
new_storage_mounts = {}
if task.workdir is not None:
bucket_name = constants.WORKDIR_BUCKET_NAME.format(
username=common_utils.get_cleaned_username(), id=run_id)
workdir = task.workdir
task.workdir = None
if (constants.SKY_REMOTE_WORKDIR in original_file_mounts or
Expand All @@ -736,6 +756,7 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
'source': workdir,
'persistent': False,
'mode': 'COPY',
'store': store,
})
# Check of the existence of the workdir in file_mounts is done in
# the task construction.
Expand All @@ -754,15 +775,12 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
if os.path.isfile(os.path.abspath(os.path.expanduser(src))):
copy_mounts_with_file_in_src[dst] = src
continue
bucket_name = constants.FILE_MOUNTS_BUCKET_NAME.format(
username=common_utils.get_cleaned_username(),
id=f'{run_id}-{i}',
)
new_storage_mounts[dst] = storage_lib.Storage.from_yaml_config({
'name': bucket_name,
'source': src,
'persistent': False,
'mode': 'COPY',
'store': store,
})
logger.info(f' {colorama.Style.DIM}Folder : {src!r} '
f'-> storage: {bucket_name!r}.{colorama.Style.RESET_ALL}')
Expand All @@ -773,8 +791,6 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
tempfile.gettempdir(),
constants.FILE_MOUNTS_LOCAL_TMP_DIR.format(id=run_id))
os.makedirs(local_fm_path, exist_ok=True)
file_bucket_name = constants.FILE_MOUNTS_FILE_ONLY_BUCKET_NAME.format(
username=common_utils.get_cleaned_username(), id=run_id)
file_mount_remote_tmp_dir = constants.FILE_MOUNTS_REMOTE_TMP_DIR.format(
path)
if copy_mounts_with_file_in_src:
Expand All @@ -786,10 +802,11 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',

new_storage_mounts[
file_mount_remote_tmp_dir] = storage_lib.Storage.from_yaml_config({
'name': file_bucket_name,
'name': bucket_name,
zpoint marked this conversation as resolved.
Show resolved Hide resolved
'source': local_fm_path,
'persistent': False,
'mode': 'MOUNT',
'store': store,
})
if file_mount_remote_tmp_dir in original_storage_mounts:
with ux_utils.print_exception_no_traceback():
Expand All @@ -800,7 +817,7 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
sources = list(src_to_file_id.keys())
sources_str = '\n '.join(sources)
logger.info(f' {colorama.Style.DIM}Files (listed below) '
f' -> storage: {file_bucket_name}:'
f' -> storage: {bucket_name}:'
f'\n {sources_str}{colorama.Style.RESET_ALL}')
rich_utils.force_update_status(
ux_utils.spinner_message('Uploading translated local files/folders'))
Expand Down Expand Up @@ -849,7 +866,7 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
store_type = list(storage_obj.stores.keys())[0]
store_object = storage_obj.stores[store_type]
bucket_url = storage_lib.StoreType.get_endpoint_url(
store_object, file_bucket_name)
store_object, bucket_name)
for dst, src in copy_mounts_with_file_in_src.items():
file_id = src_to_file_id[src]
new_file_mounts[dst] = bucket_url + f'/file-{file_id}'
Expand Down
9 changes: 8 additions & 1 deletion sky/utils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Schemas conform to the JSON Schema specification as defined at
https://json-schema.org/
"""
import copy
import enum
from typing import Any, Dict, List, Tuple

Expand Down Expand Up @@ -707,6 +708,12 @@ def get_config_schema():
},
}
}
jobs_configs = copy.deepcopy(controller_resources_schema)
jobs_configs['properties']['bucket'] = {
'type': 'string',
'pattern': '^(https|s3|gs|r2|cos)://.+',
'required': []
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should include it in controller_resources_schema and enable this feature for SkyServe as well?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adde to controller_resources_schema

Could u point me out the code to change if I need to enable for SkyServe?

cloud_configs = {
'aws': {
'type': 'object',
Expand Down Expand Up @@ -927,7 +934,7 @@ def get_config_schema():
'required': [],
'additionalProperties': False,
'properties': {
'jobs': controller_resources_schema,
'jobs': jobs_configs,
'spot': controller_resources_schema,
'serve': controller_resources_schema,
'allowed_clouds': allowed_clouds,
Expand Down
Loading