diff --git a/csd3-side/scripts/loop_backup.sh b/csd3-side/scripts/loop_backup.sh new file mode 100755 index 0000000..aeac0e2 --- /dev/null +++ b/csd3-side/scripts/loop_backup.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# Loop backup script +# This script will run the backup script until all the files are backed up +# It's kind of brute force. +config_file=$1 +collate_list_file=$2 +while [ $(grep -c True $collate_list_file) -gt 0 ] +do + python ../../scripts/lsst-backup.py --config-file $config_file +done \ No newline at end of file diff --git a/csd3-side/scripts/lsst-backup.py b/csd3-side/scripts/lsst-backup.py index 3c1bd64..4660653 100644 --- a/csd3-side/scripts/lsst-backup.py +++ b/csd3-side/scripts/lsst-backup.py @@ -15,7 +15,7 @@ Returns: None """ -# import json + import sys import os from itertools import repeat @@ -48,12 +48,6 @@ from typing import List -# import ctypes - -# def trim_memory() -> int: -# libc = ctypes.CDLL("libc.so.6") -# return libc.malloc_trim(0) - def compare_zip_contents(collate_objects: list[str] | pd.DataFrame, current_objects: pd.DataFrame, destination_dir: str, skipping: int) -> list[str] | pd.DataFrame: """ Compare the contents of zip files to determine which files need to be uploaded. @@ -97,7 +91,7 @@ def compare_zip_contents(collate_objects: list[str] | pd.DataFrame, current_obje else: zips_to_upload.append(i) else: - print(f'Zip file {destination_dir}/collated_{i}.zip does not exist uploading.', flush=True) + print(f'Zip file {destination_dir}/collated_{i}.zip does not exist - uploading.', flush=True) if df: if not collate_objects.iloc[i]['upload']: collate_objects.iloc[i]['upload'] = True @@ -105,7 +99,7 @@ def compare_zip_contents(collate_objects: list[str] | pd.DataFrame, current_obje else: zips_to_upload.append(i) else: - print(f'Zip file {destination_dir}/collated_{i}.zip does not exist uploading.', flush=True) + print(f'Zip file {destination_dir}/collated_{i}.zip does not exist - uploading.', flush=True) if df: collate_objects.iloc[i]['upload'] = True else: @@ -255,56 +249,14 @@ def zip_and_upload(id, file_paths, s3, bucket_name, api, destination_dir, local_ Returns: bool: True if a zip was created and uploaded, False if not.. 
""" - # print('in zip_and_upload', flush=True) - # try: - # assert type(ds) is pd.Series - # except AssertionError: - # raise AssertionError('ds must be a pandas Series object.') - # id = ds['id'] - # file_paths = ds['file_paths'] - #debugging - # print(f"DEBUGGING - id: {id}", flush=True) - # print(f"DEBUGGING - file_paths: {file_paths}", flush=True) - print(f'Zipping and uploading {len(file_paths)} files from {local_dir} to {destination_dir}/collated_{id}.zip.', flush=True) ############# # zip part # ############# client = get_client() - # with annotate(parent_folder=parent_folder): - # zip_future = client.submit(zip_folders, - # local_dir, - # file_paths, - # use_compression, - # dryrun, - # id, - # mem_per_worker - # ) - # def get_zip_future(future): - # return future.result() - # wait(zip_future) - - # # print('DEBUGGING - Got zip', flush=True) - # tries = 0 - # zip_data = None - # namelist = None - # while tries < 5: - # try: - # zip_data, namelist = get_zip_future(zip_future) - # tries += 1 - # except Exception as e: - # sleep(5) - # print(f'Zip future timed out {tries}/5') - # if zip_data is None and namelist is None: - # raise Exception('Zip future timed out 5 times.') zip_data, namelist = zip_folders(local_dir, file_paths, use_compression, dryrun, id, mem_per_worker) print('Created zipFile in memory', flush=True) - - # print(f'DEBUGGING - Zip data size: {len(zip_data)} bytes.', flush=True) - # if len(zip_data) > mem_per_worker/2: - # print('Scattering zip data.') - # scattered_zip_data = client.scatter(zip_data) ############### # upload part # ############### @@ -313,9 +265,8 @@ def zip_and_upload(id, file_paths, s3, bucket_name, api, destination_dir, local_ print(f'zip_object_key: {zip_object_key}', flush=True) if namelist == []: print(f'No files to upload in zip file.') - return False #, zip_object_key, namelist #+' nothing to upload' + return False else: # for no subtasks - # return zip_data, zip_object_key, namelist, len(zip_data) upload_and_callback( s3, bucket_name, @@ -335,47 +286,6 @@ def zip_and_upload(id, file_paths, s3, bucket_name, api, destination_dir, local_ mem_per_worker ) return True - # else: - # print(f'Uploading zip file containing {len(file_paths)} files to S3 bucket {bucket_name} to key {zip_object_key}.', flush=True) - # # with annotate(parent_folder=parent_folder): - # f = client.submit(upload_and_callback, - # s3, - # bucket_name, - # api, - # local_dir, - # destination_dir, - # zip_data, - # namelist, - # zip_object_key, - # dryrun, - # datetime.now(), - # 1, - # len(zip_data), - # total_size_uploaded, - # total_files_uploaded, - # True, - # mem_per_worker - # ) - # return f, zip_object_key - - # Try calling zip_and_upload directly - # s3, - # bucket_name, - # api, - # local_dir, - # destination_dir, - # result[0], # zip_data - # result[2], # namelist - # result[1], # zip_object_key - # dryrun, - # processing_start, - # 1, # 1 zip file - # result[3], # len(zip_data) (size in bytes) - # total_size_uploaded, - # total_files_uploaded, - # True, - # mem_per_worker - def zip_folders(local_dir:str, file_paths:list[str], use_compression:bool, dryrun:bool, id:int, mem_per_worker:int) -> tuple[str, int, bytes]: """ @@ -395,9 +305,7 @@ def zip_folders(local_dir:str, file_paths:list[str], use_compression:bool, dryru None """ - # print(file_paths, flush=True) zipped_size = 0 - # client = get_client() if not dryrun: try: zip_buffer = io.BytesIO() @@ -407,13 +315,11 @@ def zip_folders(local_dir:str, file_paths:list[str], use_compression:bool, dryru compression = 
zipfile.ZIP_STORED # zipfile.ZIP_STORED = no compression with zipfile.ZipFile(zip_buffer, "a", compression, True) as zip_file: for file in file_paths: - # print(file, flush=True) if file.startswith('/'): file_path = file else: exit(f'Path is wrong: {file}') arc_name = os.path.relpath(file_path, local_dir) - # print(f'arc_name {arc_name}', flush=True) try: zipped_size += os.path.getsize(file_path) with open(file_path, 'rb') as src_file: @@ -498,10 +404,6 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k print(f'WARNING: File size of {file_size} bytes exceeds memory per worker of {mem_per_worker} bytes.', flush=True) print('Running upload_object.py.', flush=True) print('This upload WILL NOT be checksummed or tracked!', flush=True) - # if perform_checksum: - # checksum_string = hashlib.md5(file_data).hexdigest() - # Ensure the file is closed before running the upload script - # file_data.close() del file_data # Ensure consistent path to upload_object.py upload_object_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../../scripts/upload_object.py') @@ -514,23 +416,6 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k ) print(f'Running upload_object.py for {filename}.', flush=True) return f'"{folder}","{filename}",{file_size},"{bucket_name}","{object_key}","n/a","n/a"' - # print(f'External upload {success.stdout.read()}', flush=True) - # if success.returncode == 0: - # print(f'File {filename} uploaded successfully.') - # if perform_checksum: - # return f'"{folder}","{filename}",{file_size},"{bucket_name}","{object_key}","{checksum_string}","n/a"' - # else: - # return f'"{folder}","{filename}",{file_size},"{bucket_name}","{object_key}","n/a","n/a"' - # else: - # print(f'Error uploading {filename} to {bucket_name}/{object_key}: {success.stderr}') - # if file_size > 0.5*mem_per_worker: - # print(f'WARNING: File size of {file_size} bytes exceeds half the memory per worker of {mem_per_worker} bytes.') - # try: - # file_data = get_client().scatter(file_data) - # except TypeError as e: - # print(f'Error scattering {filename}: {e}') - # exit(1) - # use_future = True print(f'Uploading {filename} from {folder} to {bucket_name}/{object_key}, {file_size} bytes, checksum = True, dryrun = {dryrun}', flush=True) """ @@ -556,11 +441,9 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k """ if use_future: file_data = get_client().gather(file_data) - # file_data.seek(0) # Ensure we're at the start of the file checksum_hash = hashlib.md5(file_data) checksum_string = checksum_hash.hexdigest() checksum_base64 = base64.b64encode(checksum_hash.digest()).decode() - # file_data.seek(0) # Reset the file pointer to the start if use_future: try: file_data = get_client().scatter(file_data) @@ -584,9 +467,6 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k start = i * chunk_size end = min(start + chunk_size, file_size) part_number = i + 1 - # with open(filename, 'rb') as f: - # f.seek(start) - # chunk_data = get_client.gather(file_data)[start:end] part_futures.append(get_client().submit( part_uploader, bucket_name, @@ -620,8 +500,6 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k else: checksum_string = "DRYRUN" - # del file_data # Delete the file data to free up memory - """ report actions CSV formatted @@ -674,11 +552,9 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k """ - Create checksum object """ - # 
file_data.seek(0) # Ensure we're at the start of the file checksum_hash = hashlib.md5(file_data) checksum_string = checksum_hash.hexdigest() checksum_base64 = base64.b64encode(checksum_hash.digest()).decode() - # file_data.seek(0) # Reset the file pointer to the start try: if file_size > mem_per_worker or file_size > 5 * 1024**3: # Check if file size is larger than 5GiB @@ -709,17 +585,11 @@ def upload_to_bucket(s3, bucket_name, api, local_dir, folder, filename, object_k 'segment_container': bucket_name+'-segments' } ) - # for result in results: - # if result['action'] == 'upload_object': - # check if all parts uploaded successfully else: """ - Upload the file to the bucket """ print(f'Uploading {filename} to {bucket_name}/{object_key}') - # if use_future: - # bucket.put_object(Body=get_client().gather(file_data), Key=object_key, ContentMD5=checksum_base64) - # else: s3.put_object(container=bucket_name, contents=file_data, content_type='multipart/mixed', obj=object_key, etag=checksum_string) except Exception as e: print(f'Error uploading {filename} to {bucket_name}/{object_key}: {e}') @@ -768,7 +638,6 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte """ if api == 's3': s3 = bm.get_resource() - # s3_client = bm.get_client() bucket = s3.Bucket(bucket_name) filename = object_key.split('/')[-1] @@ -805,8 +674,6 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte else: checksum_string = "DRYRUN" - # del file_data # Delete the file data to free up memory - """ report actions CSV formatted @@ -847,23 +714,26 @@ def upload_to_bucket_collated(s3, bucket_name, api, folder, file_data, zip_conte metadata_object_key = object_key + '.metadata' print(f'Writing zip contents to {metadata_object_key}.', flush=True) - s3.put_object(container=bucket_name, contents=metadata_value, content_type='text/plain', obj=metadata_object_key, headers={'x-object-meta-corresponding-zip': object_key}) + responses = [{},{}] + s3.put_object(container=bucket_name, contents=metadata_value, content_type='text/plain', obj=metadata_object_key, headers={'x-object-meta-corresponding-zip': object_key}, response_dict=responses[0]) #bucket.put_object(Body=file_data, Key=object_key, ContentMD5=checksum_base64, Metadata=metadata) - s3.put_object(container=bucket_name, contents=file_data, content_type='multipart/mixed', obj=object_key, etag=checksum_string, headers={'x-object-meta-zip-contents-object':metadata_object_key}) # NEED TO ADD METADATA HERE + s3.put_object(container=bucket_name, contents=file_data, content_type='multipart/mixed', obj=object_key, etag=checksum_string, headers={'x-object-meta-zip-contents-object':metadata_object_key}, response_dict=responses[1]) except Exception as e: print(f'Error uploading "{filename}" ({file_data_size}) to {bucket_name}/{object_key}: {e}') exit(1) else: checksum_string = "DRYRUN" - # del file_data # Delete the file data to free up memory - """ report actions CSV formatted header: LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,ZIP_CONTENTS """ return_string = f'"{folder}","{filename}",{file_data_size},"{bucket_name}","{object_key}","{checksum_string}","{",".join(zip_contents)}"' + while True: + if responses[0] and responses[1]: + if responses[0]['status'] == 201 and responses[1]['status'] == 201: + break return return_string @@ -907,7 +777,6 @@ def print_stats(file_name_or_data, file_count, total_size, file_start, file_end, # print('#######################################') except ZeroDivisionError: pass - 
# del file_name_or_data def upload_and_callback(s3, bucket_name, api, local_dir, folder, file_name_or_data, zip_contents, object_key, dryrun, processing_start, file_count, folder_files_size, total_size_uploaded, total_files_uploaded, collated, mem_per_worker) -> None: """ @@ -989,21 +858,18 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des assert type(s3) is swiftclient.Connection except AssertionError: raise AssertionError('s3 must be a swiftclient.Connection object if using Swift API.') - # current_objects_dd = dd.from_pandas(pd.DataFrame(current_objects), npartitions=len(client.scheduler_info()['workers'])*10) processing_start = datetime.now() total_size_uploaded = 0 total_files_uploaded = 0 i = 0 #recursive loop over local folder - # to_collate = {'id':[],'object_names':[],'file_paths':[],'size':[]} # to be used for storing file lists to be collated to_collate_list = [] # to be used for storing file lists to be collated as list of dicts total_all_folders = 0 total_all_files = 0 folder_num = 0 file_num = 0 uploads = [] - # zip_uploads = [] upload_futures = [] zul_futures = [] failed = [] @@ -1012,12 +878,6 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des zip_batch_object_names = [[]] zip_batch_sizes = [0] - # if save_collate_file: - # scanned_list = [] - # scanned_dicts = [] - # scanned_list_file = collate_list_file + '.scanned' - # scanned_dicts_file = collate_list_file + '.scanned_dicts' - print(f'Analysing local dataset {local_dir}.', flush=True) for folder, sub_folders, files in os.walk(local_dir, topdown=True): total_all_folders += 1 @@ -1038,9 +898,6 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des for folder, sub_folders, files in os.walk(local_dir, topdown=False): folder_num += 1 file_num += len(files) - # if save_collate_file: - # if folder in scanned_list: - # continue print(f'Processing {folder_num}/{total_all_folders} folders; {file_num}/{total_all_files} files in {local_dir}.', flush=True) # check if folder is in the exclude list @@ -1118,16 +975,9 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des object_names = [os.sep.join([destination_dir, os.path.relpath(filename, local_dir)]) for filename in folder_files] init_len = len(object_names) - # scanned_list.append(folder) - # scanned_dicts.append({'folder':folder, 'object_names':object_names}) - # remove current objects - avoids reuploading - # could provide overwrite flag if this is desirable - # print(f'current_objects: {current_objects}') if not current_objects.empty: if set(object_names).issubset(current_objects['CURRENT_OBJECTS']): #all files in this subfolder already in bucket - # print(current_objects['CURRENT_OBJECTS']) - # print(object_names) print(f'Skipping subfolder - all files exist.', flush=True) continue @@ -1191,7 +1041,6 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des repeat(False), repeat(mem_per_worker), )): - # with annotate(folder=folder): upload_futures.append(client.submit(upload_and_callback, *args)) uploads.append({'folder':args[4],'folder_size':args[12],'file_size':os.lstat(folder_files[i]).st_size,'file':args[5],'object':args[7],'uploaded':False}) except BrokenPipeError as e: @@ -1296,29 +1145,15 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des 'upload': False } ) - # to_collate['id'].append(i) - # to_collate['object_names'].append(zip_batch_object_names[i]) - # 
to_collate['file_paths'].append(file_paths) - # to_collate['size'].append(zip_batch_sizes[i]) - # print(f'zip_batch_files: {zip_batch_files}, {len(zip_batch_files)}') - # print(f'zip_batch_object_names: {zip_batch_object_names}, {len(zip_batch_object_names)}') - # print(f'zip_batch_sizes: {zip_batch_sizes}, {len(zip_batch_sizes)}') - # print(f'id: {[i for i in range(len(zip_batch_files))]}') to_collate = pd.DataFrame.from_dict(to_collate_list) client.scatter(to_collate) del zip_batch_files, zip_batch_object_names, zip_batch_sizes else: - # with open(collate_list_file, 'r') as f: - # to_collate = dd.from_pandas(pd.read_csv(collate_list_file), npartitions=len(client.scheduler_info()['workers'])*10) - # to_collate.object_names = to_collate.object_names.apply(literal_eval, meta=('object_names', 'object')) - # to_collate.file_paths = to_collate.file_paths.apply(literal_eval, meta=('file_paths', 'object')) - # to_collate = to_collate.compute() to_collate = pd.read_csv(collate_list_file) to_collate.object_names = to_collate.object_names.apply(literal_eval) to_collate.file_paths = to_collate.file_paths.apply(literal_eval) - # client.scatter(to_collate) print(f'Loaded collate list from {collate_list_file}, len={len(to_collate)}.', flush=True) if not current_objects.empty: # now using pandas for both current_objects and to_collate - this could be re-written to using vectorised operations @@ -1326,36 +1161,22 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des if save_collate_file: print(f'Saving collate list to {collate_list_file}, len={len(to_collate)}.', flush=True) - # with open(collate_list_file, 'w') as f: to_collate.to_csv(collate_list_file, index=False) else: print(f'Collate list not saved.', flush=True) - # client.scatter(to_collate) if len(to_collate) > 0: # call zip_folder in parallel print(f'Zipping {len(to_collate)} batches.', flush=True) - # print(to_collate) - # print(type(to_collate.iloc[0]['file_paths'])) - # exit() print(to_collate[to_collate.upload == False][['file_paths','id', 'upload']]) print(f'len(to_collate[to_collate.upload == False]): {len(to_collate[to_collate.upload == False])}, skipping: {skipping}') to_collate_uploads = to_collate[to_collate.upload == True][['file_paths','id', 'upload']] - # print(f'to_collate_uploads: {to_collate_uploads}') - - # to_collate_uploads = dd.from_pandas(to_collate[to_collate.upload == True][['file_paths','id']], npartitions=len(client.scheduler_info()['workers'])*10) print(f'to_collate: {to_collate}, {len(to_collate)}') print(f'to_collate_uploads: {to_collate_uploads}, {len(to_collate_uploads)}') assert to_collate_uploads['upload'].all() - #current_objects['CURRENT_OBJECTS'].apply(find_metadata_swift, conn=s3, container_name=bucket_name) - # print(to_collate_uploads.head(), flush=True) - # zip_and_upload(to_collate[to_collate.upload == True][['file_paths','id']].iloc[0], s3=s3, bucket_name=bucket_name, api=api, destination_dir=destination_dir, local_dir=local_dir, total_size_uploaded=total_size_uploaded, total_files_uploaded=total_files_uploaded, use_compression=use_compression, dryrun=dryrun, mem_per_worker=mem_per_worker) - # exit() for id in to_collate_uploads['id']: if len(upload_futures) >= len(client.scheduler_info()['workers'])*2: - # print('Waiting for zip slots to free up.', flush=True) while len(upload_futures) >= len(client.scheduler_info()['workers'])*2: - print(len(upload_futures), flush=True) for ulf in upload_futures: if 'exception' in ulf.status or 'error' in ulf.status: f_tuple = 
ulf.exception(), ulf.traceback() @@ -1382,6 +1203,10 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des mem_per_worker, )) + ######################## + # Monitor upload tasks # + ######################## + for ulf in as_completed(upload_futures): if 'exception' in ulf.status or 'error' in ulf.status: f_tuple = ulf.exception(), ulf.traceback() @@ -1655,18 +1480,15 @@ def error(self, message): n_workers = nprocs//threads_per_worker mem_per_worker = mem().total//n_workers # e.g., 187 GiB / 48 * 2 = 7.8 GiB print(f'nprocs: {nprocs}, Threads per worker: {threads_per_worker}, Number of workers: {n_workers}, Total memory: {total_memory/1024**3:.2f} GiB, Memory per worker: {mem_per_worker/1024**3:.2f} GiB') - # client = Client(n_workers=n_workers,threads_per_worker=threads_per_worker,memory_limit=mem_per_worker) #,silence_logs=ERROR - # Process the files - # try: + # Process the files with Client(n_workers=n_workers,threads_per_worker=threads_per_worker,memory_limit=mem_per_worker) as client: print(f'Dask Client: {client}', flush=True) print(f'Dashboard: {client.dashboard_link}', flush=True) print(f'Starting processing at {datetime.now()}, elapsed time = {datetime.now() - start}') print(f'Using {nprocs} processes.') - - # while not success: print(f'Getting current object list for {bucket_name}. This may take some time.\nStarting at {datetime.now()}, elapsed time = {datetime.now() - start}', flush=True) + if api == 's3': current_objects = bm.object_list(bucket, prefix=destination_dir, count=True) elif api == 'swift': @@ -1691,11 +1513,8 @@ def error(self, message): else: current_objects['METADATA'] = None print(f'Done, elapsed time = {datetime.now() - start}', flush=True) - # current_objects.to_csv('current_objects.csv', index=False) - # exit() ## check if log exists in the bucket, and download it and append top it if it does - # TODO: integrate this with local check for log file print(f'Checking for existing log files in bucket {bucket_name}, elapsed time = {datetime.now() - start}', flush=True) if current_objects['CURRENT_OBJECTS'].isin([log]).any(): print(f'Log file {log} already exists in bucket. Downloading.') @@ -1775,22 +1594,6 @@ def error(self, message): zips_to_upload = any(upload_checks) retries += 1 - # def get_all_tasks(dask_scheduler=None): - # return dask_scheduler.tasks - - # stream = client.run_on_scheduler(get_all_tasks) - # print(stream) - # if len(stream) > 0: - # print('Waiting for tasks to complete.') - # client.wait(stream) - # success = True - # except Exception as e: - # print(e) - # sys.exit(1) - # print(f'Restartings Dask client due to error: {e}') - # print(f'Current objects will be repopulated.') - # continue - print(f'Finished uploads at {datetime.now()}, elapsed time = {datetime.now() - start}') print(f'Dask Client closed at {datetime.now()}, elapsed time = {datetime.now() - start}') print('Completing logging.') diff --git a/echo-side/containers/mamba_env/Dockerfile b/echo-side/containers/mamba_env/Dockerfile index 79683a5..c8b24f8 100644 --- a/echo-side/containers/mamba_env/Dockerfile +++ b/echo-side/containers/mamba_env/Dockerfile @@ -1,11 +1,9 @@ -FROM continuumio/miniconda3 +FROM condaforge/miniforge3 LABEL org.opencontainers.image.source https://github.com/lsst-uk/csd3-echo-somerville LABEL org.opencontainers.image.description="Code to backup and curate LSST-UK data from CSD3 to Echo." 
LABEL org.opencontainers.image.licenses="Apache-2.0" -RUN conda install -y -c conda-forge mamba && mamba init - COPY environment.yaml /environment.yaml RUN mamba env create --name lsst-uk --file=/environment.yaml diff --git a/echo-side/dags/process_new_zips.py b/echo-side/dags/process_new_zips.py index a845e7b..9ef583c 100644 --- a/echo-side/dags/process_new_zips.py +++ b/echo-side/dags/process_new_zips.py @@ -22,7 +22,7 @@ def dl_bucket_names(url): def print_bucket_name(bucket_name): print(bucket_name) - + # Define default arguments for the DAG default_args = { 'owner': 'airflow', @@ -37,7 +37,7 @@ def print_bucket_name(bucket_name): 'process_zips', default_args=default_args, description='Runs process_collated_zips.py', - schedule_interval=timedelta(days=1), + schedule=timedelta(days=1), start_date=datetime(2024, 1, 1), catchup=False, ) as dag: @@ -68,5 +68,4 @@ def print_bucket_name(bucket_name): # print('No bucket names found.') print_bucket_name_task - process_zips_task - \ No newline at end of file + process_zips_task \ No newline at end of file diff --git a/echo-side/environment.yaml b/echo-side/environment.yaml new file mode 100644 index 0000000..df06c2f --- /dev/null +++ b/echo-side/environment.yaml @@ -0,0 +1,406 @@ +name: lsst-uk +channels: + - conda-forge + - defaults +dependencies: + - _libgcc_mutex=0.1=conda_forge + - _openmp_mutex=4.5=2_gnu + - aiofiles=23.2.1=pyhd8ed1ab_0 + - aiohttp=3.9.3=py310h2372a71_0 + - aiosignal=1.3.1=pyhd8ed1ab_0 + - airflow=2.8.2=py310hff52083_0 + - alembic=1.13.1=pyhd8ed1ab_1 + - amqp=5.2.0=pyhd8ed1ab_1 + - anyio=4.2.0=py310h06a4308_0 + - apache-airflow=2.8.2=py310hff52083_0 + - apache-airflow-providers-amazon=8.19.0=pyhd8ed1ab_0 + - apache-airflow-providers-cncf-kubernetes=8.0.1=pyhd8ed1ab_0 + - apache-airflow-providers-common-io=1.3.0=pyhd8ed1ab_0 + - apache-airflow-providers-common-sql=1.11.1=pyhd8ed1ab_0 + - apache-airflow-providers-docker=3.12.0=pyhd8ed1ab_0 + - apache-airflow-providers-ftp=3.7.0=pyhd8ed1ab_0 + - apache-airflow-providers-http=4.10.0=pyhd8ed1ab_0 + - apache-airflow-providers-imap=3.5.0=pyhd8ed1ab_0 + - apache-airflow-providers-sqlite=3.7.1=pyhd8ed1ab_0 + - apispec=6.4.0=pyhd8ed1ab_0 + - argcomplete=3.2.3=pyhd8ed1ab_0 + - argon2-cffi=21.3.0=pyhd3eb1b0_0 + - argon2-cffi-bindings=21.2.0=py310h7f8727e_0 + - arrow=1.3.0=pyhd8ed1ab_0 + - asgiref=3.7.2=pyhd8ed1ab_0 + - asn1crypto=1.5.1=pyhd8ed1ab_0 + - asttokens=2.0.5=pyhd3eb1b0_0 + - async-lru=2.0.4=py310h06a4308_0 + - async-timeout=4.0.3=pyhd8ed1ab_0 + - attrs=23.1.0=py310h06a4308_0 + - aws-c-auth=0.7.16=h79b3bcb_6 + - aws-c-cal=0.6.10=hb29e0c7_1 + - aws-c-common=0.9.13=hd590300_0 + - aws-c-compression=0.2.18=hecc5fa9_1 + - aws-c-event-stream=0.4.2=hf9b2f7b_4 + - aws-c-http=0.8.1=h5d7533a_5 + - aws-c-io=0.14.5=h50678d4_1 + - aws-c-mqtt=0.10.2=hf479d2b_4 + - aws-c-s3=0.5.2=h4ad9680_0 + - aws-c-sdkutils=0.1.15=hecc5fa9_1 + - aws-checksums=0.1.18=hecc5fa9_1 + - aws-crt-cpp=0.26.2=h19f5d62_7 + - aws-sdk-cpp=1.11.267=h5606698_1 + - babel=2.11.0=py310h06a4308_0 + - backoff=2.2.1=pyhd8ed1ab_0 + - backports.zoneinfo=0.2.1=py310hff52083_8 + - bcrypt=4.1.2=py310hcb5633a_0 + - beautifulsoup4=4.12.2=py310h06a4308_0 + - billiard=4.2.0=py310h2372a71_0 + - blas=1.0=mkl + - bleach=4.1.0=pyhd3eb1b0_0 + - blinker=1.7.0=pyhd8ed1ab_0 + - bokeh=3.3.4=pyhd8ed1ab_0 + - boto3=1.34.61=pyhd8ed1ab_1 + - botocore=1.34.61=pyge310_1234567_0 + - bottleneck=1.3.7=py310ha9d4c09_0 + - brotli-python=1.0.9=py310h6a678d5_7 + - bzip2=1.0.8=h5eee18b_5 + - c-ares=1.27.0=hd590300_0 + - ca-certificates=2024.6.2=hbcca054_0 + 
- cachelib=0.9.0=pyhd8ed1ab_0 + - cachetools=5.3.3=pyhd8ed1ab_0 + - celery=5.3.6=pyhd8ed1ab_0 + - certifi=2024.6.2=py310h06a4308_0 + - cffi=1.16.0=py310h5eee18b_0 + - charset-normalizer=2.0.4=pyhd3eb1b0_0 + - click=8.1.7=unix_pyh707e725_0 + - click-didyoumean=0.3.0=pyhd8ed1ab_0 + - click-plugins=1.1.1=py_0 + - click-repl=0.3.0=pyhd8ed1ab_0 + - clickclick=20.10.2=pyhd8ed1ab_0 + - cloudpickle=3.0.0=pyhd8ed1ab_0 + - colorama=0.4.6=pyhd8ed1ab_0 + - colorlog=4.8.0=py310hff52083_3 + - colour=0.1.5=pyhd8ed1ab_1 + - comm=0.2.1=py310h06a4308_0 + - configupdater=3.2=pyhd8ed1ab_0 + - connexion=2.14.1=pyhd8ed1ab_0 + - contourpy=1.2.0=py310hd41b1e2_0 + - cron-descriptor=1.4.3=pyhd8ed1ab_0 + - croniter=2.0.2=pyhd8ed1ab_0 + - cryptography=41.0.7=py310hb8475ec_1 + - cytoolz=0.12.3=py310h2372a71_0 + - dask=2024.3.0=pyhd8ed1ab_1 + - dask-core=2024.3.0=pyhd8ed1ab_0 + - dask-expr=1.0.1=pyhd8ed1ab_0 + - debugpy=1.6.7=py310h6a678d5_0 + - decorator=5.1.1=pyhd3eb1b0_0 + - defusedxml=0.7.1=pyhd3eb1b0_0 + - deprecated=1.2.14=pyh1a96a4e_0 + - dill=0.3.8=pyhd8ed1ab_0 + - distributed=2024.3.0=pyhd8ed1ab_0 + - dnspython=2.6.1=pyhd8ed1ab_0 + - docker-py=7.1.0=pyhd8ed1ab_0 + - docutils=0.20.1=py310hff52083_3 + - email-validator=1.3.1=pyhd8ed1ab_0 + - exceptiongroup=1.2.0=py310h06a4308_0 + - executing=0.8.3=pyhd3eb1b0_0 + - flask=2.2.5=pyhd8ed1ab_0 + - flask-appbuilder=4.3.11=pyhd8ed1ab_0 + - flask-babel=2.0.0=pyhd8ed1ab_1 + - flask-caching=2.1.0=pyhd8ed1ab_0 + - flask-jwt-extended=4.6.0=pyhd8ed1ab_0 + - flask-limiter=3.5.1=pyhd8ed1ab_0 + - flask-login=0.6.3=pyhd8ed1ab_1 + - flask-session=0.5.0=pyhd8ed1ab_0 + - flask-sqlalchemy=2.5.1=pyhd8ed1ab_0 + - flask-wtf=1.2.1=pyhd8ed1ab_0 + - freetype=2.12.1=h267a509_2 + - frozenlist=1.4.1=py310h2372a71_0 + - fsspec=2024.2.0=pyhca7485f_0 + - furl=2.1.3=pyhd8ed1ab_0 + - gflags=2.2.2=he1b5a44_1004 + - glog=0.7.0=hed5481d_0 + - google-auth=2.28.2=pyhca7485f_0 + - google-re2=1.1=py310hd1af1a7_0 + - googleapis-common-protos=1.63.0=pyhd8ed1ab_0 + - greenlet=3.0.3=py310hc6cd4ac_0 + - grpcio=1.62.1=py310h1b8f574_0 + - gunicorn=21.2.0=py310hff52083_1 + - h11=0.14.0=pyhd8ed1ab_0 + - h2=4.1.0=pyhd8ed1ab_0 + - hpack=4.0.0=pyh9f0ad1d_0 + - httpcore=1.0.4=pyhd8ed1ab_0 + - httpx=0.27.0=pyhd8ed1ab_0 + - hyperframe=6.0.1=pyhd8ed1ab_0 + - icu=73.2=h59595ed_0 + - idna=3.4=py310h06a4308_0 + - importlib-metadata=7.0.2=pyha770c72_0 + - importlib-resources=6.1.3=pyhd8ed1ab_0 + - importlib_metadata=7.0.2=hd8ed1ab_0 + - importlib_resources=6.1.3=pyhd8ed1ab_0 + - infinity=1.5=pyhd8ed1ab_0 + - inflection=0.5.1=pyh9f0ad1d_0 + - intel-openmp=2023.1.0=hdb19cb5_46306 + - intervals=0.9.2=pyhd8ed1ab_0 + - ipykernel=6.28.0=py310h06a4308_0 + - ipython=8.20.0=py310h06a4308_0 + - itsdangerous=2.1.2=pyhd8ed1ab_0 + - jedi=0.18.1=py310h06a4308_1 + - jinja2=3.1.3=py310h06a4308_0 + - jmespath=1.0.1=py310h06a4308_0 + - json5=0.9.6=pyhd3eb1b0_0 + - jsonpath-ng=1.6.1=pyhd8ed1ab_0 + - jsonschema=4.19.2=py310h06a4308_0 + - jsonschema-specifications=2023.7.1=py310h06a4308_0 + - jupyter-lsp=2.2.0=py310h06a4308_0 + - jupyter_client=8.6.0=py310h06a4308_0 + - jupyter_core=5.5.0=py310h06a4308_0 + - jupyter_events=0.8.0=py310h06a4308_0 + - jupyter_server=2.10.0=py310h06a4308_0 + - jupyter_server_terminals=0.4.4=py310h06a4308_1 + - jupyterlab=4.0.11=py310h06a4308_0 + - jupyterlab_pygments=0.1.2=py_0 + - jupyterlab_server=2.25.1=py310h06a4308_0 + - keyutils=1.6.1=h166bdaf_0 + - kombu=5.3.5=py310hff52083_0 + - krb5=1.21.2=h659d440_0 + - kubernetes_asyncio=29.0.0=pyhd8ed1ab_0 + - lazy-object-proxy=1.10.0=py310h2372a71_0 + - 
lcms2=2.15=h7f713cb_2 + - ld_impl_linux-64=2.38=h1181459_1 + - lerc=4.0.0=h27087fc_0 + - libabseil=20240116.1=cxx17_h59595ed_2 + - libarrow=15.0.1=he513d67_1_cpu + - libarrow-acero=15.0.1=h59595ed_1_cpu + - libarrow-dataset=15.0.1=h59595ed_1_cpu + - libarrow-flight=15.0.1=hc6145d9_1_cpu + - libarrow-flight-sql=15.0.1=h757c851_1_cpu + - libarrow-gandiva=15.0.1=hb016d2e_1_cpu + - libarrow-substrait=15.0.1=h757c851_1_cpu + - libbrotlicommon=1.1.0=hd590300_1 + - libbrotlidec=1.1.0=hd590300_1 + - libbrotlienc=1.1.0=hd590300_1 + - libcrc32c=1.1.2=h9c3ff4c_0 + - libcurl=8.5.0=hca28451_0 + - libdeflate=1.19=hd590300_0 + - libedit=3.1.20191231=he28a2e2_2 + - libev=4.33=hd590300_2 + - libevent=2.1.12=hf998b51_1 + - libffi=3.4.4=h6a678d5_0 + - libgcc-ng=13.2.0=h807b86a_5 + - libgomp=13.2.0=h807b86a_5 + - libgoogle-cloud=2.22.0=h9be4e54_1 + - libgoogle-cloud-storage=2.22.0=hc7a4891_1 + - libgrpc=1.62.1=h15f2491_0 + - libiconv=1.17=hd590300_2 + - libjpeg-turbo=2.1.5.1=hd590300_1 + - libllvm16=16.0.6=hb3ce162_3 + - libnghttp2=1.58.0=h47da74e_1 + - libnl=3.9.0=hd590300_0 + - libnuma=2.0.16=h0b41bf4_1 + - libparquet=15.0.1=h352af49_1_cpu + - libpng=1.6.43=h2797004_0 + - libpq=16.2=h33b98f1_0 + - libprotobuf=4.25.3=h08a7969_0 + - libre2-11=2023.09.01=h5a48ba9_2 + - libsodium=1.0.18=h7b6447c_0 + - libssh2=1.11.0=h0841786_0 + - libstdcxx-ng=13.2.0=h7e041cc_5 + - libthrift=0.19.0=hb90f79a_1 + - libtiff=4.6.0=h29866fb_1 + - libutf8proc=2.8.0=h166bdaf_0 + - libuuid=1.41.5=h5eee18b_0 + - libwebp-base=1.3.2=hd590300_0 + - libxcb=1.15=h0b41bf4_0 + - libxml2=2.12.5=h232c23b_0 + - libxslt=1.1.39=h76b75d6_0 + - libzlib=1.2.13=hd590300_5 + - limits=3.10.0=pyhd8ed1ab_0 + - linkify-it-py=2.0.3=pyhd8ed1ab_0 + - locket=1.0.0=pyhd8ed1ab_0 + - lockfile=0.12.2=py_1 + - lxml=5.1.0=py310hcfd0673_0 + - lz4=4.3.3=py310h350c4a5_0 + - lz4-c=1.9.4=hcb278e6_0 + - mako=1.3.2=pyhd8ed1ab_0 + - markdown-it-py=3.0.0=pyhd8ed1ab_0 + - markupsafe=2.1.3=py310h5eee18b_0 + - marshmallow=3.21.1=pyhd8ed1ab_0 + - marshmallow-oneofschema=3.1.1=pyhd8ed1ab_0 + - marshmallow-sqlalchemy=0.26.1=pyhd8ed1ab_0 + - matplotlib-inline=0.1.6=py310h06a4308_0 + - mdit-py-plugins=0.4.0=pyhd8ed1ab_0 + - mdurl=0.1.2=pyhd8ed1ab_0 + - mistune=2.0.4=py310h06a4308_0 + - mkl=2023.1.0=h213fc3f_46344 + - mkl-service=2.4.0=py310h5eee18b_1 + - mkl_fft=1.3.8=py310h5eee18b_0 + - mkl_random=1.2.4=py310hdb19cb5_0 + - more-itertools=10.2.0=pyhd8ed1ab_0 + - msgpack-python=1.0.7=py310hd41b1e2_0 + - multidict=6.0.5=py310h2372a71_0 + - nbclient=0.8.0=py310h06a4308_0 + - nbconvert=7.10.0=py310h06a4308_0 + - nbformat=5.9.2=py310h06a4308_0 + - ncurses=6.4=h6a678d5_0 + - nest-asyncio=1.6.0=py310h06a4308_0 + - notebook-shim=0.2.3=py310h06a4308_0 + - numexpr=2.8.7=py310h85018f9_0 + - numpy=1.26.4=py310h5f9d8c6_0 + - numpy-base=1.26.4=py310hb5e798b_0 + - oauthlib=3.2.2=pyhd8ed1ab_0 + - openjpeg=2.5.2=h488ebb8_0 + - openssl=3.3.1=h4ab18f5_1 + - opentelemetry-api=1.16.0=pyhd8ed1ab_0 + - opentelemetry-exporter-otlp=1.16.0=pyhd8ed1ab_0 + - opentelemetry-exporter-otlp-proto-grpc=1.16.0=pyhd8ed1ab_0 + - opentelemetry-exporter-otlp-proto-http=1.16.0=pyhd8ed1ab_0 + - opentelemetry-proto=1.16.0=pyhd8ed1ab_0 + - opentelemetry-sdk=1.16.0=pyhd8ed1ab_0 + - opentelemetry-semantic-conventions=0.37b0=pyhd8ed1ab_0 + - orc=2.0.0=h1e5e2c1_0 + - ordered-set=4.1.0=pyhd8ed1ab_0 + - orderedmultidict=1.0.1=pyhd8ed1ab_1 + - overrides=7.4.0=py310h06a4308_0 + - packaging=23.1=py310h06a4308_0 + - pandas=2.2.1=py310hcc13569_0 + - pandocfilters=1.5.0=pyhd3eb1b0_0 + - paramiko=3.4.0=pyhd8ed1ab_0 + - 
parso=0.8.3=pyhd3eb1b0_0 + - partd=1.4.1=pyhd8ed1ab_0 + - passlib=1.7.4=pyhd8ed1ab_1 + - pathspec=0.12.1=pyhd8ed1ab_0 + - pendulum=3.0.0=py310h58cd448_0 + - pexpect=4.8.0=pyhd3eb1b0_3 + - phonenumbers=8.13.31=pyhd8ed1ab_0 + - pillow=10.0.1=py310h29da1c1_1 + - pip=23.3.1=py310h06a4308_0 + - platformdirs=3.10.0=py310h06a4308_0 + - pluggy=1.4.0=pyhd8ed1ab_0 + - ply=3.11=py_1 + - prison=0.2.1=pyhd8ed1ab_0 + - prometheus_client=0.14.1=py310h06a4308_0 + - prompt-toolkit=3.0.43=py310h06a4308_0 + - prompt_toolkit=3.0.43=hd3eb1b0_0 + - protobuf=4.25.3=py310ha8c1f0e_0 + - psutil=5.9.0=py310h5eee18b_0 + - psycopg2=2.9.9=py310h275853b_0 + - pthread-stubs=0.4=h36c2ea0_1001 + - ptyprocess=0.7.0=pyhd3eb1b0_2 + - pure_eval=0.2.2=pyhd3eb1b0_0 + - pyarrow=15.0.1=py310hf9e7431_1_cpu + - pyarrow-hotfix=0.6=pyhd8ed1ab_0 + - pyasn1=0.5.1=pyhd8ed1ab_0 + - pyasn1-modules=0.3.0=pyhd8ed1ab_0 + - pyathena=3.3.0=pyhd8ed1ab_0 + - pycparser=2.21=pyhd3eb1b0_0 + - pygments=2.15.1=py310h06a4308_1 + - pyjwt=2.8.0=pyhd8ed1ab_1 + - pynacl=1.5.0=py310h2372a71_3 + - pyopenssl=24.0.0=pyhd8ed1ab_0 + - pyparsing=3.0.9=py310h06a4308_0 + - pysocks=1.7.1=py310h06a4308_0 + - python=3.10.13=h955ad1f_0 + - python-daemon=3.0.1=pyhd8ed1ab_2 + - python-dateutil=2.8.2=pyhd3eb1b0_0 + - python-dotenv=1.0.1=pyhd8ed1ab_0 + - python-fastjsonschema=2.16.2=py310h06a4308_0 + - python-json-logger=2.0.7=py310h06a4308_0 + - python-kubernetes=29.0.0=pyhd8ed1ab_0 + - python-nvd3=0.15.0=pyh9f0ad1d_2 + - python-slugify=8.0.4=pyhd8ed1ab_0 + - python-swiftclient=4.5.0=pyhd8ed1ab_0 + - python-tzdata=2023.3=pyhd3eb1b0_0 + - python_abi=3.10=2_cp310 + - pytz=2023.3.post1=py310h06a4308_0 + - pyu2f=0.1.5=pyhd8ed1ab_0 + - pywin32-on-windows=0.1.0=pyh1179c8e_3 + - pyyaml=6.0.1=py310h5eee18b_0 + - pyzmq=25.1.2=py310h6a678d5_0 + - rdma-core=50.0=hd3aeb46_1 + - re2=2023.09.01=h7f4b329_2 + - readline=8.2=h5eee18b_0 + - redshift_connector=2.1.0=pyhd8ed1ab_0 + - referencing=0.30.2=py310h06a4308_0 + - requests=2.31.0=py310h06a4308_1 + - requests-oauthlib=1.4.0=pyhd8ed1ab_0 + - requests-toolbelt=1.0.0=pyhd8ed1ab_0 + - rfc3339-validator=0.1.4=py310h06a4308_0 + - rfc3986-validator=0.1.1=py310h06a4308_0 + - rich=13.7.1=pyhd8ed1ab_0 + - rich-argparse=1.4.0=pyhd8ed1ab_0 + - rpds-py=0.10.6=py310hb02cf49_0 + - rsa=4.9=pyhd8ed1ab_0 + - s2n=1.4.5=h06160fa_0 + - s3transfer=0.10.0=pyhd8ed1ab_0 + - scramp=1.4.4=pyhd8ed1ab_0 + - send2trash=1.8.2=py310h06a4308_0 + - setproctitle=1.3.3=py310h2372a71_0 + - setuptools=68.2.2=py310h06a4308_0 + - six=1.16.0=pyhd3eb1b0_1 + - snappy=1.1.10=h9fff704_0 + - sniffio=1.3.0=py310h06a4308_0 + - sortedcontainers=2.4.0=pyhd8ed1ab_0 + - soupsieve=2.5=py310h06a4308_0 + - sqlalchemy=1.4.49=py310h2372a71_1 + - sqlalchemy-jsonfield=1.0.1.post0=pyhd8ed1ab_0 + - sqlalchemy-redshift=0.8.14=pyhd8ed1ab_0 + - sqlalchemy-utils=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-arrow=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-babel=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-base=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-color=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-encrypted=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-intervals=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-password=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-pendulum=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-phone=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-timezone=0.41.1=pyhd8ed1ab_0 + - sqlalchemy-utils-url=0.41.1=pyhd8ed1ab_0 + - sqlite=3.41.2=h5eee18b_0 + - sqlparse=0.4.4=pyhd8ed1ab_0 + - stack_data=0.2.0=pyhd3eb1b0_0 + - tabulate=0.9.0=pyhd8ed1ab_1 + - tbb=2021.8.0=hdb19cb5_0 + - tblib=3.0.0=pyhd8ed1ab_0 + - 
tenacity=8.2.3=pyhd8ed1ab_0 + - termcolor=2.4.0=pyhd8ed1ab_0 + - terminado=0.17.1=py310h06a4308_0 + - text-unidecode=1.3=pyhd8ed1ab_1 + - time-machine=2.13.0=py310h2372a71_1 + - tinycss2=1.2.1=py310h06a4308_0 + - tk=8.6.12=h1ccaba5_0 + - tomli=2.0.1=py310h06a4308_0 + - toolz=0.12.1=pyhd8ed1ab_0 + - tornado=6.3.3=py310h5eee18b_0 + - tqdm=4.65.0=py310h2f386ee_0 + - traitlets=5.7.1=py310h06a4308_0 + - types-python-dateutil=2.8.19.20240311=pyhd8ed1ab_0 + - typing-extensions=4.9.0=py310h06a4308_1 + - typing_extensions=4.9.0=py310h06a4308_1 + - tzdata=2024a=h04d1e81_0 + - uc-micro-py=1.0.3=pyhd8ed1ab_0 + - ucx=1.15.0=h9929b8b_6 + - unicodecsv=0.14.1=pyhd8ed1ab_2 + - universal_pathlib=0.1.4=pyhd8ed1ab_0 + - vine=5.1.0=pyhd8ed1ab_0 + - watchtower=3.1.0=pyhd8ed1ab_0 + - wcwidth=0.2.5=pyhd3eb1b0_0 + - webencodings=0.5.1=py310h06a4308_1 + - websocket-client=0.58.0=py310h06a4308_4 + - werkzeug=2.3.8=pyhd8ed1ab_0 + - wheel=0.41.2=py310h06a4308_0 + - wrapt=1.16.0=py310h2372a71_0 + - wtforms=3.1.2=pyhd8ed1ab_0 + - xorg-libxau=1.0.11=hd590300_0 + - xorg-libxdmcp=1.1.3=h7f98852_0 + - xyzservices=2023.10.1=pyhd8ed1ab_0 + - xz=5.4.6=h5eee18b_0 + - yaml=0.2.5=h7b6447c_0 + - yarl=1.9.4=py310h2372a71_0 + - zeromq=4.3.5=h6a678d5_0 + - zict=3.0.0=pyhd8ed1ab_0 + - zipp=3.17.0=pyhd8ed1ab_0 + - zlib=1.2.13=hd590300_5 + - zstd=1.5.5=hfc55251_0 + - pip: + - bucket-manager==0.4.0.dev2 + - urllib3==2.0.7 +prefix: /home/dave/miniconda3/envs/lsst-uk diff --git a/scripts/process_collated_zips.py b/scripts/process_collated_zips.py index ce834cc..4f616d1 100644 --- a/scripts/process_collated_zips.py +++ b/scripts/process_collated_zips.py @@ -154,7 +154,7 @@ def extract_and_upload_mp(bucket_name, debug, zipfile_key): pbar.update(zf.getinfo(content_file).file_size) def extract_and_upload_zipfiles(extract_list, bucket_name, pool_size, debug): - print(f'Extracting zip files and uploading contents using {pool_size} processes...') + print(f'Extracting zip files and uploading contents using {pool_size} processes...') with Pool(pool_size) as p: p.map(partial(extract_and_upload_mp, bucket_name, debug), extract_list)#, chunksize=len(extract_list)//pool_size) @@ -227,12 +227,12 @@ def error(self, message): debug = True else: debug = False - + if args.extract: extract = True else: extract = False - + if args.nprocs: nprocs = args.nprocs if nprocs < 1: @@ -247,13 +247,12 @@ def error(self, message): get_contents_metadata = True # Setup bucket object - try: assert bm.check_keys() except AssertionError as e: print(e) sys.exit() - + s3 = bm.get_resource() bucket_list = bm.bucket_list(s3) @@ -266,7 +265,7 @@ def error(self, message): if list_contents: for i in range(len(zipfiles_df)): print(f'{zipfiles_df.iloc[i]["zipfile"]}: {zipfiles_df.iloc[i]["contents"]}') - + if verify_contents: print('Verifying zip file contents...') zipfiles_df = prepend_zipfile_path_to_contents(zipfiles_df, debug) @@ -276,9 +275,7 @@ def error(self, message): print(extract_list) else: print('All zip files previously extracted.') - # for zipfile in extract_list: - # print(zipfile) - + if extract: print('Extracting zip files...') zipfiles_df = prepend_zipfile_path_to_contents(zipfiles_df, debug) @@ -291,9 +288,7 @@ def error(self, message): else: print('All zip files previously extracted.') - print('Done.') - if __name__ == '__main__': main() \ No newline at end of file
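
Note on the new response check in upload_to_bucket_collated: the added `while True` loop only exits when both Swift put_object calls report status 201, so a non-201 response would spin indefinitely. A minimal bounded alternative is sketched below; it is not part of the patch and assumes the same `responses` list populated via swiftclient's `response_dict` argument (put_object is synchronous, so both dicts are already filled once the calls return).

# Sketch only: bounded status check instead of the unconditional `while True`.
# Assumes `responses[0]` (metadata object) and `responses[1]` (zip object)
# were passed as response_dict= to the two swiftclient put_object calls above.
for label, response in zip(('metadata', 'zip'), responses):
    status = response.get('status')
    if status != 201:  # Swift returns 201 Created for a successful PUT
        raise RuntimeError(
            f'Swift upload of {label} object failed with status {status}: {response}'
        )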