From fa346f988f9603b44d10f0df8aef81dca77fa302 Mon Sep 17 00:00:00 2001 From: Dave McKay Date: Mon, 16 Dec 2024 15:04:25 +0000 Subject: [PATCH] 81 bug skip zips (#82) * very experimental zip_upload * specify dask npartitions * dask meta * bool * non-problem * dtypes * dtypes as strs * list -> object * try pandas for lists * don't use dask for to_collate for now * retry * back to pandas * typo * . * pass api to zip_and_upload * don't pass id explicitly * print head * . * print pandas * test zip_and_upload * . * number zipfiles from zero * just pandas * debugging ValueError in literal_eval * . * . * id is int * retry dd * object passed is ps.Series * progress * comment out debugging prints * debug * typos * . * remove assertion * . * to_collate_uploads * pass pd not dd * ds type * try args * zul * try compute * printf * print k * . * . * . * while test * print zul * zul compute * print zip data size * meta for future and string * try dataframe as meta * debugging * change return statement * debug * wait * back to list * don't use iloc * just pandas * pass id and file_list * clean up as_completed loops * working - removed debugging prints * "to upload" indicator * debugging set upload to False * wait on zip_folders * don't use subtask for zip * on't explicitly del zuls * loop until no future * try no subtasks * add mem_check * wait on each * . * retry - if slow try 48 at a time * complete redo of upload and zul futures * typo * working, but upload part slow, possibly serial * whole sequence in zip_and_upload * debug * debug * reverse if on upload_futures * del at index * list.remove * bufix * zip_and_upload has to return something * debug * check False uploads * assertion for uploads * typo * compare skiping to len(upload==False) * linting * print lens and skipping * CLI and prefix added to bucket_contents.py * formatting * fix checking zip_batch list * return skipping * pass skipping * simplify skipping * . * . * . * bug found - uploaded zips not updated in csv file * bugfix - set upload to false in to_collate before re-saving * debugging save --- csd3-side/scripts/lsst-backup.py | 577 ++++++++++++++++++++++--------- scripts/bucket_contents.py | 70 ++-- 2 files changed, 457 insertions(+), 190 deletions(-) diff --git a/csd3-side/scripts/lsst-backup.py b/csd3-side/scripts/lsst-backup.py index e332fce..508960f 100644 --- a/csd3-side/scripts/lsst-backup.py +++ b/csd3-side/scripts/lsst-backup.py @@ -54,6 +54,67 @@ # libc = ctypes.CDLL("libc.so.6") # return libc.malloc_trim(0) +def compare_zip_contents(collate_objects: list[str] | pd.DataFrame, current_objects: pd.DataFrame, destination_dir: str, skipping: int) -> list[str] | pd.DataFrame: + """ + Compare the contents of zip files to determine which files need to be uploaded. + + Parameters: + collate_objects (list[str] | pd.DataFrame): A list of file paths or a DataFrame of file paths to be collated into zip files containing 'object_names' and 'upload' columns. + current_objects (pd.DataFrame): A DataFrame containing metadata of current zip files, including their contents. + destination_dir (str): The directory where the zip files will be stored. + + Returns: + list[str] | pd.DataFrame: A list of indices of zip files to be uploaded if collate_objects is a list, or a DataFrame with an 'upload' column indicating which zip files need to be uploaded if collate_objects is a DataFrame. 
+ """ + if type(collate_objects) == pd.DataFrame: + df = True + else: + df = False + zips_to_upload = [] + + for i in range(len(collate_objects)): + if df: + cmp = [x.replace(destination_dir+'/', '') for x in collate_objects['object_names'].iloc[i]] + else: + cmp = [x.replace(destination_dir+'/', '') for x in collate_objects[i]] + if not current_objects.empty: + if current_objects['METADATA'].isin([cmp]).any(): + existing_zip_contents = current_objects[current_objects['METADATA'].isin([cmp])]['METADATA'].values[0] + if all([x in existing_zip_contents for x in cmp]): + print(f'Zip file {destination_dir}/collated_{i}.zip already exists and file lists match - skipping.', flush=True) + if df: + if collate_objects.iloc[i]['upload']: + collate_objects.iloc[i]['upload'] = False + skipping += 1 + else: + skipping += 1 + else: + print(f'Zip file {destination_dir}/collated_{i}.zip already exists but file lists do not match - reuploading.', flush=True) + if df: + if not collate_objects.iloc[i]['upload']: + collate_objects.iloc[i]['upload'] = True + skipping -= 1 + else: + zips_to_upload.append(i) + else: + print(f'Zip file {destination_dir}/collated_{i}.zip does not exist uploading.', flush=True) + if df: + if not collate_objects.iloc[i]['upload']: + collate_objects.iloc[i]['upload'] = True + skipping -= 1 + else: + zips_to_upload.append(i) + else: + print(f'Zip file {destination_dir}/collated_{i}.zip does not exist uploading.', flush=True) + if df: + collate_objects.iloc[i]['upload'] = True + else: + zips_to_upload.append(i) + if df: + return collate_objects, skipping + else: + return zips_to_upload, skipping + def to_rds_path(home_path: str, local_dir: str) -> str: # get base folder for rds- folders @@ -174,35 +235,73 @@ def mem_check(futures): def remove_duplicates(l: list[dict]) -> list[dict]: return pd.DataFrame(l).drop_duplicates().to_dict(orient='records') -def zip_and_upload(s3, bucket_name, api, destination_dir, local_dir, file_paths, total_size_uploaded, total_files_uploaded, use_compression, dryrun, id, mem_per_worker) -> tuple[str, int, bytes]: +def zip_and_upload(id, file_paths, s3, bucket_name, api, destination_dir, local_dir, total_size_uploaded, total_files_uploaded, use_compression, dryrun, processing_start, mem_per_worker) -> tuple[object, str]: + """ + Zips a list of files and uploads the resulting zip file to an S3 bucket. + Args: + id (str): Identifier for the zip file. + file_paths (list): List of file paths to be included in the zip file. + s3 (swiftclient.Connection | None): if api == "swift": swiftclient.Connection for uploading the zip file; elif api == "s3": None. + bucket_name (str): Name of the S3 bucket where the zip file will be uploaded. + api (str): API name: "swift" or "s3". + destination_dir (str): Destination "directory" in the S3 bucket. + local_dir (str): Local directory containing the files to be zipped. + total_size_uploaded (int): Total size of files uploaded so far. + total_files_uploaded (int): Total number of files uploaded so far. + use_compression (bool): Whether to use compression for the zip file. + dryrun (bool): If True, perform a dry run without actual upload. + processing_start (datetime): Start time of the processing. + mem_per_worker (int): Memory allocated per worker. + Returns: + bool: True if a zip was created and uploaded, False if not.. 
+ """ # print('in zip_and_upload', flush=True) + # try: + # assert type(ds) is pd.Series + # except AssertionError: + # raise AssertionError('ds must be a pandas Series object.') + # id = ds['id'] + # file_paths = ds['file_paths'] + #debugging + # print(f"DEBUGGING - id: {id}", flush=True) + # print(f"DEBUGGING - file_paths: {file_paths}", flush=True) + + print(f'Zipping and uploading {len(file_paths)} files from {local_dir} to {destination_dir}/collated_{id}.zip.', flush=True) ############# # zip part # ############# client = get_client() # with annotate(parent_folder=parent_folder): - zip_future = client.submit(zip_folders, - local_dir, - file_paths, - use_compression, - dryrun, - id, - mem_per_worker - ) - def get_zip_future(future): - return future.result() - tries = 0 - zip_data = None - namelist = None - while tries < 5: - try: - zip_data, namelist = get_zip_future(zip_future) - tries += 1 - except Exception as e: - sleep(5) - print(f'Zip future timed out {tries}/5') - if zip_data is None and namelist is None: - raise Exception('Zip future timed out 5 times.') + # zip_future = client.submit(zip_folders, + # local_dir, + # file_paths, + # use_compression, + # dryrun, + # id, + # mem_per_worker + # ) + # def get_zip_future(future): + # return future.result() + # wait(zip_future) + + # # print('DEBUGGING - Got zip', flush=True) + # tries = 0 + # zip_data = None + # namelist = None + # while tries < 5: + # try: + # zip_data, namelist = get_zip_future(zip_future) + # tries += 1 + # except Exception as e: + # sleep(5) + # print(f'Zip future timed out {tries}/5') + # if zip_data is None and namelist is None: + # raise Exception('Zip future timed out 5 times.') + + zip_data, namelist = zip_folders(local_dir, file_paths, use_compression, dryrun, id, mem_per_worker) + print('Created zipFile in memory', flush=True) + + # print(f'DEBUGGING - Zip data size: {len(zip_data)} bytes.', flush=True) # if len(zip_data) > mem_per_worker/2: # print('Scattering zip data.') # scattered_zip_data = client.scatter(zip_data) @@ -214,11 +313,10 @@ def get_zip_future(future): print(f'zip_object_key: {zip_object_key}', flush=True) if namelist == []: print(f'No files to upload in zip file.') - return None, zip_object_key #+' nothing to upload' - else: - print(f'Uploading zip file containing {len(file_paths)} files to S3 bucket {bucket_name} to key {zip_object_key}.', flush=True) - # with annotate(parent_folder=parent_folder): - f = client.submit(upload_and_callback, + return False #, zip_object_key, namelist #+' nothing to upload' + else: # for no subtasks + # return zip_data, zip_object_key, namelist, len(zip_data) + upload_and_callback( s3, bucket_name, api, @@ -228,15 +326,56 @@ def get_zip_future(future): namelist, zip_object_key, dryrun, - datetime.now(), - 1, + processing_start, + 1, # i.e., 1 zip file len(zip_data), total_size_uploaded, total_files_uploaded, True, mem_per_worker - ) - return f, zip_object_key + ) + return True + # else: + # print(f'Uploading zip file containing {len(file_paths)} files to S3 bucket {bucket_name} to key {zip_object_key}.', flush=True) + # # with annotate(parent_folder=parent_folder): + # f = client.submit(upload_and_callback, + # s3, + # bucket_name, + # api, + # local_dir, + # destination_dir, + # zip_data, + # namelist, + # zip_object_key, + # dryrun, + # datetime.now(), + # 1, + # len(zip_data), + # total_size_uploaded, + # total_files_uploaded, + # True, + # mem_per_worker + # ) + # return f, zip_object_key + + # Try calling zip_and_upload directly + # s3, + # bucket_name, 
+ # api, + # local_dir, + # destination_dir, + # result[0], # zip_data + # result[2], # namelist + # result[1], # zip_object_key + # dryrun, + # processing_start, + # 1, # 1 zip file + # result[3], # len(zip_data) (size in bytes) + # total_size_uploaded, + # total_files_uploaded, + # True, + # mem_per_worker + def zip_folders(local_dir:str, file_paths:list[str], use_compression:bool, dryrun:bool, id:int, mem_per_worker:int) -> tuple[str, int, bytes]: """ @@ -294,7 +433,7 @@ def zip_folders(local_dir:str, file_paths:list[str], use_compression:bool, dryru return zip_buffer.getvalue(), namelist else: return b'', [] - + def part_uploader(bucket_name, object_key, part_number, chunk_data, upload_id) -> dict: """ Uploads a part of a file to an S3 bucket. @@ -316,12 +455,12 @@ def part_uploader(bucket_name, object_key, part_number, chunk_data, upload_id) - Key=object_key, PartNumber=part_number, UploadId=upload_id)["ETag"]} - + def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_key, dryrun, mem_per_worker) -> str: """ Uploads a file to an S3 bucket. Calculates a checksum for the file - + Args: s3 (None | swiftclient.Connection): None or swiftclient Connection object. bucket_name (str): The name of the S3 bucket or Swift container name. @@ -353,7 +492,7 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k file_size = os.path.getsize(filename) use_future = False - + if file_size > 10 * 1024**3: if not dryrun: print(f'WARNING: File size of {file_size} bytes exceeds memory per worker of {mem_per_worker} bytes.', flush=True) @@ -433,7 +572,7 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k """ - Use multipart upload for large files """ - + obj = bucket.Object(object_key) mp_upload = obj.initiate_multipart_upload() chunk_size = 512 * 1024**2 # 512 MiB @@ -476,7 +615,7 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k bucket.put_object(Body=file_data, Key=object_key, ContentMD5=checksum_base64) except Exception as e: print(f'Error uploading {filename} to {bucket_name}/{object_key}: {e}') - + del file_data else: checksum_string = "DRYRUN" @@ -540,7 +679,7 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k checksum_string = checksum_hash.hexdigest() checksum_base64 = base64.b64encode(checksum_hash.digest()).decode() # file_data.seek(0) # Reset the file pointer to the start - + try: if file_size > mem_per_worker or file_size > 5 * 1024**3: # Check if file size is larger than 5GiB """ @@ -581,10 +720,10 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k # if use_future: # bucket.put_object(Body=get_client().gather(file_data), Key=object_key, ContentMD5=checksum_base64) # else: - s3.put_object(container=bucket_name, contents=file_data, content_type='multipart/mixed', obj=object_key, etag=checksum_string) + s3.put_object(container=bucket_name, contents=file_data, content_type='multipart/mixed', obj=object_key, etag=checksum_string) except Exception as e: print(f'Error uploading {filename} to {bucket_name}/{object_key}: {e}') - + del file_data else: checksum_string = "DRYRUN" @@ -602,7 +741,7 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k return_string += ',n/a' else: return_string += f',{checksum_string}' - + #for no zip contents return_string += ',n/a' return return_string @@ -611,7 +750,7 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte """ 
Uploads a file to an S3 bucket. Calculates a checksum for the file - + Args: s3 (None | swiftclient.Connection): None or Swift swiftclient.Connection object. bucket_name (str): The name of the S3 bucket. @@ -634,7 +773,7 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte filename = object_key.split('/')[-1] file_data_size = len(file_data) - + print(f'Uploading zip file "{filename}" for {folder} to {bucket_name}/{object_key}, {file_data_size} bytes, checksum = True, dryrun = {dryrun}', flush=True) """ - Upload the file to the bucket @@ -674,9 +813,9 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte header: LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,ZIP_CONTENTS """ return_string = f'"{folder}","{filename}",{file_data_size},"{bucket_name}","{object_key}","{checksum_string}","{",".join(zip_contents)}"' - + return return_string - + elif api == 'swift': try: assert type(s3) is swiftclient.Connection @@ -685,7 +824,7 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte filename = object_key.split('/')[-1] file_data_size = len(file_data) - + print(f'Uploading zip file "{filename}" for {folder} to {bucket_name}/{object_key}, {file_data_size} bytes, checksum = True, dryrun = {dryrun}', flush=True) """ - Upload the file to the bucket @@ -699,7 +838,7 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte checksum_base64 = base64.b64encode(checksum_hash.digest()).decode() try: - + """ - Upload the file to the bucket """ @@ -725,7 +864,7 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte header: LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,ZIP_CONTENTS """ return_string = f'"{folder}","{filename}",{file_data_size},"{bucket_name}","{object_key}","{checksum_string}","{",".join(zip_contents)}"' - + return return_string def print_stats(file_name_or_data, file_count, total_size, file_start, file_end, processing_start, total_size_uploaded, total_files_uploaded, collated) -> None: @@ -806,12 +945,12 @@ def upload_and_callback(s3, bucket_name, api, local_dir, folder, file_name_or_da else: print(f'Uploading {file_count} files from {folder}.') result = upload_to_bucket(s3, bucket_name, api, local_dir, folder, file_name_or_data, object_key, dryrun, mem_per_worker) - + file_end = datetime.now() print_stats(file_name_or_data, file_count, folder_files_size, file_start, file_end, processing_start, total_size_uploaded, total_files_uploaded, collated) with open(log, 'a') as logfile: logfile.write(f'{result}\n') - + del file_name_or_data return None @@ -872,6 +1011,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des zip_batch_files = [[]] zip_batch_object_names = [[]] zip_batch_sizes = [0] + # if save_collate_file: # scanned_list = [] # scanned_dicts = [] @@ -902,7 +1042,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des # if folder in scanned_list: # continue print(f'Processing {folder_num}/{total_all_folders} folders; {file_num}/{total_all_files} files in {local_dir}.', flush=True) - + # check if folder is in the exclude list if len(files) == 0 and len(sub_folders) == 0: print(f'Skipping subfolder - no files or subfolders.', flush=True) @@ -949,7 +1089,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des mean_filesize = total_filesize / len(files) else: mean_filesize = 0 - + # check if any subfolders 
contain no subfolders and < 4 files if len(sub_folders) > 0: for sub_folder in sub_folders: @@ -990,7 +1130,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des # print(object_names) print(f'Skipping subfolder - all files exist.', flush=True) continue - + if mean_filesize > max_zip_batch_size or not global_collate: print('Individual upload.', flush=True) @@ -1031,18 +1171,18 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des print(f'Sending {file_count} files (total size: {folder_files_size/1024**2:.0f} MiB) in {folder} to S3 bucket {bucket_name}.', flush=True) print(f'Individual files objects names: {object_names}', flush=True) - + try: for i,args in enumerate(zip( - repeat(s3), + repeat(s3), repeat(api), repeat(bucket_name), repeat(local_dir), - repeat(folder), + repeat(folder), folder_files, repeat(None), - object_names, - repeat(dryrun), + object_names, + repeat(dryrun), repeat(processing_start), repeat(file_count), repeat(folder_files_size), @@ -1067,8 +1207,8 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des with open('error_log.err', 'a') as f: f.write(f'Unexpected error: {e}\n') # Exit gracefully - sys.exit(1) - + sys.exit(1) + # release block of files if the list for results is greater than 4 times the number of processes elif len(folder_files) > 0 and global_collate: # small files in folder @@ -1099,14 +1239,14 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des symlink_targets.append(target) #add real file to symlink_obj_names list symlink_obj_names.append(symlink_obj_name) - + # append symlink_targets and symlink_obj_names to folder_files and object_names folder_files.extend(symlink_targets) object_names.extend(symlink_obj_names) file_count = len(object_names) #always do this AFTER removing "current_objects" to avoid re-uploading - + # Level n collation size = zip_batch_sizes[-1] print(f'Size: {size}') @@ -1126,28 +1266,36 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des folder_files_size = np.sum(np.array([os.lstat(filename).st_size for filename in folder_files])) print(f'Number of zip files: {len(zip_batch_files)}', flush=True) print('', flush=True) - + if global_collate: ############################### # CHECK HERE FOR ZIP CONTENTS # ############################### if not os.path.exists(collate_list_file): - for i, zip_batch in enumerate(zip_batch_object_names): - cmp = [x.replace(destination_dir+'/', '') for x in zip_batch] - if not current_objects.empty: - if current_objects['METADATA'].isin([cmp]).any(): - existing_zip_contents = current_objects[current_objects['METADATA'].isin([cmp])]['METADATA'].values[0] - if all([x in existing_zip_contents for x in cmp]): - print(f'Zip file {destination_dir}/collated_{i+1}.zip already exists and file lists match - skipping.', flush=True) - zip_batch_object_names.pop(i) - zip_batch_files.pop(i) - continue - else: - print(f'Zip file {destination_dir}/collated_{i+1}.zip already exists but file lists do not match - reuploading.', flush=True) + zips_to_upload, skipping = compare_zip_contents(zip_batch_object_names, current_objects, destination_dir, 0) # Create dict for zip files for i in range(len(zip_batch_files)): - to_collate_list.append({'id':i, 'object_names':zip_batch_object_names[i], 'file_paths':zip_batch_files[i], 'size':zip_batch_sizes[i]}) + if i in zips_to_upload: + to_collate_list.append( + { + 'id': i, + 'object_names': zip_batch_object_names[i], + 'file_paths': 
zip_batch_files[i],
+                            'size': zip_batch_sizes[i],
+                            'upload': True
+                        }
+                    )
+                else:
+                    to_collate_list.append(
+                        {
+                            'id': i,
+                            'object_names': zip_batch_object_names[i],
+                            'file_paths': zip_batch_files[i],
+                            'size': zip_batch_sizes[i],
+                            'upload': False
+                        }
+                    )
             # to_collate['id'].append(i)
             # to_collate['object_names'].append(zip_batch_object_names[i])
             # to_collate['file_paths'].append(file_paths)
@@ -1158,29 +1306,24 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
             # print(f'id: {[i for i in range(len(zip_batch_files))]}')
             to_collate = pd.DataFrame.from_dict(to_collate_list)
-            client.scatter(to_collate) 
+            client.scatter(to_collate)
             del zip_batch_files, zip_batch_object_names, zip_batch_sizes
+
         else:
             # with open(collate_list_file, 'r') as f:
             to_collate = pd.read_csv(collate_list_file)
             to_collate.object_names = to_collate.object_names.apply(literal_eval)
             to_collate.file_paths = to_collate.file_paths.apply(literal_eval)
-            client.scatter(to_collate)
+            # client.scatter(to_collate)
             print(f'Loaded collate list from {collate_list_file}, len={len(to_collate)}.', flush=True)
             if not current_objects.empty:
                 # now using pandas for both current_objects and to_collate - this could be re-written to use vectorised operations
-                droplist = []
-                for i in range(len(to_collate['object_names'])):
-                    # print(zip_object_names)
-                    cmp = [x.replace(destination_dir+'/', '') for x in to_collate.iloc[i]['object_names']]
-                    if current_objects['METADATA'].isin([cmp]).any():
-                        existing_zip_contents = current_objects[current_objects['METADATA'].isin([cmp])]['METADATA'].values[0]
-                        if all([x in existing_zip_contents for x in cmp]):
-                            print(f'Zip file {destination_dir}/collated_{i+1}.zip from {collate_list_file} already exists and file lists match - skipping.', flush=True)
-                            droplist.append(i)
-                        else:
-                            print(f'Zip file {destination_dir}/collated_{i+1}.zip from {collate_list_file} already exists but file lists do not match - reuploading.', flush=True)
-                to_collate.drop(droplist, inplace=True)
+                to_collate, skipping = compare_zip_contents(to_collate, current_objects, destination_dir, len(to_collate[to_collate.upload == False]))
+            else:
+                # Ensure `skipping` is defined even when there is nothing to compare.
+                skipping = len(to_collate[to_collate.upload == False])
+
         if save_collate_file:
             print(f'Saving collate list to {collate_list_file}, len={len(to_collate)}.', flush=True)
             # with open(collate_list_file, 'w') as f:
@@ -1195,78 +1338,173 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
             # print(to_collate)
             # print(type(to_collate.iloc[0]['file_paths']))
             # exit()
+        to_collate_uploads = to_collate[to_collate.upload == True][['file_paths', 'id', 'upload']]
+        print(f'Skipping {skipping} collated zips that already exist with matching contents; {len(to_collate_uploads)} to upload.', flush=True)
+        assert to_collate_uploads['upload'].all()
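The loop that follows submits one `zip_and_upload` task per zip batch and caps the number of in-flight futures at twice the worker count. A minimal, self-contained sketch of that throttling pattern with `dask.distributed` (function and variable names here are illustrative, not the script's own):

```python
from dask.distributed import Client, as_completed

def throttled_submit(client: Client, func, items, per_worker: int = 2) -> list:
    """Submit func(item) for each item, keeping at most per_worker tasks
    per Dask worker in flight; returns (exception, traceback) pairs."""
    limit = len(client.scheduler_info()['workers']) * per_worker
    futures, failed = [], []
    for item in items:
        while len(futures) >= limit:  # wait for a slot to free up
            done = next(as_completed(futures))
            if done.status == 'error':
                failed.append((done.exception(), done.traceback()))
            futures.remove(done)
        futures.append(client.submit(func, item))
    for done in as_completed(futures):  # drain the remainder
        if done.status == 'error':
            failed.append((done.exception(), done.traceback()))
    return failed
```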
+ #current_objects['CURRENT_OBJECTS'].apply(find_metadata_swift, conn=s3, container_name=bucket_name) + # print(to_collate_uploads.head(), flush=True) + # zip_and_upload(to_collate[to_collate.upload == True][['file_paths','id']].iloc[0], s3=s3, bucket_name=bucket_name, api=api, destination_dir=destination_dir, local_dir=local_dir, total_size_uploaded=total_size_uploaded, total_files_uploaded=total_files_uploaded, use_compression=use_compression, dryrun=dryrun, mem_per_worker=mem_per_worker) + # exit() + for id in to_collate_uploads['id']: + if len(upload_futures) >= len(client.scheduler_info()['workers'])*2: + # print('Waiting for zip slots to free up.', flush=True) + while len(upload_futures) >= len(client.scheduler_info()['workers'])*2: + print(len(upload_futures), flush=True) + for ulf in upload_futures: + if 'exception' in ulf.status or 'error' in ulf.status: + f_tuple = ulf.exception(), ulf.traceback() + failed.append(f_tuple) + upload_futures.remove(ulf) + else: + upload_futures.remove(ulf) + to_collate.loc[to_collate['id'] == id, 'upload'] = False - - for i in range(len(to_collate)): - # mem_half_full = len(zul_futures)*np.sum(np.array([os.stat(fp).st_size for fp in to_collate.iloc[i]['file_paths']])) > len(client.scheduler_info()['workers'])*mem_per_worker / 2 - # if mem_half_full: - # # clean up finished futures, but don't block - # print(f'Memory is half full. Cleaning up finished futures.', flush=True) - # for f in zul_futures: - # result = f.result() - # if result[0] is not None: - # upload_futures.append(result[0]) - # to_collate = to_collate[to_collate.object_names != result[1]] - # print(f'Zip {result[1]} created and added to upload queue.', flush=True) - # del f - # else: - # print(f'No files to zip as {result[1]}. Skipping upload.', flush=True) - # del f - # if len(upload_futures) > 0: - # if sum([f.status == 'finished' for f in upload_futures]) > 0: - # for f in upload_futures: - # if 'exception' in f.status and f not in failed: - # f_tuple = f.exception(), f.traceback() - # del f - # if f_tuple not in failed: - # failed.append(f_tuple) - # elif 'finished' in f.status: - # del f - # print('Rebalancing memory.', flush=True) - # client.rebalance() - - mem_check(zul_futures+upload_futures) - zul_futures.append(client.submit( + upload_futures.append(client.submit( zip_and_upload, + id, + to_collate_uploads[to_collate_uploads.id == id]['file_paths'].values[0], s3, bucket_name, api, destination_dir, local_dir, - to_collate.iloc[i]['file_paths'], total_size_uploaded, total_files_uploaded, use_compression, dryrun, - i, + processing_start, mem_per_worker, )) - # mem_check(zul_futures) - + for ulf in as_completed(upload_futures): + if 'exception' in ulf.status or 'error' in ulf.status: + f_tuple = ulf.exception(), ulf.traceback() + failed.append(f_tuple) + upload_futures.remove(ulf) + else: + upload_futures.remove(ulf) + to_collate.loc[to_collate['id'] == id, 'upload'] = False + + + + # for zul_f in as_completed(zul_futures): # is a zip_and_upload_future + # # mem_check(zul_futures+upload_futures) + # if len(upload_futures) >= len(client.scheduler_info()['workers']): + # print('Waiting for upload slots to free up.', flush=True) + # for f in as_completed(upload_futures): + # if 'exception' in f.status or 'error' in f.status: + # f_tuple = f.exception(), f.traceback() + # failed.append(f_tuple) + # del f + # else: + # del f + # result = zul_f.result() + # if result[0] is not None: + # upload_futures.append(client.submit(upload_and_callback, + # s3, + # bucket_name, + # api, + # 
local_dir,
    #                                              destination_dir,
    #                                              result[0], # zip_data
    #                                              result[2], # namelist
    #                                              result[1], # zip_object_key
    #                                              dryrun,
    #                                              processing_start,
    #                                              1, # 1 zip file
    #                                              result[3], # len(zip_data) (size in bytes)
    #                                              total_size_uploaded,
    #                                              total_files_uploaded,
    #                                              True,
    #                                              mem_per_worker
    #                                              ))
    #         to_collate.loc[to_collate['object_names'] == result[1], 'upload'] = False
    #         print(f'Zip {result[1]} created and added to upload queue.', flush=True)
    #         print(f'To upload: {len(to_collate[to_collate.upload == True])} zips remaining.', flush=True)
    #         del zul_f
    #     else:
    #         print(f'No files to zip as {result[1]}. Skipping upload.', flush=True)
    #         del zul_f
    # del zul_futures
+
     ########################
     # Monitor upload tasks #
     ########################
-    print('Monitoring zip tasks.', flush=True)
-    for f in as_completed(zul_futures):
-        result = f.result()
-        if result[0] is not None:
-            upload_futures.append(result[0])
-            to_collate = to_collate[to_collate.object_names != result[1]]
-            print(f'Zip {result[1]} created and added to upload queue.', flush=True)
-            del f
-        else:
-            print(f'No files to zip as {result[1]}. Skipping upload.', flush=True)
-            del f
-
-    # fire_and_forget(upload_futures)
-    for f in as_completed(upload_futures):
-        if 'exception' in f.status or 'error' in f.status:
-            f_tuple = f.exception(), f.traceback()
-            failed.append(f_tuple)
-            del f
-        elif 'finished' in f.status:
-            del f
+    # Zipping and uploading are now handled inline in the submission loop
+    # above; failures are collected into `failed` as each future completes.
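One caveat in the submission loop above: when a finished future is pruned, the corresponding row is flagged with `to_collate.loc[to_collate['id'] == id, 'upload'] = False`, but at that point `id` is the zip currently being submitted, not necessarily the one whose future just completed. A hedged sketch of one way to keep that bookkeeping straight, mapping each future back to its zip id (stand-in code, assuming `dask.distributed`; `zip_and_upload_stub` is hypothetical):

```python
import pandas as pd
from dask.distributed import Client, as_completed

def zip_and_upload_stub(zip_id: int, file_paths: list) -> bool:
    # Stand-in for the real zip_and_upload.
    return len(file_paths) > 0

client = Client(processes=False)
to_collate = pd.DataFrame({'id': [0, 1],
                           'file_paths': [['a.txt'], ['b.txt']],
                           'upload': [True, True]})

future_ids = {}  # future -> zip id
for row in to_collate[to_collate.upload].itertuples():
    fut = client.submit(zip_and_upload_stub, row.id, row.file_paths)
    future_ids[fut] = row.id

failed = []
for fut in as_completed(list(future_ids)):
    if fut.status == 'error':
        failed.append((fut.exception(), fut.traceback()))
    else:
        # Mark exactly the row whose future finished, not the loop variable.
        to_collate.loc[to_collate['id'] == future_ids[fut], 'upload'] = False

client.close()
```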

     if failed:
         for i, failed_upload in enumerate(failed):
@@ -1274,6 +1512,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
     # Re-save collate list to reflect uploads
     if save_collate_file:
+        print(f'Saving updated collate list to {collate_list_file}.', flush=True)
         to_collate.to_csv(collate_list_file, index=False)
         # with open(collate_list_file, 'w') as f:
         #     json.dump(to_collate, f)
@@ -1312,7 +1551,7 @@ def error(self, message):
     parser.add_argument('--no-file-count-stop', default=False, action='store_true', help='Do not stop if count of local files equals count of S3 objects.')
     args = parser.parse_args()
 
-    if not args.config_file and not (args.bucket_namse and args.local_path and args.S3_prefix):
+    if not args.config_file and not (args.bucket_name and args.local_path and args.S3_prefix):
         parser.error('If a config file is not provided, the bucket name, local path, and S3 prefix must be provided.')
     if args.config_file and (args.api or args.bucket_name or args.local_path or args.S3_prefix or args.S3_folder or args.exclude or args.nprocs or args.threads_per_worker or args.no_collate or args.dryrun or args.no_compression or args.save_config or args.no_save_collate_list or args.no_file_count_stop):
         print(f'WARNING: Options provided on the command line override options in {args.config_file}.')
@@ -1379,7 +1618,7 @@ def error(self, message):
     prefix = args.S3_prefix
     sub_dirs = args.S3_folder
     print(f'sub_dirs {sub_dirs}')
-    nprocs = args.nprocs 
+    nprocs = args.nprocs
     threads_per_worker = args.threads_per_worker
     print(f'threads per worker: {threads_per_worker}')
     global_collate = not args.no_collate # internally, flag turns *on* collate, but for user no-collate turns it off - makes flag more intuitive
@@ -1399,14 +1638,14 @@ def error(self, message):
         print(f'Collate list will be read from and re-saved to {collate_list_file}.')
     if (save_collate_list or collate_list_file) and not global_collate:
         parser.error('Collate list file provided but collation is turned off. 
Please enable collation to use the collate list file.') - + file_count_stop = not args.no_file_count_stop # internally, flag turns *on* file-count-stop, but for user no-file-count-stop turns it off - makes flag more intuitive - + if args.exclude: exclude = pd.Series(args.exclude) else: exclude = pd.Series([]) - + print(f'Config: {args}') if save_config: @@ -1434,14 +1673,14 @@ def error(self, message): if not local_dir or not prefix or not bucket_name: parser.print_help() sys.exit(1) - + #print hostname uname = subprocess.run(['uname', '-n'], capture_output=True) print(f'Running on {uname.stdout.decode().strip()}') # Initiate timing start = datetime.now() - + ##allow top-level folder to be provided with S3-folder == '' if sub_dirs == '': log_suffix = 'lsst-backup.csv' # DO NOT CHANGE @@ -1456,8 +1695,8 @@ def error(self, message): # check for previous suffix (remove after testing) previous_suffix = 'files.csv' previous_log = f"{prefix}-{'-'.join(sub_dirs.split('/'))}-{previous_suffix}" - destination_dir = f"{prefix}/{sub_dirs}" - + destination_dir = f"{prefix}/{sub_dirs}" + # Add titles to log file if not os.path.exists(log): if os.path.exists(previous_log): @@ -1469,7 +1708,7 @@ def error(self, message): print(f'Created backup log file {log}') with open(log, 'a') as logfile: # don't open as 'w' in case this is a continuation logfile.write('LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,ZIP_CONTENTS\n') - + # Setup bucket try: if bm.check_keys(api): @@ -1492,14 +1731,14 @@ def error(self, message): sys.exit() print(f'Using {api.capitalize()} API with host {s3_host}') - + if api == 's3': s3 = bm.get_resource() bucket_list = bm.bucket_list(s3) elif api == 'swift': s3 = bm.get_conn_swift() bucket_list = bm.bucket_list_swift(s3) - + if bucket_name not in bucket_list: if not dryrun: if api == 's3': @@ -1514,7 +1753,7 @@ def error(self, message): else: print(f'Bucket exists: {bucket_name}') print('dryrun == True, so continuing.') - + if api == 's3': bucket = s3.Bucket(bucket_name) elif api == 'swift': @@ -1548,11 +1787,11 @@ def error(self, message): current_objects = bm.object_list_swift(s3, bucket_name, prefix=destination_dir, count=True) print() print(f'Done.\nFinished at {datetime.now()}, elapsed time = {datetime.now() - start}', flush=True) - + current_objects = pd.DataFrame.from_dict({'CURRENT_OBJECTS':current_objects}) - + print(f'Current objects (with matching prefix): {len(current_objects)}', flush=True) - + if not current_objects.empty: print(f"Current objects (with matching prefix; excluding collated zips): {len(current_objects[current_objects['CURRENT_OBJECTS'].str.contains('collated_') == False])}", flush=True) print(f'Obtaining current object metadata, elapsed time = {datetime.now() - start}', flush=True) @@ -1568,7 +1807,7 @@ def error(self, message): print(f'Done, elapsed time = {datetime.now() - start}', flush=True) # current_objects.to_csv('current_objects.csv', index=False) # exit() - + ## check if log exists in the bucket, and download it and append top it if it does # TODO: integrate this with local check for log file print(f'Checking for existing log files in bucket {bucket_name}, elapsed time = {datetime.now() - start}', flush=True) @@ -1594,7 +1833,7 @@ def error(self, message): # check local_dir formatting while local_dir[-1] == '/': local_dir = local_dir[:-1] - + with warnings.catch_warnings(): warnings.filterwarnings('ignore') print(f'Processing files in {local_dir}, elapsed time = {datetime.now() - start}', flush=True) @@ -1609,7 +1848,7 @@ def 
error(self, message):
     #             print(f'Restarting Dask client due to error: {e}')
     #             print(f'Current objects will be repopulated.')
     #             continue
-    
+
     print(f'Finished uploads at {datetime.now()}, elapsed time = {datetime.now() - start}')
     print(f'Dask Client closed at {datetime.now()}, elapsed time = {datetime.now() - start}')
     print('Completing logging.')
@@ -1635,16 +1874,16 @@ def error(self, message):
             api,
             local_dir,
             '/', #path
-            log, 
-            os.path.basename(log), 
+            log,
+            os.path.basename(log),
             False, # dryrun
             mem_per_worker,
         )
-    
+
     final_size = logdf["FILE_SIZE"].sum() / 1024**2
     try:
         final_transfer_speed = final_size / final_time_seconds
-        
+
     except ZeroDivisionError:
         final_transfer_speed = 0
     try:
diff --git a/scripts/bucket_contents.py b/scripts/bucket_contents.py
index aca18c5..7e34ea8 100644
--- a/scripts/bucket_contents.py
+++ b/scripts/bucket_contents.py
@@ -2,29 +2,14 @@
 # coding: utf-8
 # D. McKay Feb 2024
 
+import argparse
 import sys
 
-if len(sys.argv) != 2:
-    sys.exit('Provide a bucket name as an argument.')
-
 import os
 import bucket_manager.bucket_manager as bm
-try:
-    assert bm.check_keys()
-except AssertionError as e:
-    print(e)
-    sys.exit()
-
-import warnings
-warnings.filterwarnings('ignore')
-
-s3 = bm.get_resource()
-
-bucket_name = sys.argv[1]
-bucket = s3.Bucket(bucket_name)
-total_size = 0
-try:
-    print('Contents of bucket: ',bucket_name)
-    print('Object, Size / MiB, LastModified, Zip Contents')
+def list_all(bucket):
+    total_size = 0
     for ob in bucket.objects.all():
         if ob.key.endswith('zip'):
             try:
@@ -34,8 +19,51 @@
         else:
             print(f'{ob.key}, {ob.size/1024**2:.2f}, {ob.last_modified}, n/a')
         total_size += ob.size
-except Exception as e:
-    if '(NoSuchBucket)' in str(e).split():
-        print('NoSuchBucket')
-print(f'Total size of bucket: {total_size/1024**3:.2f} GiB')
\ No newline at end of file
+    return total_size
+
+def list_prefix(bucket, prefix):
+    total_size = 0
+    for ob in bucket.objects.filter(Prefix=prefix):
+        if ob.key.endswith('zip'):
+            try:
+                print(f'{ob.key}, {ob.size/1024**2:.2f}, {ob.last_modified}, "{s3.ObjectSummary(bucket_name, ob.key).get()["Metadata"]["zip-contents"]}"')
+            except Exception as e:
+                print(f'{ob.key}, {ob.size/1024**2:.2f}, {ob.last_modified}, n/a')
+        else:
+            print(f'{ob.key}, {ob.size/1024**2:.2f}, {ob.last_modified}, n/a')
+        total_size += ob.size
+    return total_size
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(description='List the contents of an S3 bucket, optionally filtered by prefix.')
+    parser.add_argument('--bucket-name', '-b', type=str, help='Name of the S3 bucket', required=True)
+    parser.add_argument('--prefix', '-p', type=str, default='', help='Optional prefix to filter objects in the bucket')
+
+    args = parser.parse_args()
+    bucket_name = args.bucket_name
+    prefix = args.prefix
+
+    try:
+        assert bm.check_keys()
+    except AssertionError as e:
+        print(e)
+        sys.exit()
+
+    import warnings
+    warnings.filterwarnings('ignore')
+
+    s3 = bm.get_resource()
+
+    bucket = s3.Bucket(bucket_name)
+    total_size = 0
+    try:
+        print('Contents of bucket: ', bucket_name)
+        print('Object, Size / MiB, LastModified, Zip Contents')
+        if prefix:
+            total_size = list_prefix(bucket, prefix)
+        else:
+            total_size = list_all(bucket)
+    except Exception as e:
+        if '(NoSuchBucket)' in str(e).split():
+            print('NoSuchBucket')
+
+    print(f'Total size of bucket: {total_size/1024**3:.2f} GiB')
\ No newline at end of file
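For reference, the server-side prefix filtering that `list_prefix` relies on is the plain boto3 resource API; a minimal stand-alone sketch (bucket and prefix names are illustrative, and credentials are assumed to come from the usual boto3 chain):

```python
import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('my-backup-bucket')  # illustrative name

total_size = 0
# Prefix filtering happens server-side, so only matching keys are listed.
for ob in bucket.objects.filter(Prefix='LSST-IR-FUSION/'):
    print(f'{ob.key}, {ob.size/1024**2:.2f} MiB, {ob.last_modified}')
    total_size += ob.size
print(f'Total size: {total_size/1024**3:.2f} GiB')
```

With the argparse interface above, an equivalent run of the script itself would look like `python scripts/bucket_contents.py -b my-backup-bucket -p LSST-IR-FUSION/` (names again illustrative).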