From c9cfa45213b509daaf2da08fa1d77c5de2f5761c Mon Sep 17 00:00:00 2001 From: Dave McKay Date: Tue, 17 Dec 2024 13:17:46 +0000 Subject: [PATCH] . --- csd3-side/scripts/lsst-backup.py | 124 +++++++++++++++---------------- 1 file changed, 61 insertions(+), 63 deletions(-) diff --git a/csd3-side/scripts/lsst-backup.py b/csd3-side/scripts/lsst-backup.py index e332fce..0576531 100644 --- a/csd3-side/scripts/lsst-backup.py +++ b/csd3-side/scripts/lsst-backup.py @@ -182,11 +182,11 @@ def zip_and_upload(s3, bucket_name, api, destination_dir, local_dir, file_paths, client = get_client() # with annotate(parent_folder=parent_folder): zip_future = client.submit(zip_folders, - local_dir, - file_paths, - use_compression, - dryrun, - id, + local_dir, + file_paths, + use_compression, + dryrun, + id, mem_per_worker ) def get_zip_future(future): @@ -294,7 +294,7 @@ def zip_folders(local_dir:str, file_paths:list[str], use_compression:bool, dryru return zip_buffer.getvalue(), namelist else: return b'', [] - + def part_uploader(bucket_name, object_key, part_number, chunk_data, upload_id) -> dict: """ Uploads a part of a file to an S3 bucket. @@ -316,12 +316,12 @@ def part_uploader(bucket_name, object_key, part_number, chunk_data, upload_id) - Key=object_key, PartNumber=part_number, UploadId=upload_id)["ETag"]} - + def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_key, dryrun, mem_per_worker) -> str: """ Uploads a file to an S3 bucket. Calculates a checksum for the file - + Args: s3 (None | swiftclient.Connection): None or swiftclient Connection object. bucket_name (str): The name of the S3 bucket or Swift container name. @@ -353,7 +353,7 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k file_size = os.path.getsize(filename) use_future = False - + if file_size > 10 * 1024**3: if not dryrun: print(f'WARNING: File size of {file_size} bytes exceeds memory per worker of {mem_per_worker} bytes.', flush=True) @@ -433,7 +433,7 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k """ - Use multipart upload for large files """ - + obj = bucket.Object(object_key) mp_upload = obj.initiate_multipart_upload() chunk_size = 512 * 1024**2 # 512 MiB @@ -476,7 +476,7 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k bucket.put_object(Body=file_data, Key=object_key, ContentMD5=checksum_base64) except Exception as e: print(f'Error uploading {filename} to {bucket_name}/{object_key}: {e}') - + del file_data else: checksum_string = "DRYRUN" @@ -540,7 +540,7 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k checksum_string = checksum_hash.hexdigest() checksum_base64 = base64.b64encode(checksum_hash.digest()).decode() # file_data.seek(0) # Reset the file pointer to the start - + try: if file_size > mem_per_worker or file_size > 5 * 1024**3: # Check if file size is larger than 5GiB """ @@ -581,10 +581,10 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k # if use_future: # bucket.put_object(Body=get_client().gather(file_data), Key=object_key, ContentMD5=checksum_base64) # else: - s3.put_object(container=bucket_name, contents=file_data, content_type='multipart/mixed', obj=object_key, etag=checksum_string) + s3.put_object(container=bucket_name, contents=file_data, content_type='multipart/mixed', obj=object_key, etag=checksum_string) except Exception as e: print(f'Error uploading {filename} to {bucket_name}/{object_key}: {e}') - + del file_data 
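# --- Reference sketch (illustrative, not part of the patch) -----------------
# The hunks above mostly re-indent the multipart-upload path of
# upload_to_bucket() and part_uploader(). For readers unfamiliar with that
# path, this is the plain boto3 multipart pattern the code follows, stripped
# of the Dask futures and the project's `bm` helper module. The bucket name,
# object key and default chunk size below are assumptions for illustration
# (the patch itself uses a 512 MiB chunk size).
import boto3

def multipart_upload_sketch(filename: str,
                            bucket_name: str = "example-bucket",   # assumption
                            object_key: str = "example/object",    # assumption
                            chunk_size: int = 512 * 1024**2):      # 512 MiB, as in the patch
    s3_client = boto3.client("s3")
    mpu = s3_client.create_multipart_upload(Bucket=bucket_name, Key=object_key)
    parts = []
    try:
        with open(filename, "rb") as f:
            part_number = 1
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                # Equivalent of part_uploader(): upload one chunk, keep its ETag.
                response = s3_client.upload_part(
                    Body=chunk,
                    Bucket=bucket_name,
                    Key=object_key,
                    PartNumber=part_number,
                    UploadId=mpu["UploadId"],
                )
                parts.append({"PartNumber": part_number, "ETag": response["ETag"]})
                part_number += 1
        s3_client.complete_multipart_upload(
            Bucket=bucket_name,
            Key=object_key,
            UploadId=mpu["UploadId"],
            MultipartUpload={"Parts": parts},
        )
    except Exception:
        # Abort so incomplete parts do not keep accumulating storage.
        s3_client.abort_multipart_upload(
            Bucket=bucket_name, Key=object_key, UploadId=mpu["UploadId"]
        )
        raise
# -----------------------------------------------------------------------------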
else: checksum_string = "DRYRUN" @@ -602,7 +602,7 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k return_string += ',n/a' else: return_string += f',{checksum_string}' - + #for no zip contents return_string += ',n/a' return return_string @@ -611,7 +611,7 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte """ Uploads a file to an S3 bucket. Calculates a checksum for the file - + Args: s3 (None | swiftclient.Connection): None or Swift swiftclient.Connection object. bucket_name (str): The name of the S3 bucket. @@ -634,7 +634,7 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte filename = object_key.split('/')[-1] file_data_size = len(file_data) - + print(f'Uploading zip file "{filename}" for {folder} to {bucket_name}/{object_key}, {file_data_size} bytes, checksum = True, dryrun = {dryrun}', flush=True) """ - Upload the file to the bucket @@ -674,9 +674,9 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte header: LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,ZIP_CONTENTS """ return_string = f'"{folder}","{filename}",{file_data_size},"{bucket_name}","{object_key}","{checksum_string}","{",".join(zip_contents)}"' - + return return_string - + elif api == 'swift': try: assert type(s3) is swiftclient.Connection @@ -685,7 +685,7 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte filename = object_key.split('/')[-1] file_data_size = len(file_data) - + print(f'Uploading zip file "{filename}" for {folder} to {bucket_name}/{object_key}, {file_data_size} bytes, checksum = True, dryrun = {dryrun}', flush=True) """ - Upload the file to the bucket @@ -699,7 +699,7 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte checksum_base64 = base64.b64encode(checksum_hash.digest()).decode() try: - + """ - Upload the file to the bucket """ @@ -725,7 +725,7 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte header: LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,ZIP_CONTENTS """ return_string = f'"{folder}","{filename}",{file_data_size},"{bucket_name}","{object_key}","{checksum_string}","{",".join(zip_contents)}"' - + return return_string def print_stats(file_name_or_data, file_count, total_size, file_start, file_end, processing_start, total_size_uploaded, total_files_uploaded, collated) -> None: @@ -806,12 +806,12 @@ def upload_and_callback(s3, bucket_name, api, local_dir, folder, file_name_or_da else: print(f'Uploading {file_count} files from {folder}.') result = upload_to_bucket(s3, bucket_name, api, local_dir, folder, file_name_or_data, object_key, dryrun, mem_per_worker) - + file_end = datetime.now() print_stats(file_name_or_data, file_count, folder_files_size, file_start, file_end, processing_start, total_size_uploaded, total_files_uploaded, collated) with open(log, 'a') as logfile: logfile.write(f'{result}\n') - + del file_name_or_data return None @@ -902,7 +902,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des # if folder in scanned_list: # continue print(f'Processing {folder_num}/{total_all_folders} folders; {file_num}/{total_all_files} files in {local_dir}.', flush=True) - + # check if folder is in the exclude list if len(files) == 0 and len(sub_folders) == 0: print(f'Skipping subfolder - no files or subfolders.', flush=True) @@ -949,7 +949,7 @@ def process_files(s3, bucket_name, api, 
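# --- Reference sketch (illustrative, not part of the patch) -----------------
# upload_to_bucket_collated(), touched above, MD5s an in-memory zip buffer,
# sends the digest as ContentMD5 (S3) or as the etag (Swift), and returns one
# CSV log row. This is that checksum / log-row logic in isolation; the
# function name and example values are assumptions, only the row format and
# hash handling are taken from the patch.
import base64
import hashlib

def checksum_and_log_row(file_data: bytes, folder: str, bucket_name: str,
                         object_key: str, zip_contents: list[str]) -> tuple[str, str]:
    checksum_hash = hashlib.md5(file_data)
    checksum_string = checksum_hash.hexdigest()                      # logged, used as Swift etag
    content_md5 = base64.b64encode(checksum_hash.digest()).decode()  # passed as ContentMD5 to S3 put_object
    filename = object_key.split('/')[-1]
    # header: LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,ZIP_CONTENTS
    row = (f'"{folder}","{filename}",{len(file_data)},"{bucket_name}",'
           f'"{object_key}","{checksum_string}","{",".join(zip_contents)}"')
    return row, content_md5
# -----------------------------------------------------------------------------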
current_objects, exclude, local_dir, des mean_filesize = total_filesize / len(files) else: mean_filesize = 0 - + # check if any subfolders contain no subfolders and < 4 files if len(sub_folders) > 0: for sub_folder in sub_folders: @@ -990,7 +990,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des # print(object_names) print(f'Skipping subfolder - all files exist.', flush=True) continue - + if mean_filesize > max_zip_batch_size or not global_collate: print('Individual upload.', flush=True) @@ -1031,18 +1031,18 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des print(f'Sending {file_count} files (total size: {folder_files_size/1024**2:.0f} MiB) in {folder} to S3 bucket {bucket_name}.', flush=True) print(f'Individual files objects names: {object_names}', flush=True) - + try: for i,args in enumerate(zip( - repeat(s3), + repeat(s3), repeat(api), repeat(bucket_name), repeat(local_dir), - repeat(folder), + repeat(folder), folder_files, repeat(None), - object_names, - repeat(dryrun), + object_names, + repeat(dryrun), repeat(processing_start), repeat(file_count), repeat(folder_files_size), @@ -1067,8 +1067,8 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des with open('error_log.err', 'a') as f: f.write(f'Unexpected error: {e}\n') # Exit gracefully - sys.exit(1) - + sys.exit(1) + # release block of files if the list for results is greater than 4 times the number of processes elif len(folder_files) > 0 and global_collate: # small files in folder @@ -1099,14 +1099,14 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des symlink_targets.append(target) #add real file to symlink_obj_names list symlink_obj_names.append(symlink_obj_name) - + # append symlink_targets and symlink_obj_names to folder_files and object_names folder_files.extend(symlink_targets) object_names.extend(symlink_obj_names) file_count = len(object_names) #always do this AFTER removing "current_objects" to avoid re-uploading - + # Level n collation size = zip_batch_sizes[-1] print(f'Size: {size}') @@ -1126,7 +1126,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des folder_files_size = np.sum(np.array([os.lstat(filename).st_size for filename in folder_files])) print(f'Number of zip files: {len(zip_batch_files)}', flush=True) print('', flush=True) - + if global_collate: ############################### # CHECK HERE FOR ZIP CONTENTS # @@ -1158,7 +1158,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des # print(f'id: {[i for i in range(len(zip_batch_files))]}') to_collate = pd.DataFrame.from_dict(to_collate_list) - client.scatter(to_collate) + client.scatter(to_collate) del zip_batch_files, zip_batch_object_names, zip_batch_sizes else: # with open(collate_list_file, 'r') as f: @@ -1242,7 +1242,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des mem_per_worker, )) # mem_check(zul_futures) - + ######################## # Monitor upload tasks # ######################## @@ -1275,8 +1275,6 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des # Re-save collate list to reflect uploads if save_collate_file: to_collate.to_csv(collate_list_file, index=False) - # with open(collate_list_file, 'w') as f: - # json.dump(to_collate, f) else: print(f'Collate list not saved.') @@ -1379,7 +1377,7 @@ def error(self, message): prefix = args.S3_prefix sub_dirs = args.S3_folder print(f'sub_dirs 
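# --- Reference sketch (illustrative, not part of the patch) -----------------
# The collation hunks above split a folder's files into zip batches by
# cumulative size before each batch is handed to zip_and_upload() via Dask.
# This is a minimal greedy version of that batching step, under the assumption
# that batching by running size is the only criterion; the real code also
# tracks per-batch object names and symlink targets, and the default
# threshold here is an illustrative value, not one from the patch.
import os

def batch_files_by_size(file_paths: list[str],
                        max_zip_batch_size: int = 128 * 1024**2) -> list[list[str]]:
    batches: list[list[str]] = [[]]
    running_size = 0
    for path in file_paths:
        size = os.lstat(path).st_size   # lstat, as in the patch, so symlinks are not followed
        if running_size + size > max_zip_batch_size and batches[-1]:
            batches.append([])          # start a new zip batch
            running_size = 0
        batches[-1].append(path)
        running_size += size
    return batches
# -----------------------------------------------------------------------------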
{sub_dirs}') - nprocs = args.nprocs + nprocs = args.nprocs threads_per_worker = args.threads_per_worker print(f'threads per worker: {threads_per_worker}') global_collate = not args.no_collate # internally, flag turns *on* collate, but for user no-collate turns it off - makes flag more intuitive @@ -1399,14 +1397,14 @@ def error(self, message): print(f'Collate list will be read from and re-saved to {collate_list_file}.') if (save_collate_list or collate_list_file) and not global_collate: parser.error('Collate list file provided but collation is turned off. Please enable collation to use the collate list file.') - + file_count_stop = not args.no_file_count_stop # internally, flag turns *on* file-count-stop, but for user no-file-count-stop turns it off - makes flag more intuitive - + if args.exclude: exclude = pd.Series(args.exclude) else: exclude = pd.Series([]) - + print(f'Config: {args}') if save_config: @@ -1434,14 +1432,14 @@ def error(self, message): if not local_dir or not prefix or not bucket_name: parser.print_help() sys.exit(1) - + #print hostname uname = subprocess.run(['uname', '-n'], capture_output=True) print(f'Running on {uname.stdout.decode().strip()}') # Initiate timing start = datetime.now() - + ##allow top-level folder to be provided with S3-folder == '' if sub_dirs == '': log_suffix = 'lsst-backup.csv' # DO NOT CHANGE @@ -1456,8 +1454,8 @@ def error(self, message): # check for previous suffix (remove after testing) previous_suffix = 'files.csv' previous_log = f"{prefix}-{'-'.join(sub_dirs.split('/'))}-{previous_suffix}" - destination_dir = f"{prefix}/{sub_dirs}" - + destination_dir = f"{prefix}/{sub_dirs}" + # Add titles to log file if not os.path.exists(log): if os.path.exists(previous_log): @@ -1469,7 +1467,7 @@ def error(self, message): print(f'Created backup log file {log}') with open(log, 'a') as logfile: # don't open as 'w' in case this is a continuation logfile.write('LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,ZIP_CONTENTS\n') - + # Setup bucket try: if bm.check_keys(api): @@ -1492,14 +1490,14 @@ def error(self, message): sys.exit() print(f'Using {api.capitalize()} API with host {s3_host}') - + if api == 's3': s3 = bm.get_resource() bucket_list = bm.bucket_list(s3) elif api == 'swift': s3 = bm.get_conn_swift() bucket_list = bm.bucket_list_swift(s3) - + if bucket_name not in bucket_list: if not dryrun: if api == 's3': @@ -1514,7 +1512,7 @@ def error(self, message): else: print(f'Bucket exists: {bucket_name}') print('dryrun == True, so continuing.') - + if api == 's3': bucket = s3.Bucket(bucket_name) elif api == 'swift': @@ -1548,11 +1546,11 @@ def error(self, message): current_objects = bm.object_list_swift(s3, bucket_name, prefix=destination_dir, count=True) print() print(f'Done.\nFinished at {datetime.now()}, elapsed time = {datetime.now() - start}', flush=True) - + current_objects = pd.DataFrame.from_dict({'CURRENT_OBJECTS':current_objects}) - + print(f'Current objects (with matching prefix): {len(current_objects)}', flush=True) - + if not current_objects.empty: print(f"Current objects (with matching prefix; excluding collated zips): {len(current_objects[current_objects['CURRENT_OBJECTS'].str.contains('collated_') == False])}", flush=True) print(f'Obtaining current object metadata, elapsed time = {datetime.now() - start}', flush=True) @@ -1568,7 +1566,7 @@ def error(self, message): print(f'Done, elapsed time = {datetime.now() - start}', flush=True) # current_objects.to_csv('current_objects.csv', index=False) # exit() - + ## check if log 
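# --- Reference sketch (illustrative, not part of the patch) -----------------
# The setup hunks above choose an API (S3 or Swift), list existing buckets or
# containers, and create the target one unless this is a dry run. A stripped-
# down version of that branch, without the project's `bm` helper, might look
# like the following. Reading Swift credentials from ST_* environment
# variables is an assumption for the sketch, not something the patch shows.
import os

import boto3
import swiftclient

def ensure_bucket(api: str, bucket_name: str, dryrun: bool = False):
    if api == "s3":
        s3 = boto3.resource("s3")
        existing = [b.name for b in s3.buckets.all()]
        if bucket_name not in existing and not dryrun:
            s3.create_bucket(Bucket=bucket_name)
        return s3
    elif api == "swift":
        conn = swiftclient.Connection(
            authurl=os.environ["ST_AUTH"],   # assumption: auth via environment variables
            user=os.environ["ST_USER"],
            key=os.environ["ST_KEY"],
        )
        _, containers = conn.get_account()
        if bucket_name not in [c["name"] for c in containers] and not dryrun:
            conn.put_container(bucket_name)
        return conn
    raise ValueError(f"Unknown API: {api}")
# -----------------------------------------------------------------------------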
exists in the bucket, and download it and append top it if it does # TODO: integrate this with local check for log file print(f'Checking for existing log files in bucket {bucket_name}, elapsed time = {datetime.now() - start}', flush=True) @@ -1594,7 +1592,7 @@ def error(self, message): # check local_dir formatting while local_dir[-1] == '/': local_dir = local_dir[:-1] - + with warnings.catch_warnings(): warnings.filterwarnings('ignore') print(f'Processing files in {local_dir}, elapsed time = {datetime.now() - start}', flush=True) @@ -1609,7 +1607,7 @@ def error(self, message): # print(f'Restartings Dask client due to error: {e}') # print(f'Current objects will be repopulated.') # continue - + print(f'Finished uploads at {datetime.now()}, elapsed time = {datetime.now() - start}') print(f'Dask Client closed at {datetime.now()}, elapsed time = {datetime.now() - start}') print('Completing logging.') @@ -1635,16 +1633,16 @@ def error(self, message): api, local_dir, '/', #path - log, - os.path.basename(log), + log, + os.path.basename(log), False, # dryrun mem_per_worker, ) - + final_size = logdf["FILE_SIZE"].sum() / 1024**2 try: final_transfer_speed = final_size / final_time_seconds - + except ZeroDivisionError: final_transfer_speed = 0 try:
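# --- Reference sketch (illustrative, not part of the patch) -----------------
# The closing hunks compute the summary statistics from the CSV backup log
# (FILE_SIZE column, in bytes) and guard the transfer-speed calculation
# against a zero elapsed time. The same calculation in isolation; the
# function name and example log path are assumptions.
import pandas as pd

def summarise_transfer(log_path: str, elapsed_seconds: float) -> tuple[float, float]:
    logdf = pd.read_csv(log_path)
    final_size = float(logdf["FILE_SIZE"].sum()) / 1024**2   # MiB
    try:
        final_transfer_speed = final_size / elapsed_seconds  # MiB/s
    except ZeroDivisionError:
        final_transfer_speed = 0
    return final_size, final_transfer_speed

# Example with illustrative values:
# size_mib, speed = summarise_transfer("prefix-subdir-lsst-backup.csv", 3600.0)
# -----------------------------------------------------------------------------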