Update lsst-backup.py
replace from dask branch
davedavemckay authored Sep 10, 2024
1 parent 31082f7 commit 682aee2
Showing 1 changed file with 16 additions and 31 deletions.
47 changes: 16 additions & 31 deletions csd3-side/scripts/lsst-backup.py
@@ -15,7 +15,6 @@
Returns:
None
"""
import gc
import sys
import os
from itertools import repeat
@@ -73,31 +72,6 @@ def find_metadata(key: str, bucket) -> List[str]:
else:
return None

def find_metadata(key: str, s3) -> list[str]:
"""
Finds the metadata for a given key in an S3 bucket.
Args:
key (str): The key to search for metadata.
s3: The S3 object.
Returns:
list[str]: A list of existing metadata contents if found, otherwise empty list.
"""
existing_zip_contents = None
if key.endswith('.zip'):
try:
existing_zip_contents = str(s3.Object(bucket_name,''.join([key,'.metadata'])).get()['Body'].read().decode('UTF-8')).split(';')
except Exception as e:
try:
existing_zip_contents = s3.Object(bucket_name,key).metadata['zip-contents'].split(';')
except KeyError:
return None
if existing_zip_contents:
return existing_zip_contents
else:
return None


def remove_duplicates(l: list[dict]) -> list[dict]:
return pd.DataFrame(l).drop_duplicates().to_dict(orient='records')
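
For reference, a minimal sketch of what remove_duplicates does, using hypothetical row dicts:

import pandas as pd

# Identical dicts collapse to a single record after the round-trip through a DataFrame.
rows = [
    {'path': 'a.txt', 'size': 1},
    {'path': 'a.txt', 'size': 1},
    {'path': 'b.txt', 'size': 2},
]
print(pd.DataFrame(rows).drop_duplicates().to_dict(orient='records'))
# [{'path': 'a.txt', 'size': 1}, {'path': 'b.txt', 'size': 2}]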
@@ -373,7 +347,7 @@ def upload_to_bucket_collated(s3_host, access_key, secret_key, bucket_name, fold
- Use multipart upload for large files
"""
# Do metadata first so its URI can be added to up_upload on initiation
metadata_value = '|'.join(zip_contents) # use pipe as separator
metadata_value = '\0'.join(zip_contents) # use null byte as separator
metadata_object_key = object_key + '.metadata'
bucket.put_object(Body=metadata_value, Key=metadata_object_key, Metadata={'corresponding-zip': object_key})
metadata = {'zip-contents-object': metadata_object_key}
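
For context, a hedged sketch of this sidecar-metadata pattern: write the zip's contents list as its own object, null-byte separated as above, then read it back by splitting on the same separator. The endpoint, bucket, and key names below are placeholders, not values from the repository.

import boto3

s3 = boto3.resource('s3', endpoint_url='https://s3.example.com')  # placeholder endpoint
bucket = s3.Bucket('my-backup-bucket')                             # placeholder bucket

zip_contents = ['sub/file1.fits', 'sub/file2.fits']                # illustrative contents
object_key = 'backup/collated_0.zip'
metadata_object_key = object_key + '.metadata'

# Write the contents list as a sidecar object, tagged with the zip it describes.
bucket.put_object(Body='\0'.join(zip_contents),
                  Key=metadata_object_key,
                  Metadata={'corresponding-zip': object_key})

# Read it back and split on the same separator to recover the list.
body = s3.Object(bucket.name, metadata_object_key).get()['Body'].read().decode('utf-8')
print(body.split('\0'))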
@@ -416,7 +390,6 @@ def upload_to_bucket_collated(s3_host, access_key, secret_key, bucket_name, fold
- Upload the file to the bucket
"""
print(f'Uploading zip file "{filename}" ({file_data_size} bytes) to {bucket_name}/{object_key}')

metadata_value = '|'.join(zip_contents) # use | as separator
metadata_size = len(metadata_value.encode('utf-8'))

@@ -440,6 +413,7 @@ def upload_to_bucket_collated(s3_host, access_key, secret_key, bucket_name, fold
exit(1)
else:
checksum_string = "DRYRUN"

del file_data # Delete the file data to free up memory

"""
@@ -494,7 +468,6 @@ def print_stats(file_name_or_data, file_count, total_size, file_start, file_end,
del file_name_or_data

def upload_and_callback(s3_host, access_key, secret_key, bucket_name, folder, file_name_or_data, zip_contents, object_key, perform_checksum, dryrun, processing_start, file_count, folder_files_size, total_size_uploaded, total_files_uploaded, collated, mem_per_worker) -> None:

# upload files in parallel and log output
file_start = datetime.now()
print(f'collated = {collated}', flush=True)
@@ -545,8 +518,9 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
total_size_uploaded = 0
total_files_uploaded = 0
i = 0
# Put data in distributed memory

client.scatter([s3_host, access_key, secret_key, bucket_name, perform_checksum, dryrun, current_objects], broadcast=True)
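
A minimal dask.distributed sketch of the broadcast scatter used here, assuming an in-process cluster and placeholder values:

from dask.distributed import Client

client = Client(processes=False)  # in-process cluster, for illustration only

# Pin one copy of shared, read-only settings on every worker.
s3_host_f, bucket_name_f = client.scatter(
    ['https://s3.example.com', 'my-backup-bucket'], broadcast=True)

# Futures passed to submit are dereferenced on the worker, so the task sees plain values.
print(client.submit(lambda host, bucket: f'{host}/{bucket}', s3_host_f, bucket_name_f).result())
client.close()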

# recursive loop over local folder
to_collate = {} # store folders to collate
total_all_folders = 0
@@ -722,6 +696,8 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
f.write(f'Unexpected error: {e}\n')
# Exit gracefully
sys.exit(1)

# release a block of files once the results list is more than 4 times the number of processes

elif len(folder_files) > 0 and global_collate: # small files in folder
folder_files_size = np.sum(np.array([os.lstat(filename).st_size for filename in folder_files]))
@@ -789,6 +765,7 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
# collate folders
if len(to_collate) > 0:
print(f'Collating {len([to_collate[parent_folder]["folders"] for parent_folder in to_collate.keys()])} folders into zip files.') #{sum([len(x["zips"]) for x in to_collate.keys()])}

# call zip_folder in parallel
for zip_tuple in to_collate.items():
parent_folder = zip_tuple[0]
@@ -797,7 +774,6 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
folders = zip_tuple[1]['folders']
folder_files = zip_tuple[1]['folder_files']
num_files = sum([len(ff) for ff in folder_files])

try:
max_filesize = max([max([os.lstat(filename).st_size for filename in ff]) for ff in folder_files])
except ValueError:
@@ -806,6 +782,7 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,

max_files_per_zip = int(np.ceil(1024**3 / max_filesize)) # limit zips to 1 GiB - using available memory too inconsistent
num_zips = int(np.ceil(num_files / max_files_per_zip))
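
A worked example of the chunk-size arithmetic above, with illustrative numbers (not taken from the repository):

import numpy as np

max_filesize = 8 * 1024**2                                 # largest file in the folder: 8 MiB
num_files = 1000
max_files_per_zip = int(np.ceil(1024**3 / max_filesize))   # 128 files per ~1 GiB zip
num_zips = int(np.ceil(num_files / max_files_per_zip))     # ceil(1000 / 128) = 8 zips
print(max_files_per_zip, num_zips)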

chunk_subfolders = False
if num_zips > len(folders):
chunk_subfolders = True
@@ -833,8 +810,11 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
chunk_files,
repeat(use_compression),
repeat(dryrun),
# repeat(processing_start),
[i for i in range(len(chunks))],
repeat(mem_per_worker),
# repeat(total_size_uploaded),
# repeat(total_files_uploaded),
)):
zip_futures.append(client.submit(
zip_folders,
@@ -843,6 +823,7 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,

for zip_future in as_completed(zip_futures):
parent_folder, id, zip_data = zip_future.result()

with zipfile.ZipFile(io.BytesIO(zip_data), 'r') as z:
zip_contents = z.namelist()
to_collate[parent_folder]['zips'].append({'zip_contents':zip_contents, 'id':id, 'zip_object_name':str(os.sep.join([destination_dir, os.path.relpath(f'{parent_folder}/collated_{id}.zip', local_dir)]))})
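
A self-contained sketch of the namelist() step above: build a small zip in memory, then list its contents from the raw bytes.

import io
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w', compression=zipfile.ZIP_DEFLATED) as z:
    z.writestr('folder/file1.txt', b'hello')
    z.writestr('folder/file2.txt', b'world')
zip_data = buf.getvalue()

# Read the contents back from bytes, as the loop above does with each zip_future result.
with zipfile.ZipFile(io.BytesIO(zip_data), 'r') as z:
    print(z.namelist())  # ['folder/file1.txt', 'folder/file2.txt']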
@@ -900,9 +881,11 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
# End loop if all futures are finished (or failed)
if 'pending' not in [f.status for f in upload_futures+zul_futures+zip_futures]:
break
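
The same polling idea in isolation, assuming a small in-process cluster and trivial tasks: loop until no future reports a 'pending' status.

import time
from dask.distributed import Client

client = Client(processes=False)
futures = [client.submit(lambda x: x * x, i) for i in range(4)]

# Keep checking the statuses until every future has either finished or failed.
while True:
    if 'pending' not in [f.status for f in futures]:
        break
    time.sleep(0.1)

print([f.result() for f in futures])  # [0, 1, 4, 9]
client.close()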

####
# Monitor upload tasks
####

if failed:
for i, failed_upload in enumerate(failed):
print(f'Error upload {i}:\nException: {failed_upload[0]}\nTraceback: {failed_upload[1]}')
@@ -1103,6 +1086,8 @@ def error(self, message):
else:
current_objects['METADATA'] = None

# current_objects.to_csv('current_objects.csv', index=False)

## check if log exists in the bucket, and download it and append to it if it does
# TODO: integrate this with local check for log file
if current_objects['CURRENT_OBJECTS'].isin([log]).any():