Commit
22 series.isin (#23)
* starting with the exclude list

* save_config using exclude.to_list

* exclude done

* added du.sh sbatch script

* Use callback to free up zip_contents memory

* separating cores used for zipping and upload

* freeing up memory not working and possibly redundant.

* done
davedavemckay authored Aug 5, 2024
1 parent 3ce2a40 commit d5eccdc
Showing 3 changed files with 75 additions and 13 deletions.
38 changes: 29 additions & 9 deletions csd3-side/scripts/lsst-backup.py
@@ -422,6 +422,10 @@ def upload_and_callback(s3_host, access_key, secret_key, bucket_name, folder, fi
logfile.write(f'{result}\n')
return None

# def free_up_zip_memory(to_collate, parent_folder, index):
# del to_collate[parent_folder]['zips'][index]['zip_contents']
# print(f'Deleted zip contents object for {parent_folder}, zip index {index}, to free memory.')

def process_files(s3_host, access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, nprocs, perform_checksum, dryrun, log, global_collate, use_compression):
"""
Uploads files from a local directory to an S3 bucket in parallel.
@@ -447,6 +451,11 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
total_files_uploaded = 0
i = 0
#processed_files = []
if global_collate:
half_cores = nprocs // 2
zip_pool = Pool(processes=half_cores)
collate_ul_pool = Pool(processes=nprocs - half_cores)

pool = Pool(nprocs) # use 4 CPUs by default - very little speed-up, might drop multiprocessing and parallelise at shell level
#recursive loop over local folder
results = []
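
The hunk above splits the worker budget so zipping and uploading no longer share one pool. A minimal runnable sketch of the same idea, using placeholder task functions rather than the script's zip_folders and upload_and_callback:

from multiprocessing import Pool

def make_archive(chunk_id):
    # Stand-in for zip_folders: CPU-bound work.
    return f'zipped chunk {chunk_id}'

def push_to_s3(chunk_id):
    # Stand-in for upload_and_callback: network-bound work.
    return f'uploaded chunk {chunk_id}'

if __name__ == '__main__':
    nprocs = 8
    half_cores = nprocs // 2
    # Two separate pools, so slow uploads cannot starve the zipping workers
    # (and vice versa); together they still use only nprocs processes.
    zip_pool = Pool(processes=half_cores)
    collate_ul_pool = Pool(processes=nprocs - half_cores)

    zip_jobs = [zip_pool.apply_async(make_archive, (i,)) for i in range(4)]
    ul_jobs = [collate_ul_pool.apply_async(push_to_s3, (i,)) for i in range(4)]
    print([j.get() for j in zip_jobs] + [j.get() for j in ul_jobs])

    for p in (zip_pool, collate_ul_pool):
        p.close()
        p.join()
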
@@ -464,13 +473,14 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
elif len(files) == 0:
print(f'Skipping subfolder - no files.')
continue
if folder in exclude:
if exclude.isin([folder]).any():
print(f'Skipping subfolder {folder} - excluded.')
continue
# remove subfolders in exclude list
if len(sub_folders) > 0:
len_pre_exclude = len(sub_folders)
sub_folders[:] = [sub_folder for sub_folder in sub_folders if sub_folder not in exclude]
sub_folders[:] = [sub_folder for sub_folder in sub_folders if not exclude.isin([sub_folder]).any()]
# sub_folders[:] = [sub_folder for sub_folder in sub_folders if sub_folder not in exclude]
print(f'Skipping {len_pre_exclude - len(sub_folders)} subfolders in {folder} - excluded. {len(sub_folders)} subfolders remaining.')

folder_files = [os.sep.join([folder, filename]) for filename in files]
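
Context for the isin changes in this hunk: once exclude is a pandas Series, membership has to be tested with .isin(...).any(), because the in operator on a Series checks the index rather than the values. A small sketch with made-up folder names:

import pandas as pd

exclude = pd.Series(['scratch', 'tmp'])   # illustrative folder names

folder = 'scratch'
print(folder in exclude)               # False: 'in' looks at the index (0, 1, ...)
print(exclude.isin([folder]).any())    # True: isin compares against the values

sub_folders = ['raw', 'tmp', 'calexp']
kept = [s for s in sub_folders if not exclude.isin([s]).any()]
print(kept)                            # ['raw', 'calexp']
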
@@ -653,7 +663,7 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
print(f'collating into: {len(chunks)} zip file(s)')
for id,chunk in enumerate(zip(chunks,chunk_files)):
# print(f'chunk {id} contains {len(chunk[0])} folders')
zip_results.append(pool.apply_async(zip_folders, (parent_folder,chunk[0],chunk_files[0],use_compression,dryrun,id)))
zip_results.append(zip_pool.apply_async(zip_folders, (parent_folder,chunk[0],chunk_files[0],use_compression,dryrun,id)))
zipped = 0
uploaded = []
total_zips = len(zip_results)
@@ -676,6 +686,8 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
# #[os.sep.join([destination_dir, os.path.relpath(filename, local_dir)]) for filename in folder_files]
# to_collate[parent_folder][id]['zip_object_name'] =

# tc_index = len(to_collate[parent_folder]['zips']) - 1

# check if zip_object_name exists in bucket and get its checksum
if current_objects.isin([to_collate[parent_folder]['zips'][-1]['zip_object_name']]).any():
existing_zip_checksum = bm.get_resource(access_key, secret_key, s3_host).Object(bucket_name,to_collate[parent_folder]['zips'][-1]['zip_object_name']).e_tag.strip('"')
@@ -690,7 +702,7 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
total_size_uploaded += len(zip_data)
total_files_uploaded += 1
print(f"Uploading {to_collate[parent_folder]['zips'][-1]['zip_object_name']}.")
results.append(pool.apply_async(upload_and_callback, args=(
results.append(collate_ul_pool.apply_async(upload_and_callback, args=(
s3_host,
access_key,
secret_key,
@@ -706,11 +718,17 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
len(zip_data),
total_size_uploaded,
total_files_uploaded,
True,
)))
True),
# callback=lambda _: free_up_zip_memory(to_collate, parent_folder, tc_index),
))

pool.close()
pool.join()
if global_collate:
zip_pool.close()
zip_pool.join()
collate_ul_pool.close()
collate_ul_pool.join()

# # Go!
if __name__ == '__main__':
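
The commented-out callback above (and the free_up_zip_memory stub near the top of the file) sketch an intent to drop each zip's in-memory contents once its upload has been handed back to the parent; the commit leaves that callback disabled. A minimal illustration of the apply_async callback pattern it points at, using a toy payload store rather than the real to_collate structure:

from multiprocessing import Pool

def upload(name, payload_size):
    # Stand-in for upload_and_callback: pretend to ship payload_size bytes.
    return name, payload_size

def free_payload(store, key):
    # Runs in the parent process once the matching worker result arrives.
    del store[key]['payload']
    print(f'freed payload for {key}')

if __name__ == '__main__':
    store = {'zip_0': {'payload': b'x' * 1024},
             'zip_1': {'payload': b'y' * 1024}}
    pool = Pool(2)
    results = []
    for key, entry in store.items():
        results.append(pool.apply_async(
            upload,
            args=(key, len(entry['payload'])),
            # The callback fires in the parent after upload() returns,
            # so the large buffer can be dropped without waiting for all jobs.
            callback=lambda _res, k=key: free_payload(store, k)))
    for r in results:
        r.wait()
    pool.close()
    pool.join()
    print(store)  # 'payload' keys are gone; the outer entries remain
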
@@ -789,15 +807,17 @@ def error(self, message):
dryrun = args.dryrun
use_compression = not args.no_compression # internally, flag turns *on* compression, but for user no-compression turns it off - makes flag more intuitive

exclude = []

if args.exclude:
exclude = args.exclude
exclude = pd.Series(args.exclude)
else:
exclude = pd.Series(dtype=object)

print(f'Config: {args}')

if save_config:
with open(config_file, 'w') as f:
yaml.dump({'bucket_name': bucket_name, 'local_path': local_dir, 'S3_prefix': prefix, 'S3_folder': sub_dirs, 'exclude': exclude, 'nprocs': nprocs, 'no_collate': not global_collate, 'dryrun': dryrun, 'no_checksum': not perform_checksum, 'no_compression': not use_compression}, f)
yaml.dump({'bucket_name': bucket_name, 'local_path': local_dir, 'S3_prefix': prefix, 'S3_folder': sub_dirs, 'exclude': exclude.to_list(), 'nprocs': nprocs, 'no_collate': not global_collate, 'dryrun': dryrun, 'no_checksum': not perform_checksum, 'no_compression': not use_compression}, f)
sys.exit(0)

print(f'Symlinks will be replaced with the target file. A new file <symlink_file>.symlink will contain the symlink target path.')
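
On the config side, the Series is converted back to a plain list before writing YAML, since yaml.dump has no representer for a pandas Series. A short sketch of the save/load round trip, with illustrative paths:

import pandas as pd
import yaml

exclude = pd.Series(['u/scratch', 'u/tmp'])   # illustrative paths

# Saving: Series -> list, so yaml.dump writes ordinary YAML strings.
config = {'exclude': exclude.to_list(), 'nprocs': 8}
text = yaml.dump(config)
print(text)

# Loading: list -> Series, so .isin() checks work on the next run.
loaded = yaml.safe_load(text)
exclude_again = pd.Series(loaded['exclude'], dtype=object)
print(exclude_again.isin(['u/tmp']).any())    # True
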
25 changes: 25 additions & 0 deletions scripts/du.sh
@@ -0,0 +1,25 @@
#!/bin/bash

# Script to discover the size of datasets using a back-end node

#SBATCH -A IRIS-IP005-CPU
#SBATCH --job-name="volume"
#SBATCH --time=06:00:00
#SBATCH --partition=cclake
#SBATCH --nodes=1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1
#SBATCH --mem=3420mb

# Change to job working directory
# Edit to parent folder of folder(s) of interest
cd /rds/project/rds-lT5YGmtKack/ras81/butler_wide_20220930/data/u/ir-shir1/DRP
date
pwd -P
# uncomment the below for multiple subfolders
du -h --max-depth=1
# uncomment the below for a single named folder
# du -sh folder
date

# for multiple named folders, copy/paste the above
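
Assuming the account, partition, and cd path above are edited for the target system, the script would be submitted with sbatch scripts/du.sh; with no --output directive set, the du listing lands in SLURM's default slurm-<jobid>.out file in the submission directory.
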
25 changes: 21 additions & 4 deletions scripts/find_collated_zips.py
@@ -30,8 +30,9 @@

import re

def get_zipfile_list(bucket_name, access_key, secret_key, s3_host):
def get_zipfile_list(bucket_name, access_key, secret_key, s3_host, get_contents_metadata):
zipfile_list = []
contents_list = []
s3 = bm.get_resource(access_key, secret_key, s3_host)
s3_client = bm.get_client(access_key, secret_key, s3_host)
bucket = s3.Bucket(bucket_name)
@@ -47,8 +48,14 @@ def get_zipfile_list(bucket_name, access_key, secret_key, s3_host):
key = obj['Key']
if pattern.match(key):
zipfile_list.append(key)
if get_contents_metadata:
contents = bucket.Object(key).get()['Metadata']['zip-contents'].split(',')
contents_list.append(contents)
print(f'{key}: {contents}')
else:
print(f'{key}')


return zipfile_list
return zipfile_list, contents_list

def main():
epilog = ''
@@ -63,9 +70,14 @@ def error(self, message):
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument('--bucket-name','-b', type=str, help='Name of the S3 bucket.', required=True)
parser.add_argument('--check-contents','-c', action='store_true', help='Check the contents of the zip files from metadata exist in the bucket.')

args = parser.parse_args()
bucket_name = args.bucket_name
if args.check_contents:
check_contents = True
else:
check_contents = False

# Setup bucket object
s3_host = 'echo.stfc.ac.uk'
@@ -84,9 +96,14 @@ def error(self, message):
print(f'Bucket {bucket_name} not found in {s3_host}.')
sys.exit()

zipfile_list = get_zipfile_list(bucket_name, access_key, secret_key, s3_host)
zipfile_list, zipfile_contents = get_zipfile_list(bucket_name, access_key, secret_key, s3_host, check_contents)

print(zipfile_list)
if check_contents:
for i, contents in enumerate(zipfile_contents):
print(f'Zip file: {zipfile_list[i]}, {contents}')
else:
for zipfile in zipfile_list:
print(zipfile)

if __name__ == '__main__':
main()
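
The new --check-contents flag prints the zip-contents metadata written at upload time; a natural follow-up, not part of this commit, would be to confirm that every file named in that metadata still exists as an object. A hedged sketch using boto3 directly (the repo's bm helper is presumably a thin wrapper over the same calls), assuming the metadata stores full object keys and using a placeholder bucket name:

import boto3

def verify_zip_contents(bucket_name, zip_keys, endpoint_url, access_key, secret_key):
    """Report which files named in each zip's 'zip-contents' metadata are missing from the bucket."""
    s3 = boto3.resource('s3', endpoint_url=endpoint_url,
                        aws_access_key_id=access_key,
                        aws_secret_access_key=secret_key)
    bucket = s3.Bucket(bucket_name)
    # One pass over the bucket listing is far cheaper than a HEAD request per file.
    existing = {obj.key for obj in bucket.objects.all()}
    missing = {}
    for zip_key in zip_keys:
        contents = bucket.Object(zip_key).get()['Metadata']['zip-contents'].split(',')
        missing[zip_key] = [name for name in contents if name not in existing]
    return missing

# Example use with a hypothetical bucket:
# print(verify_zip_contents('my-backup-bucket', ['DRP/collated_1.zip'],
#                           'https://echo.stfc.ac.uk', access_key, secret_key))
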
