diff --git a/csd3-side/scripts/lsst-backup.py b/csd3-side/scripts/lsst-backup.py
index a8e6093..793ab0d 100644
--- a/csd3-side/scripts/lsst-backup.py
+++ b/csd3-side/scripts/lsst-backup.py
@@ -1265,7 +1265,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
             folder_files_size = np.sum(np.array([os.lstat(filename).st_size for filename in folder_files]))
     print(f'Number of zip files: {len(zip_batch_files)}', flush=True)
-    print('', flush=True)
+    print(f'Done traversing {local_dir}.', flush=True)

     if global_collate:
         ###############################
@@ -1390,122 +1390,9 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
                     else:
                         upload_futures.remove(ulf)
                         to_collate.loc[to_collate['id'] == id, 'upload'] = False
-
-
-
-            # for zul_f in as_completed(zul_futures): # is a zip_and_upload_future
-            #     # mem_check(zul_futures+upload_futures)
-            #     if len(upload_futures) >= len(client.scheduler_info()['workers']):
-            #         print('Waiting for upload slots to free up.', flush=True)
-            #         for f in as_completed(upload_futures):
-            #             if 'exception' in f.status or 'error' in f.status:
-            #                 f_tuple = f.exception(), f.traceback()
-            #                 failed.append(f_tuple)
-            #                 del f
-            #             else:
-            #                 del f
-            #     result = zul_f.result()
-            #     if result[0] is not None:
-            #         upload_futures.append(client.submit(upload_and_callback,
-            #             s3,
-            #             bucket_name,
-            #             api,
-            #             local_dir,
-            #             destination_dir,
-            #             result[0], # zip_data
-            #             result[2], # namelist
-            #             result[1], # zip_object_key
-            #             dryrun,
-            #             processing_start,
-            #             1, # 1 zip file
-            #             result[3], # len(zip_data) (size in bytes)
-            #             total_size_uploaded,
-            #             total_files_uploaded,
-            #             True,
-            #             mem_per_worker
-            #         ))
-            #         to_collate.loc[to_collate['object_names'] == result[1], 'upload'] = False
-            #         print(f'Zip {result[1]} created and added to upload queue.', flush=True)
-            #         print(f'To upload: {len(to_collate[to_collate.upload == True])} zips remaining.', flush=True)
-            #         del zul_f
-            #     else:
-            #         print(f'No files to zip as {result[1]}. Skipping upload.', flush=True)
-            #         del zul_f
-            # del zul_futures
-
-            # zul_futures = to_collate_uploads.apply(zip_and_upload, axis=1,
-            #     args=(s3,
-            #         bucket_name,
-            #         api,
-            #         destination_dir,
-            #         local_dir,
-            #         total_size_uploaded,
-            #         total_files_uploaded,
-            #         use_compression,
-            #         dryrun,
-            #         mem_per_worker),
-            #     meta=pd.DataFrame(columns=['future','zip_object_key'], dtype='object')
-            # )
-            # print('zul_futures')
-            # print(zul_futures)
-            # wait(zul_futures)
-            # print('waiting')
-            # exit()
-            # print(type(zul_futures))
-            # for i in range(len(to_collate)):
-            #     mem_check(zul_futures+upload_futures)
-            #     zul_futures.append(client.submit(
-            #         zip_and_upload,
-            #         s3,
-            #         bucket_name,
-            #         api,
-            #         destination_dir,
-            #         local_dir,
-            #         to_collate.iloc[i]['file_paths'],
-            #         total_size_uploaded,
-            #         total_files_uploaded,
-            #         use_compression,
-            #         dryrun,
-            #         i,
-            #         mem_per_worker,
-            #     ))
-            #     # mem_check(zul_futures)
-
-
-        ########################
-        # Monitor upload tasks #
-        ########################
-
-        # print('Monitoring zip tasks.', flush=True)
-        #just upload futures
-
-
-        #                 del f
-        #             elif 'finished' in f.status:
-        #                 del f
-        # del upload_futures
-
-        # upload_futures and zul_futures
-        # while len(upload_futures) > 0 or len(zul_futures) > 0:
-        #     for f in as_completed(upload_futures+zul_futures):
-        #         if f in zul_futures: # is a zip_and_upload_future
-        #             result = f.result()
-        #             if result[0] is not None:
-        #                 upload_futures.append(result[0])
-        #                 to_collate.loc[to_collate['object_names'] == result[1], 'upload'] = False
-        #                 print(f'Zip {result[1]} created and added to upload queue.', flush=True)
-        #                 print(f'To upload: {len(to_collate[to_collate.upload == True])} zips remaining.', flush=True)
-        #                 del f
-        #             else:
-        #                 print(f'No files to zip as {result[1]}. Skipping upload.', flush=True)
-        #                 del f
-        #         else: # is an upload_future
-        #             if 'exception' in f.status or 'error' in f.status:
-        #                 f_tuple = f.exception(), f.traceback()
-        #                 failed.append(f_tuple)
-        #                 del f
-        #             elif 'finished' in f.status:
-        #                 del f
+            if len(upload_futures) == 0:
+                print('All zip uploads complete.', flush=True)
+            sleep(30)

     if failed:
         for i, failed_upload in enumerate(failed):
@@ -1518,7 +1405,9 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
     else:
         print(f'Collate list not saved.')

-# # Go!
+##########################################
+#             Main Function              #
+##########################################
 if __name__ == '__main__':
     epilog = ''
     class MyParser(argparse.ArgumentParser):
@@ -1533,7 +1422,6 @@ def error(self, message):
     )
     parser.add_argument('--config-file', type=str, help='Path to the configuration YAML file.')
     parser.add_argument('--api', type=str, help='API to use; "S3" or "Swift". Case insensitive.')
-    parser.add_argument('--collate-list-file', type=str, help='The path to a CSV file containing a list of dicts describing files and folders to collate.')
     parser.add_argument('--bucket-name', type=str, help='Name of the S3 bucket.')
     parser.add_argument('--local-path', type=str, help='Absolute path to the folder to be uploaded.')
     parser.add_argument('--S3-prefix', type=str, help='Prefix to be used in S3 object keys.')
@@ -1546,13 +1434,24 @@ def error(self, message):
     parser.add_argument('--dryrun', default=False, action='store_true', help='Perform a dry run without uploading files.')
     parser.add_argument('--no-compression', default=False, action='store_true', help='Do not use compression when collating files.')
     parser.add_argument('--save-config', default=False, action='store_true', help='Save the configuration to the provided config file path and exit.')
-    parser.add_argument('--no-save-collate-list', default=False, action='store_true', help='Save collate-list-file. Use to skip folder scanning.')
     parser.add_argument('--no-file-count-stop', default=False, action='store_true', help='Do not stop if count of local files equals count of S3 objects.')
     args = parser.parse_args()

     if not args.config_file and not (args.bucket_name and args.local_path and args.S3_prefix):
         parser.error('If a config file is not provided, the bucket name, local path, and S3 prefix must be provided.')
-    if args.config_file and (args.api or args.bucket_name or args.local_path or args.S3_prefix or args.S3_folder or args.exclude or args.nprocs or args.threads_per_worker or args.no_collate or args.dryrun or args.no_compression or args.save_config or args.no_save_collate_list or args.no_file_count_stop):
+    if args.config_file and (args.api or
+                             args.bucket_name or
+                             args.local_path or
+                             args.S3_prefix or
+                             args.S3_folder or
+                             args.exclude or
+                             args.nprocs or
+                             args.threads_per_worker or
+                             args.no_collate or
+                             args.dryrun or
+                             args.no_compression or
+                             args.save_config or
+                             args.no_file_count_stop):
         print(f'WARNING: Options provided on command line override options in {args.config_file}.')
     if args.config_file:
         config_file = args.config_file
@@ -1588,10 +1487,6 @@ def error(self, message):
         args.dryrun = config['dryrun']
     if 'no_compression' in config.keys() and not args.no_compression:
         args.no_compression = config['no_compression']
-    if 'collate_list_file' in config.keys() and not args.collate_list_file:
-        args.collate_list_file = config['collate_list_file']
-    if 'no_save_collate_list' in config.keys() and not args.no_save_collate_list:
-        args.no_save_collate_list = config['no_save_collate_list']
     if 'no_file_count_stop' in config.keys() and not args.no_file_count_stop:
         args.no_file_count_stop = config['no_file_count_stop']
     if 'api' in config.keys() and not args.api:
@@ -1624,20 +1519,6 @@ def error(self, message):
     dryrun = args.dryrun
     use_compression = not args.no_compression # internally, flag turns *on* compression, but for user no-compression turns it off - makes flag more intuitive
-    collate_list_file = args.collate_list_file
-    if not collate_list_file:
-        save_collate_list = False
-    else:
-        save_collate_list = not args.no_save_collate_list # internally, flag turns *on* save_collate_list, but for user no-save-collate-list turns it off - makes flag more intuitive
-    if save_collate_list and not collate_list_file:
-        parser.error('A collate list file must be provided to save the collate list.')
-    if save_collate_list and not os.path.exists(collate_list_file):
-        print(f'Collate list will be generated and saved to {collate_list_file}.')
-    elif save_collate_list and os.path.exists(collate_list_file):
-        print(f'Collate list will be read from and re-saved to {collate_list_file}.')
-    if (save_collate_list or collate_list_file) and not global_collate:
-        parser.error('Collate list file provided but collation is turned off. Please enable collation to use the collate list file.')
-
     file_count_stop = not args.no_file_count_stop # internally, flag turns *on* file-count-stop, but for user no-file-count-stop turns it off - makes flag more intuitive

     if args.exclude:
@@ -1660,8 +1541,6 @@ def error(self, message):
             'no_collate': not global_collate,
             'dryrun': dryrun,
             'no_compression': not use_compression,
-            'collate_list_file': collate_list_file,
-            'no_save_collate_list': not save_collate_list,
             'no_file_count_stop': not file_count_stop,
             'exclude': exclude.to_list(),
         }, f)
@@ -1696,6 +1575,15 @@ def error(self, message):
         previous_log = f"{prefix}-{'-'.join(sub_dirs.split('/'))}-{previous_suffix}"
     destination_dir = f"{prefix}/{sub_dirs}"

+    if global_collate:
+        collate_list_suffix = 'collate-list.csv'
+        collate_list_file = log.replace(log_suffix, collate_list_suffix) # now automatically generated
+        save_collate_list = True # no longer optional
+        if save_collate_list and not os.path.exists(collate_list_file):
+            print(f'Collate list will be generated and saved to {collate_list_file}.')
+        elif save_collate_list and os.path.exists(collate_list_file):
+            print(f'Collate list will be read from and re-saved to {collate_list_file}.')
+
     # Add titles to log file
     if not os.path.exists(log):
         if os.path.exists(previous_log):
@@ -1833,13 +1721,60 @@ def error(self, message):
     while local_dir[-1] == '/':
         local_dir = local_dir[:-1]

-    with warnings.catch_warnings():
-        warnings.filterwarnings('ignore')
-        print(f'Processing files in {local_dir}, elapsed time = {datetime.now() - start}', flush=True)
-        if api == 's3':
-            process_files(s3, bucket_name, api, current_objects, exclude, local_dir, destination_dir, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list, file_count_stop)
-        elif api == 'swift':
-            process_files(s3, bucket_name, api, current_objects, exclude, local_dir, destination_dir, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list, file_count_stop)
+    zips_to_upload = True
+    retries = 0
+    global_retry_limit = 10
+    while zips_to_upload and retries <= global_retry_limit:
+        print(f'Processing files in {local_dir}, elapsed time = {datetime.now() - start}, try number: {retries+1}', flush=True)
+        with warnings.catch_warnings():
+            warnings.filterwarnings('ignore')
+            if api == 's3':
+                process_files(s3,
+                              bucket_name,
+                              api,
+                              current_objects,
+                              exclude,
+                              local_dir,
+                              destination_dir,
+                              dryrun,
+                              log,
+                              global_collate,
+                              use_compression,
+                              client,
+                              mem_per_worker,
+                              collate_list_file,
+                              save_collate_list,
+                              file_count_stop
+                              )
+            elif api == 'swift':
+                process_files(s3,
+                              bucket_name,
+                              api,
+                              current_objects,
+                              exclude,
+                              local_dir,
+                              destination_dir,
+                              dryrun,
+                              log,
+                              global_collate,
+                              use_compression,
+                              client,
+                              mem_per_worker,
+                              collate_list_file,
+                              save_collate_list,
+                              file_count_stop
+                              )
+
+        with open(collate_list_file, 'r') as clf:
+            upload_checks = []
+            for l in clf.readlines():
+                if l.strip().split(',')[-1] == 'True':
+                    upload_checks.append(True)
+                else:
+                    upload_checks.append(False)
+        print(upload_checks)
+        zips_to_upload = any(upload_checks)
+        retries += 1

 # def get_all_tasks(dask_scheduler=None):
 #     return dask_scheduler.tasks
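
Note on the retry check in the final hunk: after each pass, the collate list CSV is re-read and the last field of each row is compared against 'True' to decide whether any zips still need uploading. As a minimal sketch (not part of the patch), the same check could be written with pandas, which the script already depends on; this assumes the saved collate list has a header row and that the flag column is named 'upload', matching the to_collate dataframe (both are assumptions about the CSV layout, not confirmed by the diff):

    import pandas as pd

    def zips_remaining(collate_list_file: str) -> bool:
        """Return True if any collate-list row still has upload == True."""
        df = pd.read_csv(collate_list_file)
        # The flag may be read back as bools or as the strings
        # 'True'/'False' depending on how the CSV was written, so
        # compare the string form to cover both cases.
        return bool((df['upload'].astype(str) == 'True').any())

With such a helper, the loop condition becomes zips_to_upload = zips_remaining(collate_list_file), which does not depend on 'upload' being the last column and sidesteps the trailing-newline pitfall of splitting raw lines.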