Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

76 quick check file count vs object count #77

Merged
merged 3 commits
Oct 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions csd3-side/scripts/lsst-backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ def upload_and_callback(s3_host, access_key, secret_key, bucket_name, local_dir,
return None

### KEY FUNCTION TO FIND ALL FILES AND ORGANISE UPLOADS ###
def process_files(s3_host, access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, perform_checksum, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_file) -> None:
def process_files(s3_host, access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, perform_checksum, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_file, file_count_stop) -> None:
"""
Uploads files from a local directory to an S3 bucket in parallel.

Expand Down Expand Up @@ -725,6 +725,13 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
total_all_files += len(files)
print(f'Folders: {total_all_folders}; Files: {total_all_files}.', flush=True, end='\r')
print()
if file_count_stop:
total_non_collate_zip = len(current_objects[current_objects['CURRENT_OBJECTS'].str.contains('collated_') == False])
if total_non_collate_zip == total_all_files:
print(f'Number of existing objects (excluding collated zips) equal to number of local files given the same prefix ({total_all_files}).')
print('This is a soft verification that the entire local dataset has been uploaded previously.')
print('Exiting. To prevent this behavior and force per-file verification, set `--no-file-count-stop` to True.', flush=True)
sys.exit()
if not os.path.exists(collate_list_file):
print(f'Preparing to upload {total_all_files} files in {total_all_folders} folders from {local_dir} to {bucket_name}/{destination_dir}.', flush=True)
for folder, sub_folders, files in os.walk(local_dir, topdown=False):
Expand Down Expand Up @@ -1114,7 +1121,8 @@ def error(self, message):
parser.add_argument('--no-checksum', default=False, action='store_true', help='Do not perform checksum validation during upload.')
parser.add_argument('--no-compression', default=False, action='store_true', help='Do not use compression when collating files.')
parser.add_argument('--save-config', default=False, action='store_true', help='Save the configuration to the provided config file path and exit.')
parser.add_argument('--save-collate-list', default=True, action='store_true', help='Save collate-list-file. Use to skip folder scanning.')
parser.add_argument('--no-save-collate-list', default=False, action='store_true', help='Save collate-list-file. Use to skip folder scanning.')
parser.add_argument('--no-file-count-stop', default=False, action='store_true', help='Do not stop if count of local files equals count of S3 objects.')
args = parser.parse_args()

if not args.config_file and not (args.bucket_name and args.local_path and args.S3_prefix):
Expand Down Expand Up @@ -1156,8 +1164,10 @@ def error(self, message):
args.no_compression = config['no_compression']
if 'collate_list_file' in config.keys() and not args.collate_list_file:
args.collate_list_file = config['collate_list_file']
if 'save_collate_list' in config.keys() and not args.save_collate_list:
args.save_collate_list = config['save_collate_list']
if 'no_save_collate_list' in config.keys() and not args.no_save_collate_list:
args.no_save_collate_list = config['no_save_collate_list']
if 'no_file_count_stop' in config.keys() and not args.no_file_count_stop:
args.no_file_count_stop = config['no_file_count_stop']
if args.save_config and not args.config_file:
parser.error('A config file must be provided to save the configuration.')

Expand All @@ -1181,7 +1191,7 @@ def error(self, message):
if not collate_list_file:
save_collate_list = False
else:
save_collate_list = args.save_collate_list
save_collate_list = not args.no_save_collate_list # internally, flag turns *on* save_collate_list, but for user no-save-collate-list turns it off - makes flag more intuitive
if save_collate_list and not collate_list_file:
parser.error('A collate list file must be provided to save the collate list.')
if save_collate_list and not os.path.exists(collate_list_file):
Expand All @@ -1191,6 +1201,8 @@ def error(self, message):
if (save_collate_list or collate_list_file) and not global_collate:
parser.error('Collate list file provided but collation is turned off. Please enable collation to use the collate list file.')

file_count_stop = not args.no_file_count_stop # internally, flag turns *on* file-count-stop, but for user no-file-count-stop turns it off - makes flag more intuitive

if args.exclude:
exclude = pd.Series(args.exclude)
else:
Expand All @@ -1212,7 +1224,8 @@ def error(self, message):
'no_checksum': not perform_checksum,
'no_compression': not use_compression,
'collate_list_file': collate_list_file,
'save_collate_list': save_collate_list,
'no_save_collate_list': not save_collate_list,
'no_file_count_stop': not file_count_stop,
'exclude': exclude.to_list(),
}, f)
sys.exit(0)
Expand Down Expand Up @@ -1294,6 +1307,7 @@ def error(self, message):
current_objects = pd.DataFrame.from_dict({'CURRENT_OBJECTS':current_objects})

print(f'Current objects (with matching prefix): {len(current_objects)}', flush=True)
print(f'Current objects (with matching prefix; excluding collated zips): {len(current_objects[current_objects['CURRENT_OBJECTS'].str.contains('collated_') == False])}', flush=True)
if not current_objects.empty:
print('Obtaining current object metadata.')
current_objects['METADATA'] = current_objects['CURRENT_OBJECTS'].apply(find_metadata, bucket=bucket)
Expand Down Expand Up @@ -1335,7 +1349,7 @@ def error(self, message):
print(f'Using {nprocs} processes.')
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
process_files(s3_host,access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, perform_checksum, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list)
process_files(s3_host,access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, perform_checksum, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list, file_count_stop)
# success = True
# except Exception as e:
# print(e)
Expand Down