Skip to content

Commit

Permalink
76 quick check file count vs object count (#77)
Browse files (browse the repository at this point in the history)
* first try with no-file-count-stop

* clarified message

* done
  • Loading branch information
davedavemckay authored Oct 28, 2024
1 parent 0239db3 commit 29d8fd6
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions csd3-side/scripts/lsst-backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ def upload_and_callback(s3_host, access_key, secret_key, bucket_name, local_dir,
return None

### KEY FUNCTION TO FIND ALL FILES AND ORGANISE UPLOADS ###
def process_files(s3_host, access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, perform_checksum, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_file) -> None:
def process_files(s3_host, access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, perform_checksum, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_file, file_count_stop) -> None:
"""
Uploads files from a local directory to an S3 bucket in parallel.
Expand Down Expand Up @@ -725,6 +725,13 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
total_all_files += len(files)
print(f'Folders: {total_all_folders}; Files: {total_all_files}.', flush=True, end='\r')
print()
if file_count_stop:
total_non_collate_zip = len(current_objects[current_objects['CURRENT_OBJECTS'].str.contains('collated_') == False])
if total_non_collate_zip == total_all_files:
print(f'Number of existing objects (excluding collated zips) equal to number of local files given the same prefix ({total_all_files}).')
print('This is a soft verification that the entire local dataset has been uploaded previously.')
print('Exiting. To prevent this behavior and force per-file verification, set `--no-file-count-stop` to True.', flush=True)
sys.exit()
if not os.path.exists(collate_list_file):
print(f'Preparing to upload {total_all_files} files in {total_all_folders} folders from {local_dir} to {bucket_name}/{destination_dir}.', flush=True)
for folder, sub_folders, files in os.walk(local_dir, topdown=False):
Expand Down Expand Up @@ -1114,7 +1121,8 @@ def error(self, message):
parser.add_argument('--no-checksum', default=False, action='store_true', help='Do not perform checksum validation during upload.')
parser.add_argument('--no-compression', default=False, action='store_true', help='Do not use compression when collating files.')
parser.add_argument('--save-config', default=False, action='store_true', help='Save the configuration to the provided config file path and exit.')
parser.add_argument('--save-collate-list', default=True, action='store_true', help='Save collate-list-file. Use to skip folder scanning.')
parser.add_argument('--no-save-collate-list', default=False, action='store_true', help='Save collate-list-file. Use to skip folder scanning.')
parser.add_argument('--no-file-count-stop', default=False, action='store_true', help='Do not stop if count of local files equals count of S3 objects.')
args = parser.parse_args()

if not args.config_file and not (args.bucket_name and args.local_path and args.S3_prefix):
Expand Down Expand Up @@ -1156,8 +1164,10 @@ def error(self, message):
args.no_compression = config['no_compression']
if 'collate_list_file' in config.keys() and not args.collate_list_file:
args.collate_list_file = config['collate_list_file']
if 'save_collate_list' in config.keys() and not args.save_collate_list:
args.save_collate_list = config['save_collate_list']
if 'no_save_collate_list' in config.keys() and not args.no_save_collate_list:
args.no_save_collate_list = config['no_save_collate_list']
if 'no_file_count_stop' in config.keys() and not args.no_file_count_stop:
args.no_file_count_stop = config['no_file_count_stop']
if args.save_config and not args.config_file:
parser.error('A config file must be provided to save the configuration.')

Expand All @@ -1181,7 +1191,7 @@ def error(self, message):
if not collate_list_file:
save_collate_list = False
else:
save_collate_list = args.save_collate_list
save_collate_list = not args.no_save_collate_list # the user-facing --no-save-collate-list is a negative flag (set = do not save); invert it to get the internal positive save_collate_list boolean
if save_collate_list and not collate_list_file:
parser.error('A collate list file must be provided to save the collate list.')
if save_collate_list and not os.path.exists(collate_list_file):
Expand All @@ -1191,6 +1201,8 @@ def error(self, message):
if (save_collate_list or collate_list_file) and not global_collate:
parser.error('Collate list file provided but collation is turned off. Please enable collation to use the collate list file.')

file_count_stop = not args.no_file_count_stop # the user-facing --no-file-count-stop is a negative flag (set = do not stop early); invert it to get the internal positive file_count_stop boolean

if args.exclude:
exclude = pd.Series(args.exclude)
else:
Expand All @@ -1212,7 +1224,8 @@ def error(self, message):
'no_checksum': not perform_checksum,
'no_compression': not use_compression,
'collate_list_file': collate_list_file,
'save_collate_list': save_collate_list,
'no_save_collate_list': not save_collate_list,
'no_file_count_stop': not file_count_stop,
'exclude': exclude.to_list(),
}, f)
sys.exit(0)
Expand Down Expand Up @@ -1294,6 +1307,7 @@ def error(self, message):
current_objects = pd.DataFrame.from_dict({'CURRENT_OBJECTS':current_objects})

print(f'Current objects (with matching prefix): {len(current_objects)}', flush=True)
print(f'Current objects (with matching prefix; excluding collated zips): {len(current_objects[current_objects['CURRENT_OBJECTS'].str.contains('collated_') == False])}', flush=True)
if not current_objects.empty:
print('Obtaining current object metadata.')
current_objects['METADATA'] = current_objects['CURRENT_OBJECTS'].apply(find_metadata, bucket=bucket)
Expand Down Expand Up @@ -1335,7 +1349,7 @@ def error(self, message):
print(f'Using {nprocs} processes.')
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
process_files(s3_host,access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, perform_checksum, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list)
process_files(s3_host,access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, perform_checksum, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list, file_count_stop)
# success = True
# except Exception as e:
# print(e)
Expand Down

0 comments on commit 29d8fd6

Please sign in to comment.