Skip to content

Commit

Permalink
Merge branch 'main' into 84-clean-lsst-backup-py
Browse files Browse the repository at this point in the history
  • Loading branch information
davedavemckay authored Dec 19, 2024
2 parents 4f14112 + cbbb383 commit 69712b2
Showing 1 changed file with 82 additions and 32 deletions.
114 changes: 82 additions & 32 deletions csd3-side/scripts/lsst-backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des

folder_files_size = np.sum(np.array([os.lstat(filename).st_size for filename in folder_files]))
print(f'Number of zip files: {len(zip_batch_files)}', flush=True)
print('', flush=True)
print(f'Done traversing {local_dir}.', flush=True)

if global_collate:
###############################
Expand Down Expand Up @@ -1211,6 +1211,8 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
else:
upload_futures.remove(ulf)
to_collate.loc[to_collate['id'] == id, 'upload'] = False
if len(upload_futures) == 0:
print('All zip uploads complete.', flush=True)

if failed:
for i, failed_upload in enumerate(failed):
Expand All @@ -1223,7 +1225,9 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
else:
print(f'Collate list not saved.')

# # Go!
##########################################
# Main Function #
##########################################
if __name__ == '__main__':
epilog = ''
class MyParser(argparse.ArgumentParser):
Expand All @@ -1238,7 +1242,6 @@ def error(self, message):
)
parser.add_argument('--config-file', type=str, help='Path to the configuration YAML file.')
parser.add_argument('--api', type=str, help='API to use; "S3" or "Swift". Case insensitive.')
parser.add_argument('--collate-list-file', type=str, help='The path to a CSV file containing a list of dicts describing files and folders to collate.')
parser.add_argument('--bucket-name', type=str, help='Name of the S3 bucket.')
parser.add_argument('--local-path', type=str, help='Absolute path to the folder to be uploaded.')
parser.add_argument('--S3-prefix', type=str, help='Prefix to be used in S3 object keys.')
Expand All @@ -1251,13 +1254,24 @@ def error(self, message):
parser.add_argument('--dryrun', default=False, action='store_true', help='Perform a dry run without uploading files.')
parser.add_argument('--no-compression', default=False, action='store_true', help='Do not use compression when collating files.')
parser.add_argument('--save-config', default=False, action='store_true', help='Save the configuration to the provided config file path and exit.')
parser.add_argument('--no-save-collate-list', default=False, action='store_true', help='Save collate-list-file. Use to skip folder scanning.')
parser.add_argument('--no-file-count-stop', default=False, action='store_true', help='Do not stop if count of local files equals count of S3 objects.')
args = parser.parse_args()

if not args.config_file and not (args.bucket_namse and args.local_path and args.S3_prefix):
parser.error('If a config file is not provided, the bucket name, local path, and S3 prefix must be provided.')
if args.config_file and (args.api or args.bucket_name or args.local_path or args.S3_prefix or args.S3_folder or args.exclude or args.nprocs or args.threads_per_worker or args.no_collate or args.dryrun or args.no_compression or args.save_config or args.no_save_collate_list or args.no_file_count_stop):
if args.config_file and (args.api or
args.bucket_name or
args.local_path or
args.S3_prefix or
args.S3_folder or
args.exclude or
args.nprocs or
args.threads_per_worker or
args.no_collate or
args.dryrun or
args.no_compression or
args.save_config or
args.no_file_count_stop):
print(f'WARNING: Options provide on command line override options in {args.config_file}.')
if args.config_file:
config_file = args.config_file
Expand Down Expand Up @@ -1293,10 +1307,6 @@ def error(self, message):
args.dryrun = config['dryrun']
if 'no_compression' in config.keys() and not args.no_compression:
args.no_compression = config['no_compression']
if 'collate_list_file' in config.keys() and not args.collate_list_file:
args.collate_list_file = config['collate_list_file']
if 'no_save_collate_list' in config.keys() and not args.no_save_collate_list:
args.no_save_collate_list = config['no_save_collate_list']
if 'no_file_count_stop' in config.keys() and not args.no_file_count_stop:
args.no_file_count_stop = config['no_file_count_stop']
if 'api' in config.keys() and not args.api:
Expand Down Expand Up @@ -1329,20 +1339,6 @@ def error(self, message):
dryrun = args.dryrun
use_compression = not args.no_compression # internally, flag turns *on* compression, but for user no-compression turns it off - makes flag more intuitive

