From 6a7250c6e4ba5a77cbe17edfbab7b8dbbe6c4840 Mon Sep 17 00:00:00 2001 From: Dave McKay Date: Fri, 8 Nov 2024 16:00:28 +0000 Subject: [PATCH] use mem_check --- csd3-side/scripts/lsst-backup.py | 56 ++++++++++++++++---------------- csd3-side/test_dask.ipynb | 0 2 files changed, 28 insertions(+), 28 deletions(-) create mode 100644 csd3-side/test_dask.ipynb diff --git a/csd3-side/scripts/lsst-backup.py b/csd3-side/scripts/lsst-backup.py index 5d2d274..19bdde3 100644 --- a/csd3-side/scripts/lsst-backup.py +++ b/csd3-side/scripts/lsst-backup.py @@ -1062,34 +1062,34 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects, for i in range(len(to_collate)): - mem_half_full = len(zul_futures)*np.sum(np.array([os.stat(fp).st_size for fp in to_collate.iloc[i]['file_paths']])) > len(client.scheduler_info()['workers'])*mem_per_worker / 2 - if mem_half_full: - # clean up finished futures, but don't block - print(f'Memory is half full. Cleaning up finished futures.', flush=True) - for f in zul_futures: - result = f.result() - if result[0] is not None: - upload_futures.append(result[0]) - to_collate = to_collate[to_collate.object_names != result[1]] - print(f'Zip {result[1]} created and added to upload queue.', flush=True) - del f - else: - print(f'No files to zip as {result[1]}. Skipping upload.', flush=True) - del f - if len(upload_futures) > 0: - if sum([f.status == 'finished' for f in upload_futures]) > 0: - for f in upload_futures: - if 'exception' in f.status and f not in failed: - f_tuple = f.exception(), f.traceback() - del f - if f_tuple not in failed: - failed.append(f_tuple) - elif 'finished' in f.status: - del f - print('Rebalancing memory.', flush=True) - client.rebalance() - - + # mem_half_full = len(zul_futures)*np.sum(np.array([os.stat(fp).st_size for fp in to_collate.iloc[i]['file_paths']])) > len(client.scheduler_info()['workers'])*mem_per_worker / 2 + # if mem_half_full: + # # clean up finished futures, but don't block + # print(f'Memory is half full. Cleaning up finished futures.', flush=True) + # for f in zul_futures: + # result = f.result() + # if result[0] is not None: + # upload_futures.append(result[0]) + # to_collate = to_collate[to_collate.object_names != result[1]] + # print(f'Zip {result[1]} created and added to upload queue.', flush=True) + # del f + # else: + # print(f'No files to zip as {result[1]}. Skipping upload.', flush=True) + # del f + # if len(upload_futures) > 0: + # if sum([f.status == 'finished' for f in upload_futures]) > 0: + # for f in upload_futures: + # if 'exception' in f.status and f not in failed: + # f_tuple = f.exception(), f.traceback() + # del f + # if f_tuple not in failed: + # failed.append(f_tuple) + # elif 'finished' in f.status: + # del f + # print('Rebalancing memory.', flush=True) + # client.rebalance() + + mem_check(zul_futures) zul_futures.append(client.submit( zip_and_upload, s3_host, diff --git a/csd3-side/test_dask.ipynb b/csd3-side/test_dask.ipynb new file mode 100644 index 0000000..e69de29