Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Jobs] Allowing to specify intermediate bucket for file upload #4257

Open
wants to merge 51 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
bbfe91a
debug
zpoint Nov 3, 2024
043dd28
support workdir_bucket_name config on yaml file
zpoint Nov 4, 2024
bc767c8
change the match statement to if else due to mypy limit
zpoint Nov 4, 2024
8200400
pass mypy
zpoint Nov 4, 2024
0e395ca
yapf format fix
zpoint Nov 4, 2024
2b1a063
reformat
zpoint Nov 4, 2024
fa9cc69
remove debug line
zpoint Nov 4, 2024
4c98e11
all dir to same bucket
zpoint Nov 6, 2024
144a4a3
private member function
zpoint Nov 6, 2024
42de23a
fix mypy
zpoint Nov 6, 2024
888c0fa
support sub dir config to separate to different directory
zpoint Nov 7, 2024
3b1adcc
rename and add smoke test
zpoint Nov 8, 2024
06c7891
bucketname
zpoint Nov 9, 2024
3aaf0f1
support sub dir mount
zpoint Nov 11, 2024
0116fa0
private member for _bucket_sub_path and smoke test fix
zpoint Nov 12, 2024
0750900
support copy mount for sub dir
zpoint Nov 12, 2024
51dfcd3
support gcs, s3 delete folder
zpoint Nov 12, 2024
6ba05cc
doc
zpoint Nov 12, 2024
1751fab
r2 remove_objects_from_sub_path
zpoint Nov 12, 2024
3bb60c8
support azure remove directory and cos remove
zpoint Nov 13, 2024
415a089
doc string for remove_objects_from_sub_path
zpoint Nov 13, 2024
f0f1fe1
fix sky jobs subdir issue
zpoint Nov 13, 2024
1a62d06
test case update
zpoint Nov 13, 2024
79ea48a
rename to _bucket_sub_path
zpoint Nov 14, 2024
8fd2c8c
change the config schema
zpoint Nov 14, 2024
7d57aef
setter
zpoint Nov 14, 2024
5da18de
bug fix and test update
zpoint Nov 14, 2024
7c82fd7
delete bucket depends on user config or sky generated
zpoint Nov 15, 2024
cb2a574
add test case
zpoint Nov 15, 2024
df60317
smoke test bug fix
zpoint Nov 15, 2024
a2efd8c
robust smoke test
zpoint Nov 18, 2024
c43c705
fix comment
zpoint Nov 26, 2024
cc8a8a6
bug fix
zpoint Nov 26, 2024
b939f7f
set the storage manually
zpoint Nov 27, 2024
fb0edf5
better structure
zpoint Nov 27, 2024
b640be6
merge master
zpoint Nov 27, 2024
e4619cb
fix mypy
zpoint Nov 27, 2024
abe0d99
Update docs/source/reference/config.rst
zpoint Dec 2, 2024
1669fb8
Update docs/source/reference/config.rst
zpoint Dec 2, 2024
0893794
Merge branch 'master' into dev/zeping/specific_bucket_for_sky_jobs
zpoint Dec 2, 2024
5d7ea0f
limit creation for bucket and delete sub dir only
zpoint Dec 2, 2024
fc2d48e
resolve comment
zpoint Dec 2, 2024
e032fad
Update docs/source/reference/config.rst
zpoint Dec 14, 2024
284b46d
Update sky/utils/controller_utils.py
zpoint Dec 14, 2024
86ffe01
resolve PR comment
zpoint Dec 14, 2024
0613905
bug fix
zpoint Dec 14, 2024
bfd9fd5
merge master
zpoint Dec 14, 2024
d1ae190
bug fix
zpoint Dec 14, 2024
469cede
fix test case
zpoint Dec 14, 2024
faedd28
bug fix
zpoint Dec 14, 2024
8576e14
fix
zpoint Dec 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/source/reference/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ Available fields and semantics:
#
# Ref: https://skypilot.readthedocs.io/en/latest/examples/managed-jobs.html#customizing-job-controller-resources
jobs:
bucket:
# Bucket to store managed jobs mount files and tmp files.
# Its optional, if not set, SkyPilot will create a new bucket for each managed jobs .
# Support s3, gcs, azure, r2, ibm, default.
s3: "bucket-jobs-s3"
gcs: "bucket-jobs-gcs"
default: "bucket-jobs-default"
controller:
resources: # same spec as 'resources' in a task YAML
cloud: gcp
Expand Down
10 changes: 9 additions & 1 deletion sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3505,7 +3505,15 @@ def _teardown_ephemeral_storage(self, task: task_lib.Task) -> None:
if storage_mounts is not None:
for _, storage in storage_mounts.items():
if not storage.persistent:
storage.delete()
is_bucket_name_generated_by_sky = (
storage.is_bucket_name_auto_generated_by_sky())
zpoint marked this conversation as resolved.
Show resolved Hide resolved
# If the bucket name is auto-generated by SkyPilot, we keep
# the original behaviour delete the bucket, otherwise, we
# only delete the sub-path if it exists because miltiple
# jobs might share the same bucket, delete bucket could
# potential cause other jobs to fail during file operation
storage.delete(only_delete_sub_path_if_exists=
not is_bucket_name_generated_by_sky)

