From d5eccdc90dd8afc8455657b2dc1cb20aa203fd11 Mon Sep 17 00:00:00 2001
From: Dave McKay
Date: Mon, 5 Aug 2024 16:07:39 +0100
Subject: [PATCH] 22 series.isin (#23)

* starting with the exclude list
* save_config using exclude.to_list
* exclude done
* added du.sh sbatch script
* Use callback to free up zip_contect memory
* separating cores used for zipping and upload
* freeing up memory not working and possibly redundant.
* done
---
 csd3-side/scripts/lsst-backup.py | 38 +++++++++++++++++++++++++++++---------
 scripts/du.sh                    | 25 +++++++++++++++++++++++++
 scripts/find_collated_zips.py    | 25 +++++++++++++++++++++----
 3 files changed, 75 insertions(+), 13 deletions(-)
 create mode 100644 scripts/du.sh

diff --git a/csd3-side/scripts/lsst-backup.py b/csd3-side/scripts/lsst-backup.py
index 6f20a07..c210aa7 100644
--- a/csd3-side/scripts/lsst-backup.py
+++ b/csd3-side/scripts/lsst-backup.py
@@ -422,6 +422,10 @@ def upload_and_callback(s3_host, access_key, secret_key, bucket_name, folder, fi
         logfile.write(f'{result}\n')
     return None
 
+# def free_up_zip_memory(to_collate, parent_folder, index):
+#     del to_collate[parent_folder]['zips'][index]['zip_contents']
+#     print(f'Deleted zip contents object for {parent_folder}, zip index {index}, to free memory.')
+
 def process_files(s3_host, access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, nprocs, perform_checksum, dryrun, log, global_collate, use_compression):
     """
     Uploads files from a local directory to an S3 bucket in parallel.
@@ -447,6 +451,11 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
     total_files_uploaded = 0
     i = 0
     #processed_files = []
+    if global_collate:
+        half_cores = nprocs // 2
+        zip_pool = Pool(processes=half_cores)
+        collate_ul_pool = Pool(processes=nprocs - half_cores)
+
     pool = Pool(nprocs) # use 4 CPUs by default - very little speed-up, might drop multiprocessing and parallelise at shell level
     #recursive loop over local folder
     results = []
@@ -464,13 +473,14 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
         elif len(files) == 0:
             print(f'Skipping subfolder - no files.')
             continue
-        if folder in exclude:
+        if exclude.isin([folder]).any():
             print(f'Skipping subfolder {folder} - excluded.')
             continue
         # remove subfolders in exclude list
         if len(sub_folders) > 0:
             len_pre_exclude = len(sub_folders)
-            sub_folders[:] = [sub_folder for sub_folder in sub_folders if sub_folder not in exclude]
+            sub_folders[:] = [sub_folder for sub_folder in sub_folders if not exclude.isin([sub_folder]).any()]
+            # sub_folders[:] = [sub_folder for sub_folder in sub_folders if sub_folder not in exclude]
             print(f'Skipping {len_pre_exclude - len(sub_folders)} subfolders in {folder} - excluded. 
 {len(sub_folders)} subfolders remaining.')
         folder_files = [os.sep.join([folder, filename]) for filename in files]
@@ -653,7 +663,7 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
             print(f'collating into: {len(chunks)} zip file(s)')
             for id,chunk in enumerate(zip(chunks,chunk_files)):
                 # print(f'chunk {id} contains {len(chunk[0])} folders')
-                zip_results.append(pool.apply_async(zip_folders, (parent_folder,chunk[0],chunk_files[0],use_compression,dryrun,id)))
+                zip_results.append(zip_pool.apply_async(zip_folders, (parent_folder,chunk[0],chunk_files[0],use_compression,dryrun,id)))
             zipped = 0
             uploaded = []
             total_zips = len(zip_results)
@@ -676,6 +686,8 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
                     #     #[os.sep.join([destination_dir, os.path.relpath(filename, local_dir)]) for filename in folder_files]
                     # to_collate[parent_folder][id]['zip_object_name'] =
 
+                    # tc_index = len(to_collate[parent_folder]['zips']) - 1
+
                     # check if zip_object_name exists in bucket and get its checksum
                     if current_objects.isin([to_collate[parent_folder]['zips'][-1]['zip_object_name']]).any():
                         existing_zip_checksum = bm.get_resource(access_key, secret_key, s3_host).Object(bucket_name,to_collate[parent_folder]['zips'][-1]['zip_object_name']).e_tag.strip('"')
@@ -690,7 +702,7 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
                         total_size_uploaded += len(zip_data)
                         total_files_uploaded += 1
                         print(f"Uploading {to_collate[parent_folder]['zips'][-1]['zip_object_name']}.")
-                        results.append(pool.apply_async(upload_and_callback, args=(
+                        results.append(collate_ul_pool.apply_async(upload_and_callback, args=(
                             s3_host,
                             access_key,
                             secret_key,
@@ -706,11 +718,17 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
                             len(zip_data),
                             total_size_uploaded,
                             total_files_uploaded,
-                            True,
-                            )))
+                            True),
+                            # callback=lambda _: free_up_zip_memory(to_collate, parent_folder, tc_index),
+                            ))
 
     pool.close()
     pool.join()
+    if global_collate:
+        zip_pool.close()
+        zip_pool.join()
+        collate_ul_pool.close()
+        collate_ul_pool.join()
 
 # # Go!
 if __name__ == '__main__':
@@ -789,15 +807,17 @@ def error(self, message):
     dryrun = args.dryrun
     use_compression = not args.no_compression # internally, flag turns *on* compression, but for user no-compression turns it off - makes flag more intuitive
-    exclude = []
+
     if args.exclude:
-        exclude = args.exclude
+        exclude = pd.Series(args.exclude)
+    else:
+        exclude = pd.Series.empty()
 
     print(f'Config: {args}')
 
     if save_config:
         with open(config_file, 'w') as f:
-            yaml.dump({'bucket_name': bucket_name, 'local_path': local_dir, 'S3_prefix': prefix, 'S3_folder': sub_dirs, 'exclude': exclude, 'nprocs': nprocs, 'no_collate': not global_collate, 'dryrun': dryrun, 'no_checksum': not perform_checksum, 'no_compression': not use_compression}, f)
+            yaml.dump({'bucket_name': bucket_name, 'local_path': local_dir, 'S3_prefix': prefix, 'S3_folder': sub_dirs, 'exclude': exclude.to_list(), 'nprocs': nprocs, 'no_collate': not global_collate, 'dryrun': dryrun, 'no_checksum': not perform_checksum, 'no_compression': not use_compression}, f)
         sys.exit(0)
 
     print(f'Symlinks will be replaced with the target file. 
 A new file .symlink will contain the symlink target path.')
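For reference, the core change in this file replaces plain `in`-list membership tests with `pandas.Series.isin`. A minimal, self-contained sketch of that pattern follows; the folder names and exclude entries are invented for illustration:

```python
import pandas as pd

# Hypothetical exclude list, mirroring how the script builds one from args.exclude
exclude = pd.Series(['skip_me', 'also_skip'])

folders = ['keep_me', 'skip_me', 'other']

for folder in folders:
    # Series.isin returns a boolean Series; .any() collapses it to a single bool,
    # equivalent in effect to `folder in exclude_list` for a plain Python list.
    if exclude.isin([folder]).any():
        print(f'Skipping {folder} - excluded.')
        continue
    print(f'Processing {folder}')

# Pruning a list of subfolders against the same Series:
sub_folders = ['a', 'skip_me', 'b']
sub_folders[:] = [s for s in sub_folders if not exclude.isin([s]).any()]
print(sub_folders)  # ['a', 'b']
```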
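The patch also splits `nprocs` between a pool that builds the zip files and a pool that uploads them, and sketches (currently commented out) an `apply_async` callback for releasing zipped data once its upload completes. Below is a standalone sketch of that split-pool-plus-callback pattern, with placeholder `zip_task` and `upload_task` functions standing in for the script's `zip_folders` and `upload_and_callback`:

```python
from multiprocessing import Pool

def zip_task(name):
    # stand-in for zip_folders: pretend to build a zip payload
    return name, b'x' * 1024

def upload_task(name, payload):
    # stand-in for upload_and_callback: pretend to upload the payload
    return f'{name}: uploaded {len(payload)} bytes'

if __name__ == '__main__':
    nprocs = 4
    half_cores = nprocs // 2
    zip_pool = Pool(processes=half_cores)              # cores dedicated to zipping
    upload_pool = Pool(processes=nprocs - half_cores)  # cores dedicated to uploads

    payloads = {}

    def free_payload(result):
        # the callback runs in the parent process once an upload task returns,
        # so the zipped data it refers to can be dropped to free memory
        name = result.split(':')[0]
        payloads.pop(name, None)
        print(result)

    zipped = [zip_pool.apply_async(zip_task, (f'chunk{i}',)) for i in range(4)]
    uploads = []
    for r in zipped:
        name, data = r.get()
        payloads[name] = data
        uploads.append(upload_pool.apply_async(upload_task, (name, data), callback=free_payload))

    for p in (zip_pool, upload_pool):
        p.close()
        p.join()
    print('remaining payloads:', payloads)  # expected: {}
```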
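The `--exclude` handling now keeps the exclude list as a `pandas.Series` and converts it back to a plain list with `.to_list()` when the configuration is written to YAML. A small sketch of that round trip, using an invented argument list and constructing the empty case with `pd.Series([], dtype=str)`:

```python
import pandas as pd
import yaml

# Hypothetical parsed values; in the script these come from argparse
args_exclude = ['calibs', 'raw/2022']

if args_exclude:
    exclude = pd.Series(args_exclude)
else:
    exclude = pd.Series([], dtype=str)  # empty Series when nothing is excluded

# Saving: pandas objects are not YAML-safe, so convert back to a plain list
config = {'exclude': exclude.to_list(), 'nprocs': 4}
print(yaml.dump(config))

# Loading: rebuild the Series from the saved list
loaded = yaml.safe_load(yaml.dump(config))
exclude = pd.Series(loaded['exclude'], dtype=str)
print(exclude.isin(['calibs']).any())  # True
```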
diff --git a/scripts/du.sh b/scripts/du.sh
new file mode 100644
index 0000000..1d075fe
--- /dev/null
+++ b/scripts/du.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+# Script to discover the size of datasets using a back-end node
+
+#SBATCH -A IRIS-IP005-CPU
+#SBATCH --job-name="volume"
+#SBATCH --time=06:00:00
+#SBATCH --partition=cclake
+#SBATCH --nodes=1
+#SBATCH --tasks-per-node=1
+#SBATCH --cpus-per-task=1
+#SBATCH --mem=3420mb
+
+# Change to job working directory
+# Edit to parent folder of folder(s) of interest
+cd /rds/project/rds-lT5YGmtKack/ras81/butler_wide_20220930/data/u/ir-shir1/DRP
+date
+pwd -P
+# uncomment the below for multiple subfolders
+du -h --max-depth=1
+# uncomment the below for a single named folder
+# du -sh folder
+date
+
+# for multiple named folders, copy/paste the above
\ No newline at end of file
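Usage note: scripts/du.sh is a SLURM batch script (the `#SBATCH` lines request the account, partition, wall time and memory), so it is intended to be submitted with something like `sbatch scripts/du.sh` rather than run interactively. The `cd` target and the choice between `du -h --max-depth=1` (all subfolders) and `du -sh <folder>` (a single named folder) are meant to be edited for each dataset before submission.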
diff --git a/scripts/find_collated_zips.py b/scripts/find_collated_zips.py
index 8d6fc1d..4cf1a79 100644
--- a/scripts/find_collated_zips.py
+++ b/scripts/find_collated_zips.py
@@ -30,8 +30,9 @@
 import re
 
 
-def get_zipfile_list(bucket_name, access_key, secret_key, s3_host):
+def get_zipfile_list(bucket_name, access_key, secret_key, s3_host, get_contents_metadata):
     zipfile_list = []
+    contents_list = []
     s3 = bm.get_resource(access_key, secret_key, s3_host)
     s3_client = bm.get_client(access_key, secret_key, s3_host)
     bucket = s3.Bucket(bucket_name)
@@ -47,8 +48,14 @@ def get_zipfile_list(bucket_name, access_key, secret_key, s3_host):
             key = obj['Key']
             if pattern.match(key):
                 zipfile_list.append(key)
+                if get_contents_metadata:
+                    contents = bucket.Object(key).get()['Metadata']['zip-contents'].split(',')
+                    print(f'{key}: {contents}')
+                else:
+                    print(f'{key}')
+
 
-    return zipfile_list
+    return zipfile_list, contents
 
 def main():
     epilog = ''
@@ -63,9 +70,14 @@ def error(self, message):
         formatter_class=argparse.RawDescriptionHelpFormatter,
     )
     parser.add_argument('--bucket-name','-b', type=str, help='Name of the S3 bucket.', required=True)
+    parser.add_argument('--check-contents','-c', action='store_true', help='Check the contents of the zip files from metadata exist in the bucket.')
     args = parser.parse_args()
 
     bucket_name = args.bucket_name
+    if args.check_contents:
+        check_contents = True
+    else:
+        check_contents = False
 
     # Setup bucket object
     s3_host = 'echo.stfc.ac.uk'
@@ -84,9 +96,14 @@ def error(self, message):
         print(f'Bucket {bucket_name} not found in {s3_host}.')
         sys.exit()
 
-    zipfile_list = get_zipfile_list(bucket_name, access_key, secret_key, s3_host)
+    zipfile_list, zipfile_contents = get_zipfile_list(bucket_name, access_key, secret_key, s3_host, check_contents)
 
-    print(zipfile_list)
+    if check_contents:
+        for i, contents in enumerate(zipfile_contents):
+            print(f'Zip file: {zipfile_list[i]}, {contents}')
+    else:
+        for zipfile in zipfile_list:
+            print(zipfile)
 
 if __name__ == '__main__':
     main()
\ No newline at end of file
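For reference, a minimal sketch of reading the `zip-contents` metadata key back from an S3-compatible store with plain boto3; the endpoint, bucket name and filename pattern below are placeholders, and the script itself goes through its `bm` helper module instead:

```python
import re
import boto3

# Placeholder endpoint and credentials; the real script reads these from its environment
s3 = boto3.resource(
    's3',
    endpoint_url='https://s3.example.org',
    aws_access_key_id='ACCESS_KEY',
    aws_secret_access_key='SECRET_KEY',
)
bucket = s3.Bucket('example-bucket')

# Illustrative pattern for collated zip objects; the real regex lives in the script
pattern = re.compile(r'.*collated_\d+\.zip$')

for obj_summary in bucket.objects.all():
    key = obj_summary.key
    if not pattern.match(key):
        continue
    # User metadata keys are exposed lower-cased by boto3; .metadata carries the
    # same data as .get()['Metadata'] without downloading the object body
    metadata = bucket.Object(key).metadata
    contents = metadata.get('zip-contents', '')
    print(f'{key}: {contents.split(",") if contents else []}')
```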