Skip to content

Commit

Permalink
78 swift dask (#80)
Browse files Browse the repository at this point in the history
* added api option to CLI

* adding swift helper functions to bm and allowing api choice in lsst_backup.py

* made swift version of object_list

* main done

* tested download_file_swift

* print api name

* .

* fix cli

* .

* fix find_metadata args

* .

* removed large file

* initiate Dask before checking metadata

* print Dask details

* added timing

* parallel metadata search

* download keys for swift api

* explicit dask dataframe calls

* npartitions

* test boto3

* genuinely excited

* pass api to process_files

* undo test change

* .

* partition by worker count

* .

* 21 seconds to go

* n_workers*100 partitions

* .

* nightly - looking at line 692

* adding swift functions

* added swift test script for large file uplaod

* .

* fixed paths

* .

* create SwiftUploadObjects

* add SwiftUploadObject to bm

* .

* try service.upload

* .

* .

* did it upload?

* prepend remote path

* use generator

* use auth_version 1

* export within Python

* typo

* bm version CI

* updatae (#79)

* dask can serialise swift!!

* Create bm_version.yml

* push branch

* .

* doublequotes

* update in bm folder

* test gh actions

* bm version: updated to -dev-

* proper dev version naming

* bm version: updated to bm_version-dev-git_hash

* remove file

* bm version: updated to bm_version-dev-git_hash

* finished versioning actions

* bm version: updated to bm_version-dev-git_hash

* remove option to switch off checksumming

* no_checksum option deprecated

* always use separate object for zip-contents metadata (as S3 and Swift have differenve value size limitations)

* .

* .

* remove exit

* prepend object name

* print result

* print result

* .

* .

* try local path in segmented_upload

* segmented upload may just work

* still segment, but give original path for each segment

* prepend all S3 host URLs with https://

* put_container before list contents

* print put results

* print from generator

* if not None

* try None

* prepend s3_host iwth https://

* print remote path

* use remote path

* was giving bucket_name in as source

* https

* .

* segment upload working in swift_tests.py

* separate container

* print all results

* swap bucket and segment bucket

* https

* print results

* time test

* try remote path

* large file path in SwiftUploadObjects

* pass object_name to SwiftUploadObject

* use only environment variables for creds

* bm version: updated to bm_version-dev-git_hash

* test environment from bm

* bm version: updated to bm_version-dev-git_hash

* test with only env variables

* ST_KEY *not* ST_SECRET

* bm version: updated to bm_version-dev-git_hash

* default api is now swift!

* bm version: updated to bm_version-dev-git_hash

* all done except Swfit segment upload

* segment upload done - need to print success/fail from results

* update zip_and_upload

* pass api

* update remove_bucket

* update list_buckets.py

* updating DAGs to use env vars

* try content_type None

* update bucket_contents to use new env vars

* try checksum string as etag

* remove file_data_size line

* send mem_check info to stderr

* done.

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
davedavemckay and github-actions[bot] authored Nov 21, 2024
1 parent 533c1bb commit b995cd7
Show file tree
Hide file tree
Showing 17 changed files with 1,164 additions and 447 deletions.
19 changes: 11 additions & 8 deletions .github/workflows/bm_version.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ name: bm version CI

on:
push:
branches:
- "**"
paths:
- bucket_manager/**

Expand All @@ -13,21 +15,22 @@ jobs:

steps:
- uses: actions/checkout@v4
- name: get bm version
run: |
grep "version" csd3-echo-somerville/setup.py | awk -F\' '{print $2}' | awk -F\- '{print $1}' > version.txt
- name: set bm version
run: |
bm_version=$(cat version.txt)
git_hash=$(git rev-parse --short "$GITHUB_SHA")
sed -i "s/version.*/version='${bm_version}-dev-${git_hash}'/" csd3-echo-somerville/setup.py
bm_version=$(grep "version" setup.py | awk -F\' '{print $2}')
if [[ $bm_version == *"dev"* ]]; then
new_bm_version=$(echo $bm_version | awk -Fv '{print $1"v"$2+1}')
else
new_bm_version=${bm_version}.dev1
fi
sed -i "s/$bm_version/$new_bm_version/" setup.py
- name: commit bm version CI
run: |
git config --global user.name "github-actions[bot]"
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git add csd3-echo-somerville/setup.py
git commit -m "bm version: updated to ${bm_version}-dev-${git_hash}"
git add setup.py
git commit -m "bm version: updated to bm_version-dev-git_hash"
- name: push bm version CI
run: |
git push
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ e.g., to list current bucket, with environemnt variables for S3 access set on th

```shell
docker run \
-e ECHO_S3_ACCESS_KEY=$ECHO_S3_ACCESS_KEY \
-e ECHO_S3_SECRET_KEY=$ECHO_S3_SECRET_KEY \
-e ECHO_S3_ACCESS_KEY=$S3_ACCESS_KEY \
-e ECHO_S3_SECRET_KEY=$S3_SECRET_KEY \
ghcr.io/lsst-uk/csd3-echo-somerville:latest python csd3-echo-somerville/scripts/list_buckets.py
```
179 changes: 143 additions & 36 deletions bucket_manager/bucket_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
from botocore.exceptions import ClientError
import swiftclient
import swiftclient.service

def print_buckets(resource) -> None:
"""
Expand All @@ -21,7 +22,7 @@ def print_buckets(resource) -> None:
for b in resource.buckets.all():
print(b.name)

def get_keys(api: str ='S3') -> None:
def check_keys(api: str ='swift') -> None:
"""
Retrieves the access key and secret key for the specified API.
Expand All @@ -32,64 +33,79 @@ def get_keys(api: str ='S3') -> None:
For S3 API: a dictionary containing the access key and secret key.
For Swift API: a dictionary containing the user and secret key.
"""
if api == 'S3':
if api == 's3':
try:
access_key = os.environ['ECHO_S3_ACCESS_KEY']
secret_key = os.environ['ECHO_S3_SECRET_KEY']
except KeyError:
raise KeyError('Set ECHO_S3_ACCESS_KEY and ECHO_S3_SECRET_KEY environment variables.')
return {'access_key': access_key, 'secret_key': secret_key}
elif api == 'Swift':
assert 'S3_ACCESS_KEY' in os.environ
assert 'S3_SECRET_KEY' in os.environ
assert 'S3_HOST_URL' in os.environ
except AssertionError:
raise AssertionError('Set S3_ACCESS_KEY, S3_SECRET_KEY and S3_HOST_URL environment variables.')
return True

elif api == 'swift':
try:
user = os.environ['ECHO_SWIFT_USER']
secret_key = os.environ['ECHO_SWIFT_SECRET_KEY']
except KeyError:
raise KeyError('Set ECHO_SWIFT_USER and ECHO_SWIFT_SECRET_KEY environment variables.')
return {'user': user, 'secret_key': secret_key}
assert 'ST_USER' in os.environ
assert 'ST_KEY' in os.environ
assert 'ST_AUTH' in os.environ
except AssertionError:
raise AssertionError('Set ST_USER, ST_KEY and ST_AUTH environment variables.')
return True
else:
raise ValueError(f'Invalid API: {api}')

def get_resource(access_key: str, secret_key: str, s3_host: str):
def get_resource():
"""
Creates and returns an S3 resource object for the specified S3 endpoint.
Parameters:
- access_key: The access key for the S3 endpoint.
- secret_key: The secret key for the S3 endpoint.
- s3_host: The hostname of the S3 endpoint.
Requires the following environment variables to be set:
- S3_ACCESS_KEY: The S3 access key.
- S3_SECRET_KEY: The S3 secret key.
- S3_HOST_URL: The S3 endpoint URL.
Returns:
An S3 resource object.
"""
try:
creds = {'access_key': os.environ['S3_ACCESS_KEY'],
'secret_key': os.environ['S3_SECRET_KEY'],
'host_url': os.environ['S3_HOST_URL']}
except KeyError as e:
raise KeyError('Set S3_ACCESS_KEY, S3_SECRET_KEY and S3_HOST_URL environment variables.')
session = boto3.Session(
aws_access_key_id=access_key,
aws_secret_access_key=secret_key
aws_access_key_id=creds['access_key'],
aws_secret_access_key=creds['secret_key']
)
return session.resource(
service_name='s3',
endpoint_url=f'https://{s3_host}',
endpoint_url=creds['host_url'],
verify=False # Disable SSL verification for non-AWS S3 endpoints
)

def get_client(access_key: str, secret_key: str, s3_host:str):
def get_client():
"""
Creates and returns an S3 client object for the specified S3 endpoint.
Parameters:
- access_key: The access key for the S3 endpoint.
- secret_key: The secret key for the S3 endpoint.
- s3_host: The hostname of the S3 endpoint.
Requires the following environment variables to be set:
- S3_ACCESS_KEY: The S3 access key.
- S3_SECRET_KEY: The S3 secret key.
- S3_HOST_URL: The S3 endpoint URL.
Returns:
An S3 client object.
"""
try:
creds = {'access_key': os.environ['S3_ACCESS_KEY'],
'secret_key': os.environ['S3_SECRET_KEY'],
'host_url': os.environ['S3_HOST_URL']}
except KeyError as e:
raise KeyError('Set S3_ACCESS_KEY, S3_SECRET_KEY and S3_HOST_URL environment variables.')
session = boto3.Session(
aws_access_key_id=access_key,
aws_secret_access_key=secret_key
aws_access_key_id=creds['access_key'],
aws_secret_access_key=creds['secret_key']
)
return session.client(
service_name='s3',
endpoint_url=f'https://{s3_host}',
endpoint_url=creds['host_url'],
verify=False # Disable SSL verification for non-AWS S3 endpoints
)

Expand All @@ -105,6 +121,18 @@ def bucket_list(resource) -> list[str]:
"""
return [ b.name for b in resource.buckets.all() ]

def bucket_list_swift(conn: swiftclient.Connection) -> list[str]:
"""
Returns a list of container names in the Swift S3 endpoint.
Parameters:
- conn: swiftclient.client.Connection object.
Returns:
A list of container names.
"""
return [ c['name'] for c in conn.get_account()[1] ]

def create_bucket(resource, bucket_name: str) -> bool:
"""
Creates a new bucket in the S3 endpoint.
Expand Down Expand Up @@ -155,6 +183,27 @@ def object_list(bucket, prefix='', count=False) -> list[str]:
print(f'Existing objects: {o}', end='\r', flush=True)
return keys

def object_list_swift(conn: swiftclient.Connection, container_name: str, prefix : str = '', full_listing: bool = True, count: bool = False) -> list[str]:
"""
Returns a list of keys of all objects in the specified bucket.
Parameters:
- conn: swiftlient.Connection object.
- container_name: The name of the Swift container.
Returns:
A list of object keys.
"""
keys = []
if count:
o = 0
for obj in conn.get_container(container_name,prefix=prefix,full_listing=full_listing)[1]:
keys.append(obj['name'])
if count:
o += 1
if o % 10000 == 0:
print(f'Existing objects: {o}', end='\r', flush=True)
return keys

def print_containers_swift(conn: swiftclient.Connection) -> None:
"""
Expand Down Expand Up @@ -182,20 +231,78 @@ def print_contents_swift(conn: swiftclient.Connection, container_name: str) -> N
for data in conn.get_container(container_name)[1]:
print('{0}\t{1}\t{2}'.format(data['name'], data['bytes'], data['last_modified']))

def get_conn_swift(user: str, access_key: str, host: str) -> swiftclient.Connection:
def get_conn_swift() -> swiftclient.Connection:
"""
Creates and returns a Swift connection object for the specified Swift endpoint.
Requires the following environment variables to be set:
- ST_USER: The Swift user.
- ST_KEY: The Swift secret key.
- ST_AUTH: The Swift authentication URL.
Returns:
A Swift connection object.
"""
try:
creds = {'user': os.environ['ST_USER'],
'key': os.environ['ST_KEY'],
'authurl': os.environ['ST_AUTH']}
except KeyError as e:
raise KeyError('Set ST_USER, ST_KEY and ST_AUTH environment variables.')
return swiftclient.Connection(
user=creds['user'],
key=creds['key'],
authurl=creds['authurl']
)

def get_service_swift() -> swiftclient.service.SwiftService:
"""
Creates and returns a Swift service object for the specified Swift endpoint.
Parameters:
- user: The Swift user.
- access_key: The Swift access key.
- secret_key: The Swift secret key.
- host: The Swift authentication URL.
Returns:
A Swift connection object.
A Swift service object.
"""
return swiftclient.Connection(
user=user,
key=access_key,
authurl=host
try:
creds = {'user': os.environ['ST_USER'],
'key': os.environ['ST_KEY'],
'authurl': os.environ['ST_AUTH']}
except KeyError as e:
raise KeyError('Set ST_USER, ST_KEY and ST_AUTH environment variables.')
return swiftclient.service.SwiftService(
{
'auth_version': '1',
'os_auth_url': creds['authurl'],
'os_username': creds['user'].split(':')[1],
'os_password': creds['key'],
'os_project_name': creds['user'].split(':')[0],
'os_user_domain_name': 'default',
'os_project_domain_name': 'default'
}
)

def get_SwiftUploadObject(source, object_name=None, options=None) -> swiftclient.service.SwiftUploadObject:
return swiftclient.service.SwiftUploadObject(source, object_name, options)

def download_file_swift(conn: swiftclient.Connection, container_name: str, object_name: str, local_path: str) -> None:
"""
Downloads a file from the specified container.
Parameters:
- conn: The Swift connection object.
- container_name: The name of the Swift container.
- object_name: The name of the object to download.
- local_path: The local path to save the downloaded file.
WARNING: This function will overwrite the file at local_path if it already exists.
Returns:
None
"""
os.makedirs(os.path.dirname(local_path), exist_ok=True)
with open(local_path, 'wb') as f:
f.write(conn.get_object(container_name, object_name)[1])
1 change: 1 addition & 0 deletions bucket_manager/updated
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fri Nov 15 17:25:55 GMT 2024
Loading

0 comments on commit b995cd7

Please sign in to comment.