83 retry until upload false #85

Merged
8 commits merged on Dec 19, 2024
Changes from all commits
231 changes: 83 additions & 148 deletions csd3-side/scripts/lsst-backup.py
@@ -1265,7 +1265,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des

folder_files_size = np.sum(np.array([os.lstat(filename).st_size for filename in folder_files]))
print(f'Number of zip files: {len(zip_batch_files)}', flush=True)
print('', flush=True)
print(f'Done traversing {local_dir}.', flush=True)

if global_collate:
###############################
@@ -1390,122 +1390,9 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
else:
upload_futures.remove(ulf)
to_collate.loc[to_collate['id'] == id, 'upload'] = False



# for zul_f in as_completed(zul_futures): # is a zip_and_upload_future
# # mem_check(zul_futures+upload_futures)
# if len(upload_futures) >= len(client.scheduler_info()['workers']):
# print('Waiting for upload slots to free up.', flush=True)
# for f in as_completed(upload_futures):
# if 'exception' in f.status or 'error' in f.status:
# f_tuple = f.exception(), f.traceback()
# failed.append(f_tuple)
# del f
# else:
# del f
# result = zul_f.result()
# if result[0] is not None:
# upload_futures.append(client.submit(upload_and_callback,
# s3,
# bucket_name,
# api,
# local_dir,
# destination_dir,
# result[0], # zip_data
# result[2], # namelist
# result[1], # zip_object_key
# dryrun,
# processing_start,
# 1, # 1 zip file
# result[3], # len(zip_data) (size in bytes)
# total_size_uploaded,
# total_files_uploaded,
# True,
# mem_per_worker
# ))
# to_collate.loc[to_collate['object_names'] == result[1], 'upload'] = False
# print(f'Zip {result[1]} created and added to upload queue.', flush=True)
# print(f'To upload: {len(to_collate[to_collate.upload == True])} zips remaining.', flush=True)
# del zul_f
# else:
# print(f'No files to zip as {result[1]}. Skipping upload.', flush=True)
# del zul_f
# del zul_futures

# zul_futures = to_collate_uploads.apply(zip_and_upload, axis=1,
# args=(s3,
# bucket_name,
# api,
# destination_dir,
# local_dir,
# total_size_uploaded,
# total_files_uploaded,
# use_compression,
# dryrun,
# mem_per_worker),
# meta=pd.DataFrame(columns=['future','zip_object_key'], dtype='object')
# )
# print('zul_futures')
# print(zul_futures)
# wait(zul_futures)
# print('waiting')
# exit()
# print(type(zul_futures))
# for i in range(len(to_collate)):
# mem_check(zul_futures+upload_futures)
# zul_futures.append(client.submit(
# zip_and_upload,
# s3,
# bucket_name,
# api,
# destination_dir,
# local_dir,
# to_collate.iloc[i]['file_paths'],
# total_size_uploaded,
# total_files_uploaded,
# use_compression,
# dryrun,
# i,
# mem_per_worker,
# ))
# # mem_check(zul_futures)


########################
# Monitor upload tasks #
########################

# print('Monitoring zip tasks.', flush=True)
#just upload futures


# del f
# elif 'finished' in f.status:
# del f
# del upload_futures

# upload_futures and zul_futures
# while len(upload_futures) > 0 or len(zul_futures) > 0:
# for f in as_completed(upload_futures+zul_futures):
# if f in zul_futures: # is a zip_and_upload_future
# result = f.result()
# if result[0] is not None:
# upload_futures.append(result[0])
# to_collate.loc[to_collate['object_names'] == result[1], 'upload'] = False
# print(f'Zip {result[1]} created and added to upload queue.', flush=True)
# print(f'To upload: {len(to_collate[to_collate.upload == True])} zips remaining.', flush=True)
# del f
# else:
# print(f'No files to zip as {result[1]}. Skipping upload.', flush=True)
# del f
# else: # is an upload_future
# if 'exception' in f.status or 'error' in f.status:
# f_tuple = f.exception(), f.traceback()
# failed.append(f_tuple)
# del f
# elif 'finished' in f.status:
# del f
if len(upload_futures) == 0:
print('All zip uploads complete.', flush=True)
sleep(30)

if failed:
for i, failed_upload in enumerate(failed):
@@ -1518,7 +1405,9 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
else:
print('Collate list not saved.')

# # Go!
##########################################
# Main Function #
##########################################
if __name__ == '__main__':
epilog = ''
class MyParser(argparse.ArgumentParser):
@@ -1533,7 +1422,6 @@ def error(self, message):
)
parser.add_argument('--config-file', type=str, help='Path to the configuration YAML file.')
parser.add_argument('--api', type=str, help='API to use; "S3" or "Swift". Case insensitive.')
parser.add_argument('--collate-list-file', type=str, help='The path to a CSV file containing a list of dicts describing files and folders to collate.')
parser.add_argument('--bucket-name', type=str, help='Name of the S3 bucket.')
parser.add_argument('--local-path', type=str, help='Absolute path to the folder to be uploaded.')
parser.add_argument('--S3-prefix', type=str, help='Prefix to be used in S3 object keys.')
@@ -1546,13 +1434,24 @@ def error(self, message):
parser.add_argument('--dryrun', default=False, action='store_true', help='Perform a dry run without uploading files.')
parser.add_argument('--no-compression', default=False, action='store_true', help='Do not use compression when collating files.')
parser.add_argument('--save-config', default=False, action='store_true', help='Save the configuration to the provided config file path and exit.')
parser.add_argument('--no-save-collate-list', default=False, action='store_true', help='Save collate-list-file. Use to skip folder scanning.')
parser.add_argument('--no-file-count-stop', default=False, action='store_true', help='Do not stop if count of local files equals count of S3 objects.')
args = parser.parse_args()

if not args.config_file and not (args.bucket_name and args.local_path and args.S3_prefix):
parser.error('If a config file is not provided, the bucket name, local path, and S3 prefix must be provided.')
if args.config_file and (args.api or args.bucket_name or args.local_path or args.S3_prefix or args.S3_folder or args.exclude or args.nprocs or args.threads_per_worker or args.no_collate or args.dryrun or args.no_compression or args.save_config or args.no_save_collate_list or args.no_file_count_stop):
if args.config_file and (args.api or
args.bucket_name or
args.local_path or
args.S3_prefix or
args.S3_folder or
args.exclude or
args.nprocs or
args.threads_per_worker or
args.no_collate or
args.dryrun or
args.no_compression or
args.save_config or
args.no_file_count_stop):
print(f'WARNING: Options provided on the command line override options in {args.config_file}.')
if args.config_file:
config_file = args.config_file
@@ -1588,10 +1487,6 @@ def error(self, message):
args.dryrun = config['dryrun']
if 'no_compression' in config.keys() and not args.no_compression:
args.no_compression = config['no_compression']
if 'collate_list_file' in config.keys() and not args.collate_list_file:
args.collate_list_file = config['collate_list_file']
if 'no_save_collate_list' in config.keys() and not args.no_save_collate_list:
args.no_save_collate_list = config['no_save_collate_list']
if 'no_file_count_stop' in config.keys() and not args.no_file_count_stop:
args.no_file_count_stop = config['no_file_count_stop']
if 'api' in config.keys() and not args.api:
@@ -1624,20 +1519,6 @@ def error(self, message):
dryrun = args.dryrun
use_compression = not args.no_compression # internally, flag turns *on* compression, but for user no-compression turns it off - makes flag more intuitive

collate_list_file = args.collate_list_file
if not collate_list_file:
save_collate_list = False
else:
save_collate_list = not args.no_save_collate_list # internally, flag turns *on* save_collate_list, but for user no-save-collate-list turns it off - makes flag more intuitive
if save_collate_list and not collate_list_file:
parser.error('A collate list file must be provided to save the collate list.')
if save_collate_list and not os.path.exists(collate_list_file):
print(f'Collate list will be generated and saved to {collate_list_file}.')
elif save_collate_list and os.path.exists(collate_list_file):
print(f'Collate list will be read from and re-saved to {collate_list_file}.')
if (save_collate_list or collate_list_file) and not global_collate:
parser.error('Collate list file provided but collation is turned off. Please enable collation to use the collate list file.')

file_count_stop = not args.no_file_count_stop # internally, flag turns *on* file-count-stop, but for user no-file-count-stop turns it off - makes flag more intuitive
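
The two inversions above ('--no-compression' and '--no-file-count-stop') follow the same pattern: the user-facing flag switches a feature off, while the code keeps the positive form. A minimal, self-contained sketch of that pattern (flag names mirror the ones above; the invocation and values are illustrative only):

import argparse

parser = argparse.ArgumentParser()
# The user-facing flag turns the feature *off*; storing True means "disable".
parser.add_argument('--no-compression', default=False, action='store_true',
                    help='Do not use compression when collating files.')
args = parser.parse_args(['--no-compression'])  # illustrative invocation

# Internally the positive form reads more naturally in conditionals.
use_compression = not args.no_compression
assert use_compression is False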

if args.exclude:
@@ -1660,8 +1541,6 @@ def error(self, message):
'no_collate': not global_collate,
'dryrun': dryrun,
'no_compression': not use_compression,
'collate_list_file': collate_list_file,
'no_save_collate_list': not save_collate_list,
'no_file_count_stop': not file_count_stop,
'exclude': exclude.to_list(),
}, f)
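
For illustration only, a sketch of the kind of YAML the '--save-config' branch above writes, limited to keys that appear in this diff and filled with placeholder values:

import yaml  # PyYAML, already used by the script

# Placeholder values; a real run writes the values parsed from the command line.
partial_config = {
    'api': 's3',
    'no_collate': False,
    'dryrun': True,
    'no_compression': False,
    'no_file_count_stop': False,
    'exclude': [],
}
print(yaml.dump(partial_config))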
@@ -1696,6 +1575,15 @@ def error(self, message):
previous_log = f"{prefix}-{'-'.join(sub_dirs.split('/'))}-{previous_suffix}"
destination_dir = f"{prefix}/{sub_dirs}"

if global_collate:
collate_list_suffix = 'collate-list.csv'
collate_list_file = log.replace(log_suffix, collate_list_suffix)  # now automatically generated
save_collate_list = True  # no longer optional
if save_collate_list and not os.path.exists(collate_list_file):
print(f'Collate list will be generated and saved to {collate_list_file}.')
elif save_collate_list and os.path.exists(collate_list_file):
print(f'Collate list will be read from and re-saved to {collate_list_file}.')
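
A small sketch of the path derivation above, with hypothetical filenames ('log_suffix' is defined earlier in the script and not shown in this hunk):

# Hypothetical values for illustration.
log_suffix = 'lsst-backup.csv'            # assumed value; defined earlier in the script
log = 'butler-dir1-dir2-lsst-backup.csv'  # hypothetical log name
collate_list_suffix = 'collate-list.csv'

collate_list_file = log.replace(log_suffix, collate_list_suffix)
assert collate_list_file == 'butler-dir1-dir2-collate-list.csv'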

# Add titles to log file
if not os.path.exists(log):
if os.path.exists(previous_log):
@@ -1833,13 +1721,60 @@ def error(self, message):
while local_dir[-1] == '/':
local_dir = local_dir[:-1]

with warnings.catch_warnings():
warnings.filterwarnings('ignore')
print(f'Processing files in {local_dir}, elapsed time = {datetime.now() - start}', flush=True)
if api == 's3':
process_files(s3, bucket_name, api, current_objects, exclude, local_dir, destination_dir, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list, file_count_stop)
elif api == 'swift':
process_files(s3, bucket_name, api, current_objects, exclude, local_dir, destination_dir, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list, file_count_stop)
zips_to_upload = True
retries = 0
global_retry_limit = 10
while zips_to_upload and retries <= global_retry_limit:
print(f'Processing files in {local_dir}, elapsed time = {datetime.now() - start}, try number: {retries+1}', flush=True)
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
if api == 's3':
process_files(s3,
bucket_name,
api,
current_objects,
exclude,
local_dir,
destination_dir,
dryrun,
log,
global_collate,
use_compression,
client,
mem_per_worker,
collate_list_file,
save_collate_list,
file_count_stop
)
elif api == 'swift':
process_files(s3,
bucket_name,
api,
current_objects,
exclude,
local_dir,
destination_dir,
dryrun,
log,
global_collate,
use_compression,
client,
mem_per_worker,
collate_list_file,
save_collate_list,
file_count_stop
)

with open(collate_list_file, 'r') as clf:
    upload_checks = []
    for line in clf.readlines():
        # The final CSV column records whether each zip still needs uploading.
        if line.strip().split(',')[-1] == 'True':
            upload_checks.append(True)
        else:
            upload_checks.append(False)
print(upload_checks)
zips_to_upload = any(upload_checks)
retries += 1

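The loop above re-runs process_files and then re-reads the collate list to decide whether another pass is needed: it retries until no row is still flagged for upload or the retry limit is reached. A self-contained sketch of that check using the csv module, assuming (as the code above does) that the last column of the collate list records whether each zip still needs uploading:

import csv

def zips_still_to_upload(collate_list_path):
    # Return True if any row's final column is the string 'True'.
    with open(collate_list_path, newline='') as clf:
        reader = csv.reader(clf)
        return any(row and row[-1].strip() == 'True' for row in reader)

# Hypothetical usage inside the retry loop:
# zips_to_upload = zips_still_to_upload(collate_list_file)
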
# def get_all_tasks(dask_scheduler=None):
# return dask_scheduler.tasks