From fa4bd8ee58ccdab1789bc07bce25830c77b7d25b Mon Sep 17 00:00:00 2001 From: Dave McKay Date: Thu, 19 Dec 2024 10:55:29 +0000 Subject: [PATCH 1/2] clean (#87) --- csd3-side/scripts/lsst-backup.py | 217 ++----------------------------- 1 file changed, 8 insertions(+), 209 deletions(-) diff --git a/csd3-side/scripts/lsst-backup.py b/csd3-side/scripts/lsst-backup.py index 3c1bd64..2e5e0b7 100644 --- a/csd3-side/scripts/lsst-backup.py +++ b/csd3-side/scripts/lsst-backup.py @@ -15,7 +15,7 @@ Returns: None """ -# import json + import sys import os from itertools import repeat @@ -48,12 +48,6 @@ from typing import List -# import ctypes - -# def trim_memory() -> int: -# libc = ctypes.CDLL("libc.so.6") -# return libc.malloc_trim(0) - def compare_zip_contents(collate_objects: list[str] | pd.DataFrame, current_objects: pd.DataFrame, destination_dir: str, skipping: int) -> list[str] | pd.DataFrame: """ Compare the contents of zip files to determine which files need to be uploaded. @@ -255,56 +249,14 @@ def zip_and_upload(id, file_paths, s3, bucket_name, api, destination_dir, local_ Returns: bool: True if a zip was created and uploaded, False if not.. """ - # print('in zip_and_upload', flush=True) - # try: - # assert type(ds) is pd.Series - # except AssertionError: - # raise AssertionError('ds must be a pandas Series object.') - # id = ds['id'] - # file_paths = ds['file_paths'] - #debugging - # print(f"DEBUGGING - id: {id}", flush=True) - # print(f"DEBUGGING - file_paths: {file_paths}", flush=True) - print(f'Zipping and uploading {len(file_paths)} files from {local_dir} to {destination_dir}/collated_{id}.zip.', flush=True) ############# # zip part # ############# client = get_client() - # with annotate(parent_folder=parent_folder): - # zip_future = client.submit(zip_folders, - # local_dir, - # file_paths, - # use_compression, - # dryrun, - # id, - # mem_per_worker - # ) - # def get_zip_future(future): - # return future.result() - # wait(zip_future) - - # # print('DEBUGGING - Got zip', flush=True) - # tries = 0 - # zip_data = None - # namelist = None - # while tries < 5: - # try: - # zip_data, namelist = get_zip_future(zip_future) - # tries += 1 - # except Exception as e: - # sleep(5) - # print(f'Zip future timed out {tries}/5') - # if zip_data is None and namelist is None: - # raise Exception('Zip future timed out 5 times.') zip_data, namelist = zip_folders(local_dir, file_paths, use_compression, dryrun, id, mem_per_worker) print('Created zipFile in memory', flush=True) - - # print(f'DEBUGGING - Zip data size: {len(zip_data)} bytes.', flush=True) - # if len(zip_data) > mem_per_worker/2: - # print('Scattering zip data.') - # scattered_zip_data = client.scatter(zip_data) ############### # upload part # ############### @@ -313,9 +265,8 @@ def zip_and_upload(id, file_paths, s3, bucket_name, api, destination_dir, local_ print(f'zip_object_key: {zip_object_key}', flush=True) if namelist == []: print(f'No files to upload in zip file.') - return False #, zip_object_key, namelist #+' nothing to upload' + return False else: # for no subtasks - # return zip_data, zip_object_key, namelist, len(zip_data) upload_and_callback( s3, bucket_name, @@ -335,47 +286,6 @@ def zip_and_upload(id, file_paths, s3, bucket_name, api, destination_dir, local_ mem_per_worker ) return True - # else: - # print(f'Uploading zip file containing {len(file_paths)} files to S3 bucket {bucket_name} to key {zip_object_key}.', flush=True) - # # with annotate(parent_folder=parent_folder): - # f = client.submit(upload_and_callback, - # s3, - # 
bucket_name, - # api, - # local_dir, - # destination_dir, - # zip_data, - # namelist, - # zip_object_key, - # dryrun, - # datetime.now(), - # 1, - # len(zip_data), - # total_size_uploaded, - # total_files_uploaded, - # True, - # mem_per_worker - # ) - # return f, zip_object_key - - # Try calling zip_and_upload directly - # s3, - # bucket_name, - # api, - # local_dir, - # destination_dir, - # result[0], # zip_data - # result[2], # namelist - # result[1], # zip_object_key - # dryrun, - # processing_start, - # 1, # 1 zip file - # result[3], # len(zip_data) (size in bytes) - # total_size_uploaded, - # total_files_uploaded, - # True, - # mem_per_worker - def zip_folders(local_dir:str, file_paths:list[str], use_compression:bool, dryrun:bool, id:int, mem_per_worker:int) -> tuple[str, int, bytes]: """ @@ -395,9 +305,7 @@ def zip_folders(local_dir:str, file_paths:list[str], use_compression:bool, dryru None """ - # print(file_paths, flush=True) zipped_size = 0 - # client = get_client() if not dryrun: try: zip_buffer = io.BytesIO() @@ -407,13 +315,11 @@ def zip_folders(local_dir:str, file_paths:list[str], use_compression:bool, dryru compression = zipfile.ZIP_STORED # zipfile.ZIP_STORED = no compression with zipfile.ZipFile(zip_buffer, "a", compression, True) as zip_file: for file in file_paths: - # print(file, flush=True) if file.startswith('/'): file_path = file else: exit(f'Path is wrong: {file}') arc_name = os.path.relpath(file_path, local_dir) - # print(f'arc_name {arc_name}', flush=True) try: zipped_size += os.path.getsize(file_path) with open(file_path, 'rb') as src_file: @@ -498,10 +404,6 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k print(f'WARNING: File size of {file_size} bytes exceeds memory per worker of {mem_per_worker} bytes.', flush=True) print('Running upload_object.py.', flush=True) print('This upload WILL NOT be checksummed or tracked!', flush=True) - # if perform_checksum: - # checksum_string = hashlib.md5(file_data).hexdigest() - # Ensure the file is closed before running the upload script - # file_data.close() del file_data # Ensure consistent path to upload_object.py upload_object_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../../scripts/upload_object.py') @@ -514,23 +416,6 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k ) print(f'Running upload_object.py for {filename}.', flush=True) return f'"{folder}","{filename}",{file_size},"{bucket_name}","{object_key}","n/a","n/a"' - # print(f'External upload {success.stdout.read()}', flush=True) - # if success.returncode == 0: - # print(f'File {filename} uploaded successfully.') - # if perform_checksum: - # return f'"{folder}","{filename}",{file_size},"{bucket_name}","{object_key}","{checksum_string}","n/a"' - # else: - # return f'"{folder}","{filename}",{file_size},"{bucket_name}","{object_key}","n/a","n/a"' - # else: - # print(f'Error uploading {filename} to {bucket_name}/{object_key}: {success.stderr}') - # if file_size > 0.5*mem_per_worker: - # print(f'WARNING: File size of {file_size} bytes exceeds half the memory per worker of {mem_per_worker} bytes.') - # try: - # file_data = get_client().scatter(file_data) - # except TypeError as e: - # print(f'Error scattering {filename}: {e}') - # exit(1) - # use_future = True print(f'Uploading {filename} from {folder} to {bucket_name}/{object_key}, {file_size} bytes, checksum = True, dryrun = {dryrun}', flush=True) """ @@ -556,11 +441,9 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, 
filename, object_k """ if use_future: file_data = get_client().gather(file_data) - # file_data.seek(0) # Ensure we're at the start of the file checksum_hash = hashlib.md5(file_data) checksum_string = checksum_hash.hexdigest() checksum_base64 = base64.b64encode(checksum_hash.digest()).decode() - # file_data.seek(0) # Reset the file pointer to the start if use_future: try: file_data = get_client().scatter(file_data) @@ -584,9 +467,6 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k start = i * chunk_size end = min(start + chunk_size, file_size) part_number = i + 1 - # with open(filename, 'rb') as f: - # f.seek(start) - # chunk_data = get_client.gather(file_data)[start:end] part_futures.append(get_client().submit( part_uploader, bucket_name, @@ -620,8 +500,6 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k else: checksum_string = "DRYRUN" - # del file_data # Delete the file data to free up memory - """ report actions CSV formatted @@ -674,11 +552,9 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k """ - Create checksum object """ - # file_data.seek(0) # Ensure we're at the start of the file checksum_hash = hashlib.md5(file_data) checksum_string = checksum_hash.hexdigest() checksum_base64 = base64.b64encode(checksum_hash.digest()).decode() - # file_data.seek(0) # Reset the file pointer to the start try: if file_size > mem_per_worker or file_size > 5 * 1024**3: # Check if file size is larger than 5GiB @@ -709,17 +585,11 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k 'segment_container': bucket_name+'-segments' } ) - # for result in results: - # if result['action'] == 'upload_object': - # check if all parts uploaded successfully else: """ - Upload the file to the bucket """ print(f'Uploading {filename} to {bucket_name}/{object_key}') - # if use_future: - # bucket.put_object(Body=get_client().gather(file_data), Key=object_key, ContentMD5=checksum_base64) - # else: s3.put_object(container=bucket_name, contents=file_data, content_type='multipart/mixed', obj=object_key, etag=checksum_string) except Exception as e: print(f'Error uploading {filename} to {bucket_name}/{object_key}: {e}') @@ -768,7 +638,6 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte """ if api == 's3': s3 = bm.get_resource() - # s3_client = bm.get_client() bucket = s3.Bucket(bucket_name) filename = object_key.split('/')[-1] @@ -805,8 +674,6 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte else: checksum_string = "DRYRUN" - # del file_data # Delete the file data to free up memory - """ report actions CSV formatted @@ -856,8 +723,6 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte else: checksum_string = "DRYRUN" - # del file_data # Delete the file data to free up memory - """ report actions CSV formatted @@ -907,7 +772,6 @@ def print_stats(file_name_or_data, file_count, total_size, file_start, file_end, # print('#######################################') except ZeroDivisionError: pass - # del file_name_or_data def upload_and_callback(s3, bucket_name, api, local_dir, folder, file_name_or_data, zip_contents, object_key, dryrun, processing_start, file_count, folder_files_size, total_size_uploaded, total_files_uploaded, collated, mem_per_worker) -> None: """ @@ -989,21 +853,18 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des assert type(s3) is 
swiftclient.Connection except AssertionError: raise AssertionError('s3 must be a swiftclient.Connection object if using Swift API.') - # current_objects_dd = dd.from_pandas(pd.DataFrame(current_objects), npartitions=len(client.scheduler_info()['workers'])*10) processing_start = datetime.now() total_size_uploaded = 0 total_files_uploaded = 0 i = 0 #recursive loop over local folder - # to_collate = {'id':[],'object_names':[],'file_paths':[],'size':[]} # to be used for storing file lists to be collated to_collate_list = [] # to be used for storing file lists to be collated as list of dicts total_all_folders = 0 total_all_files = 0 folder_num = 0 file_num = 0 uploads = [] - # zip_uploads = [] upload_futures = [] zul_futures = [] failed = [] @@ -1012,12 +873,6 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des zip_batch_object_names = [[]] zip_batch_sizes = [0] - # if save_collate_file: - # scanned_list = [] - # scanned_dicts = [] - # scanned_list_file = collate_list_file + '.scanned' - # scanned_dicts_file = collate_list_file + '.scanned_dicts' - print(f'Analysing local dataset {local_dir}.', flush=True) for folder, sub_folders, files in os.walk(local_dir, topdown=True): total_all_folders += 1 @@ -1038,9 +893,6 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des for folder, sub_folders, files in os.walk(local_dir, topdown=False): folder_num += 1 file_num += len(files) - # if save_collate_file: - # if folder in scanned_list: - # continue print(f'Processing {folder_num}/{total_all_folders} folders; {file_num}/{total_all_files} files in {local_dir}.', flush=True) # check if folder is in the exclude list @@ -1118,16 +970,9 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des object_names = [os.sep.join([destination_dir, os.path.relpath(filename, local_dir)]) for filename in folder_files] init_len = len(object_names) - # scanned_list.append(folder) - # scanned_dicts.append({'folder':folder, 'object_names':object_names}) - # remove current objects - avoids reuploading - # could provide overwrite flag if this is desirable - # print(f'current_objects: {current_objects}') if not current_objects.empty: if set(object_names).issubset(current_objects['CURRENT_OBJECTS']): #all files in this subfolder already in bucket - # print(current_objects['CURRENT_OBJECTS']) - # print(object_names) print(f'Skipping subfolder - all files exist.', flush=True) continue @@ -1191,7 +1036,6 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des repeat(False), repeat(mem_per_worker), )): - # with annotate(folder=folder): upload_futures.append(client.submit(upload_and_callback, *args)) uploads.append({'folder':args[4],'folder_size':args[12],'file_size':os.lstat(folder_files[i]).st_size,'file':args[5],'object':args[7],'uploaded':False}) except BrokenPipeError as e: @@ -1296,29 +1140,15 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des 'upload': False } ) - # to_collate['id'].append(i) - # to_collate['object_names'].append(zip_batch_object_names[i]) - # to_collate['file_paths'].append(file_paths) - # to_collate['size'].append(zip_batch_sizes[i]) - # print(f'zip_batch_files: {zip_batch_files}, {len(zip_batch_files)}') - # print(f'zip_batch_object_names: {zip_batch_object_names}, {len(zip_batch_object_names)}') - # print(f'zip_batch_sizes: {zip_batch_sizes}, {len(zip_batch_sizes)}') - # print(f'id: {[i for i in range(len(zip_batch_files))]}') to_collate = 
pd.DataFrame.from_dict(to_collate_list) client.scatter(to_collate) del zip_batch_files, zip_batch_object_names, zip_batch_sizes else: - # with open(collate_list_file, 'r') as f: - # to_collate = dd.from_pandas(pd.read_csv(collate_list_file), npartitions=len(client.scheduler_info()['workers'])*10) - # to_collate.object_names = to_collate.object_names.apply(literal_eval, meta=('object_names', 'object')) - # to_collate.file_paths = to_collate.file_paths.apply(literal_eval, meta=('file_paths', 'object')) - # to_collate = to_collate.compute() to_collate = pd.read_csv(collate_list_file) to_collate.object_names = to_collate.object_names.apply(literal_eval) to_collate.file_paths = to_collate.file_paths.apply(literal_eval) - # client.scatter(to_collate) print(f'Loaded collate list from {collate_list_file}, len={len(to_collate)}.', flush=True) if not current_objects.empty: # now using pandas for both current_objects and to_collate - this could be re-written to using vectorised operations @@ -1326,34 +1156,21 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des if save_collate_file: print(f'Saving collate list to {collate_list_file}, len={len(to_collate)}.', flush=True) - # with open(collate_list_file, 'w') as f: to_collate.to_csv(collate_list_file, index=False) else: print(f'Collate list not saved.', flush=True) - # client.scatter(to_collate) if len(to_collate) > 0: # call zip_folder in parallel print(f'Zipping {len(to_collate)} batches.', flush=True) - # print(to_collate) - # print(type(to_collate.iloc[0]['file_paths'])) - # exit() print(to_collate[to_collate.upload == False][['file_paths','id', 'upload']]) print(f'len(to_collate[to_collate.upload == False]): {len(to_collate[to_collate.upload == False])}, skipping: {skipping}') to_collate_uploads = to_collate[to_collate.upload == True][['file_paths','id', 'upload']] - # print(f'to_collate_uploads: {to_collate_uploads}') - - # to_collate_uploads = dd.from_pandas(to_collate[to_collate.upload == True][['file_paths','id']], npartitions=len(client.scheduler_info()['workers'])*10) print(f'to_collate: {to_collate}, {len(to_collate)}') print(f'to_collate_uploads: {to_collate_uploads}, {len(to_collate_uploads)}') assert to_collate_uploads['upload'].all() - #current_objects['CURRENT_OBJECTS'].apply(find_metadata_swift, conn=s3, container_name=bucket_name) - # print(to_collate_uploads.head(), flush=True) - # zip_and_upload(to_collate[to_collate.upload == True][['file_paths','id']].iloc[0], s3=s3, bucket_name=bucket_name, api=api, destination_dir=destination_dir, local_dir=local_dir, total_size_uploaded=total_size_uploaded, total_files_uploaded=total_files_uploaded, use_compression=use_compression, dryrun=dryrun, mem_per_worker=mem_per_worker) - # exit() for id in to_collate_uploads['id']: if len(upload_futures) >= len(client.scheduler_info()['workers'])*2: - # print('Waiting for zip slots to free up.', flush=True) while len(upload_futures) >= len(client.scheduler_info()['workers'])*2: print(len(upload_futures), flush=True) for ulf in upload_futures: @@ -1382,6 +1199,10 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des mem_per_worker, )) + ######################## + # Monitor upload tasks # + ######################## + for ulf in as_completed(upload_futures): if 'exception' in ulf.status or 'error' in ulf.status: f_tuple = ulf.exception(), ulf.traceback() @@ -1655,18 +1476,15 @@ def error(self, message): n_workers = nprocs//threads_per_worker mem_per_worker = mem().total//n_workers # e.g., 187 
GiB / 48 * 2 = 7.8 GiB print(f'nprocs: {nprocs}, Threads per worker: {threads_per_worker}, Number of workers: {n_workers}, Total memory: {total_memory/1024**3:.2f} GiB, Memory per worker: {mem_per_worker/1024**3:.2f} GiB') - # client = Client(n_workers=n_workers,threads_per_worker=threads_per_worker,memory_limit=mem_per_worker) #,silence_logs=ERROR - # Process the files - # try: + # Process the files with Client(n_workers=n_workers,threads_per_worker=threads_per_worker,memory_limit=mem_per_worker) as client: print(f'Dask Client: {client}', flush=True) print(f'Dashboard: {client.dashboard_link}', flush=True) print(f'Starting processing at {datetime.now()}, elapsed time = {datetime.now() - start}') print(f'Using {nprocs} processes.') - - # while not success: print(f'Getting current object list for {bucket_name}. This may take some time.\nStarting at {datetime.now()}, elapsed time = {datetime.now() - start}', flush=True) + if api == 's3': current_objects = bm.object_list(bucket, prefix=destination_dir, count=True) elif api == 'swift': @@ -1691,11 +1509,8 @@ def error(self, message): else: current_objects['METADATA'] = None print(f'Done, elapsed time = {datetime.now() - start}', flush=True) - # current_objects.to_csv('current_objects.csv', index=False) - # exit() ## check if log exists in the bucket, and download it and append top it if it does - # TODO: integrate this with local check for log file print(f'Checking for existing log files in bucket {bucket_name}, elapsed time = {datetime.now() - start}', flush=True) if current_objects['CURRENT_OBJECTS'].isin([log]).any(): print(f'Log file {log} already exists in bucket. Downloading.') @@ -1775,22 +1590,6 @@ def error(self, message): zips_to_upload = any(upload_checks) retries += 1 - # def get_all_tasks(dask_scheduler=None): - # return dask_scheduler.tasks - - # stream = client.run_on_scheduler(get_all_tasks) - # print(stream) - # if len(stream) > 0: - # print('Waiting for tasks to complete.') - # client.wait(stream) - # success = True - # except Exception as e: - # print(e) - # sys.exit(1) - # print(f'Restartings Dask client due to error: {e}') - # print(f'Current objects will be repopulated.') - # continue - print(f'Finished uploads at {datetime.now()}, elapsed time = {datetime.now() - start}') print(f'Dask Client closed at {datetime.now()}, elapsed time = {datetime.now() - start}') print('Completing logging.') From 3a602e8741009e59a5e9749a39f4196c188f399e Mon Sep 17 00:00:00 2001 From: Dave McKay Date: Fri, 20 Dec 2024 11:47:16 +0000 Subject: [PATCH 2/2] 86 bug premature closure of dask client (#90) * clean (#87) (#88) * test put_object response with infinite loop * response dicts passed * exit after printing dicts * typos * print response status * clarify output * existence of a response may be enough * use status 201 * add script to loop backup * make loop_backup.sh executable * . --- csd3-side/scripts/loop_backup.sh | 10 ++++++++++ csd3-side/scripts/lsst-backup.py | 14 +++++++++----- echo-side/dags/process_new_zips.py | 7 +++---- 3 files changed, 22 insertions(+), 9 deletions(-) create mode 100755 csd3-side/scripts/loop_backup.sh diff --git a/csd3-side/scripts/loop_backup.sh b/csd3-side/scripts/loop_backup.sh new file mode 100755 index 0000000..aeac0e2 --- /dev/null +++ b/csd3-side/scripts/loop_backup.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# Loop backup script +# This script will run the backup script until all the files are backed up +# It's kind of brute force. 
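+# Usage: ./loop_backup.sh <config_file> <collate_list_file>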
+config_file=$1 +collate_list_file=$2 +while [ $(grep -c True $collate_list_file) -gt 0 ] +do + python ../../scripts/lsst-backup.py --config-file $config_file +done \ No newline at end of file diff --git a/csd3-side/scripts/lsst-backup.py b/csd3-side/scripts/lsst-backup.py index 2e5e0b7..4660653 100644 --- a/csd3-side/scripts/lsst-backup.py +++ b/csd3-side/scripts/lsst-backup.py @@ -91,7 +91,7 @@ def compare_zip_contents(collate_objects: list[str] | pd.DataFrame, current_obje else: zips_to_upload.append(i) else: - print(f'Zip file {destination_dir}/collated_{i}.zip does not exist uploading.', flush=True) + print(f'Zip file {destination_dir}/collated_{i}.zip does not exist - uploading.', flush=True) if df: if not collate_objects.iloc[i]['upload']: collate_objects.iloc[i]['upload'] = True @@ -99,7 +99,7 @@ def compare_zip_contents(collate_objects: list[str] | pd.DataFrame, current_obje else: zips_to_upload.append(i) else: - print(f'Zip file {destination_dir}/collated_{i}.zip does not exist uploading.', flush=True) + print(f'Zip file {destination_dir}/collated_{i}.zip does not exist - uploading.', flush=True) if df: collate_objects.iloc[i]['upload'] = True else: @@ -714,9 +714,10 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte metadata_object_key = object_key + '.metadata' print(f'Writing zip contents to {metadata_object_key}.', flush=True) - s3.put_object(container=bucket_name, contents=metadata_value, content_type='text/plain', obj=metadata_object_key, headers={'x-object-meta-corresponding-zip': object_key}) + responses = [{},{}] + s3.put_object(container=bucket_name, contents=metadata_value, content_type='text/plain', obj=metadata_object_key, headers={'x-object-meta-corresponding-zip': object_key}, response_dict=responses[0]) #bucket.put_object(Body=file_data, Key=object_key, ContentMD5=checksum_base64, Metadata=metadata) - s3.put_object(container=bucket_name, contents=file_data, content_type='multipart/mixed', obj=object_key, etag=checksum_string, headers={'x-object-meta-zip-contents-object':metadata_object_key}) # NEED TO ADD METADATA HERE + s3.put_object(container=bucket_name, contents=file_data, content_type='multipart/mixed', obj=object_key, etag=checksum_string, headers={'x-object-meta-zip-contents-object':metadata_object_key}, response_dict=responses[1]) except Exception as e: print(f'Error uploading "{filename}" ({file_data_size}) to {bucket_name}/{object_key}: {e}') exit(1) @@ -729,6 +730,10 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte header: LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,ZIP_CONTENTS """ return_string = f'"{folder}","{filename}",{file_data_size},"{bucket_name}","{object_key}","{checksum_string}","{",".join(zip_contents)}"' + while True: + if responses[0] and responses[1]: + if responses[0]['status'] == 201 and responses[1]['status'] == 201: + break return return_string @@ -1172,7 +1177,6 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des for id in to_collate_uploads['id']: if len(upload_futures) >= len(client.scheduler_info()['workers'])*2: while len(upload_futures) >= len(client.scheduler_info()['workers'])*2: - print(len(upload_futures), flush=True) for ulf in upload_futures: if 'exception' in ulf.status or 'error' in ulf.status: f_tuple = ulf.exception(), ulf.traceback() diff --git a/echo-side/dags/process_new_zips.py b/echo-side/dags/process_new_zips.py index a845e7b..9ef583c 100644 --- a/echo-side/dags/process_new_zips.py +++ 
b/echo-side/dags/process_new_zips.py @@ -22,7 +22,7 @@ def dl_bucket_names(url): def print_bucket_name(bucket_name): print(bucket_name) - + # Define default arguments for the DAG default_args = { 'owner': 'airflow', @@ -37,7 +37,7 @@ def print_bucket_name(bucket_name): 'process_zips', default_args=default_args, description='Runs process_collated_zips.py', - schedule_interval=timedelta(days=1), + schedule=timedelta(days=1), start_date=datetime(2024, 1, 1), catchup=False, ) as dag: @@ -68,5 +68,4 @@ def print_bucket_name(bucket_name): # print('No bucket names found.') print_bucket_name_task - process_zips_task - \ No newline at end of file + process_zips_task \ No newline at end of file
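
For reference, the check added to `upload_to_bucket_collated` in the second patch relies on python-swiftclient populating the dict passed as `response_dict` once `put_object` returns; Swift reports a successful object PUT as 201 Created. Below is a minimal sketch of that pattern — the auth details and object names are placeholders, and a plain status check stands in for the patch's retry loop:

```python
import swiftclient

# Placeholder auth details - substitute real credentials.
conn = swiftclient.Connection(
    authurl='https://swift.example.com/auth/v1.0',
    user='account:user',
    key='secret',
)

response = {}  # put_object fills this with 'status', 'reason', 'headers'
conn.put_object(
    container='my-bucket',
    obj='collated_1.zip.metadata',
    contents='file_a.fits,file_b.fits',
    content_type='text/plain',
    response_dict=response,
)

# Swift returns 201 Created on a successful PUT.
if response.get('status') != 201:
    raise RuntimeError(
        f"Upload failed: {response.get('status')} {response.get('reason')}"
    )
```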