def _teardown(self,
handle: CloudVmRayResourceHandle,
Expand Down
63 changes: 49 additions & 14 deletions sky/data/mounting_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,19 @@ def get_s3_mount_install_cmd() -> str:
return install_cmd


def get_s3_mount_cmd(bucket_name: str, mount_path: str) -> str:
# pylint: disable=invalid-name
def get_s3_mount_cmd(bucket_name: str,
mount_path: str,
_bucket_sub_path: Optional[str] = None) -> str:
"""Returns a command to mount an S3 bucket using goofys."""
if _bucket_sub_path is None:
_bucket_sub_path = ''
else:
_bucket_sub_path = f':{_bucket_sub_path}'
mount_cmd = ('goofys -o allow_other '
f'--stat-cache-ttl {_STAT_CACHE_TTL} '
f'--type-cache-ttl {_TYPE_CACHE_TTL} '
f'{bucket_name} {mount_path}')
f'{bucket_name}{_bucket_sub_path} {mount_path}')
return mount_cmd


Expand All @@ -49,15 +56,20 @@ def get_gcs_mount_install_cmd() -> str:
return install_cmd


def get_gcs_mount_cmd(bucket_name: str, mount_path: str) -> str:
# pylint: disable=invalid-name
def get_gcs_mount_cmd(bucket_name: str,
mount_path: str,
_bucket_sub_path: Optional[str] = None) -> str:
"""Returns a command to mount a GCS bucket using gcsfuse."""

bucket_sub_path_arg = f'--only-dir {_bucket_sub_path} '\
if _bucket_sub_path else ''
mount_cmd = ('gcsfuse -o allow_other '
'--implicit-dirs '
f'--stat-cache-capacity {_STAT_CACHE_CAPACITY} '
f'--stat-cache-ttl {_STAT_CACHE_TTL} '
f'--type-cache-ttl {_TYPE_CACHE_TTL} '
f'--rename-dir-limit {_RENAME_DIR_LIMIT} '
f'{bucket_sub_path_arg}'
f'{bucket_name} {mount_path}')
return mount_cmd

Expand All @@ -78,10 +90,12 @@ def get_az_mount_install_cmd() -> str:
return install_cmd