collate_list_file = args.collate_list_file
if not collate_list_file:
save_collate_list = False
else:
save_collate_list = not args.no_save_collate_list # internally, flag turns *on* save_collate_list, but for user no-save-collate-list turns it off - makes flag more intuitive
if save_collate_list and not collate_list_file:
parser.error('A collate list file must be provided to save the collate list.')
if save_collate_list and not os.path.exists(collate_list_file):
print(f'Collate list will be generated and saved to {collate_list_file}.')
elif save_collate_list and os.path.exists(collate_list_file):
print(f'Collate list will be read from and re-saved to {collate_list_file}.')
if (save_collate_list or collate_list_file) and not global_collate:
parser.error('Collate list file provided but collation is turned off. Please enable collation to use the collate list file.')

file_count_stop = not args.no_file_count_stop # internally, flag turns *on* file-count-stop, but for user no-file-count-stop turns it off - makes flag more intuitive

if args.exclude:
Expand All @@ -1365,8 +1361,6 @@ def error(self, message):
'no_collate': not global_collate,
'dryrun': dryrun,
'no_compression': not use_compression,
'collate_list_file': collate_list_file,
'no_save_collate_list': not save_collate_list,
'no_file_count_stop': not file_count_stop,
'exclude': exclude.to_list(),
}, f)
Expand Down Expand Up @@ -1401,6 +1395,15 @@ def error(self, message):
previous_log = f"{prefix}-{'-'.join(sub_dirs.split('/'))}-{previous_suffix}"
destination_dir = f"{prefix}/{sub_dirs}"

if global_collate:
collate_list_suffix = 'collate-list.csv'
collate_list_file = log.replace(log_suffix,collate_list_suffix) # now automatically generated
save_collate_list = True # no longer optional
if save_collate_list and not os.path.exists(collate_list_file):
print(f'Collate list will be generated and saved to {collate_list_file}.')
elif save_collate_list and os.path.exists(collate_list_file):
print(f'Collate list will be read from and re-saved to {collate_list_file}.')

# Add titles to log file
if not os.path.exists(log):
if os.path.exists(previous_log):
Expand Down Expand Up @@ -1532,13 +1535,60 @@ def error(self, message):
while local_dir[-1] == '/':
local_dir = local_dir[:-1]

with warnings.catch_warnings():
warnings.filterwarnings('ignore')
print(f'Processing files in {local_dir}, elapsed time = {datetime.now() - start}', flush=True)
if api == 's3':
process_files(s3, bucket_name, api, current_objects, exclude, local_dir, destination_dir, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list, file_count_stop)
elif api == 'swift':
process_files(s3, bucket_name, api, current_objects, exclude, local_dir, destination_dir, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list, file_count_stop)
zips_to_upload = True
retries = 0
global_retry_limit = 10
while zips_to_upload and retries <= global_retry_limit:
print(f'Processing files in {local_dir}, elapsed time = {datetime.now() - start}, try number: {retries+1}', flush=True)
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
if api == 's3':
process_files(s3,
bucket_name,
api,
current_objects,
exclude,
local_dir,
destination_dir,
dryrun,
log,
global_collate,
use_compression,
client,
mem_per_worker,
collate_list_file,
save_collate_list,
file_count_stop
)
elif api == 'swift':
process_files(s3,
bucket_name,
api,
current_objects,
exclude,
local_dir,
destination_dir,
dryrun,
log,
global_collate,
use_compression,
client,
mem_per_worker,
collate_list_file,
save_collate_list,
file_count_stop
)

with open(collate_list_file, 'r') as clf:
upload_checks = []
for l in clf.readlines():
if l.split(',')[-1] == 'True':
upload_checks.append(True)
else:
upload_checks.append(False)
print(upload_checks)
zips_to_upload = any(upload_checks)
retries += 1

print(f'Finished uploads at {datetime.now()}, elapsed time = {datetime.now() - start}')
print(f'Dask Client closed at {datetime.now()}, elapsed time = {datetime.now() - start}')
Expand Down

0 comments on commit 69712b2

Please sign in to comment.