# pylint: disable=invalid-name
def get_az_mount_cmd(container_name: str,
storage_account_name: str,
mount_path: str,
storage_account_key: Optional[str] = None) -> str:
storage_account_key: Optional[str] = None,
_bucket_sub_path: Optional[str] = None) -> str:
"""Returns a command to mount an AZ Container using blobfuse2.

Args:
Expand All @@ -90,6 +104,7 @@ def get_az_mount_cmd(container_name: str,
belongs to.
mount_path: Path where the container will be mounting.
storage_account_key: Access key for the given storage account.
_bucket_sub_path: Sub path of the mounting container.

Returns:
str: Command used to mount AZ container with blobfuse2.
Expand All @@ -106,25 +121,38 @@ def get_az_mount_cmd(container_name: str,
cache_path = _BLOBFUSE_CACHE_DIR.format(
storage_account_name=storage_account_name,
container_name=container_name)
if _bucket_sub_path is None:
bucket_sub_path_arg = ''
else:
bucket_sub_path_arg = f'--subdirectory={_bucket_sub_path}/ '
mount_cmd = (f'AZURE_STORAGE_ACCOUNT={storage_account_name} '
f'{key_env_var} '
f'blobfuse2 {mount_path} --allow-other --no-symlinks '
'-o umask=022 -o default_permissions '
f'--tmp-path {cache_path} '
f'{bucket_sub_path_arg}'
f'--container-name {container_name}')
return mount_cmd


def get_r2_mount_cmd(r2_credentials_path: str, r2_profile_name: str,
endpoint_url: str, bucket_name: str,
mount_path: str) -> str:
# pylint: disable=invalid-name
def get_r2_mount_cmd(r2_credentials_path: str,
r2_profile_name: str,
endpoint_url: str,
bucket_name: str,
mount_path: str,
_bucket_sub_path: Optional[str] = None) -> str:
"""Returns a command to install R2 mount utility goofys."""
if _bucket_sub_path is None:
_bucket_sub_path = ''
else:
_bucket_sub_path = f':{_bucket_sub_path}'
mount_cmd = (f'AWS_SHARED_CREDENTIALS_FILE={r2_credentials_path} '
f'AWS_PROFILE={r2_profile_name} goofys -o allow_other '
f'--stat-cache-ttl {_STAT_CACHE_TTL} '
f'--type-cache-ttl {_TYPE_CACHE_TTL} '
f'--endpoint {endpoint_url} '
f'{bucket_name} {mount_path}')
f'{bucket_name}{_bucket_sub_path} {mount_path}')
return mount_cmd


Expand All @@ -136,9 +164,12 @@ def get_cos_mount_install_cmd() -> str:
return install_cmd


def get_cos_mount_cmd(rclone_config_data: str, rclone_config_path: str,
bucket_rclone_profile: str, bucket_name: str,
mount_path: str) -> str:
def get_cos_mount_cmd(rclone_config_data: str,
rclone_config_path: str,
bucket_rclone_profile: str,
bucket_name: str,
mount_path: str,
_bucket_sub_path: Optional[str] = None) -> str:
"""Returns a command to mount an IBM COS bucket using rclone."""
# creates a fusermount soft link on older (<22) Ubuntu systems for
# rclone's mount utility.
Expand All @@ -150,10 +181,14 @@ def get_cos_mount_cmd(rclone_config_data: str, rclone_config_path: str,
'mkdir -p ~/.config/rclone/ && '
f'echo "{rclone_config_data}" >> '
f'{rclone_config_path}')
if _bucket_sub_path is None:
sub_path_arg = f'{bucket_name}/{_bucket_sub_path}'
else:
sub_path_arg = f'/{bucket_name}'
# --daemon will keep the mounting process running in the background.
mount_cmd = (f'{configure_rclone_profile} && '
'rclone mount '
f'{bucket_rclone_profile}:{bucket_name} {mount_path} '
f'{bucket_rclone_profile}:{sub_path_arg} {mount_path} '
'--daemon')
return mount_cmd

Expand Down Expand Up @@ -209,7 +244,7 @@ def get_mounting_script(
script = textwrap.dedent(f"""
#!/usr/bin/env bash
set -e

{command_runner.ALIAS_SUDO_TO_EMPTY_FOR_ROOT_CMD}

MOUNT_PATH={mount_path}
Expand Down
Loading
Loading