Skip to content

Commit

Permalink
use mem_check
Browse files Browse the repository at this point in the history
  • Loading branch information
davedavemckay committed Nov 8, 2024
1 parent b8fefd0 commit 6a7250c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 28 deletions.
56 changes: 28 additions & 28 deletions csd3-side/scripts/lsst-backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,34 +1062,34 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,


for i in range(len(to_collate)):
mem_half_full = len(zul_futures)*np.sum(np.array([os.stat(fp).st_size for fp in to_collate.iloc[i]['file_paths']])) > len(client.scheduler_info()['workers'])*mem_per_worker / 2
if mem_half_full:
# clean up finished futures, but don't block
print(f'Memory is half full. Cleaning up finished futures.', flush=True)
for f in zul_futures:
result = f.result()
if result[0] is not None:
upload_futures.append(result[0])
to_collate = to_collate[to_collate.object_names != result[1]]
print(f'Zip {result[1]} created and added to upload queue.', flush=True)
del f
else:
print(f'No files to zip as {result[1]}. Skipping upload.', flush=True)
del f
if len(upload_futures) > 0:
if sum([f.status == 'finished' for f in upload_futures]) > 0:
for f in upload_futures:
if 'exception' in f.status and f not in failed:
f_tuple = f.exception(), f.traceback()
del f
if f_tuple not in failed:
failed.append(f_tuple)
elif 'finished' in f.status:
del f
print('Rebalancing memory.', flush=True)
client.rebalance()


# mem_half_full = len(zul_futures)*np.sum(np.array([os.stat(fp).st_size for fp in to_collate.iloc[i]['file_paths']])) > len(client.scheduler_info()['workers'])*mem_per_worker / 2
# if mem_half_full:
# # clean up finished futures, but don't block
# print(f'Memory is half full. Cleaning up finished futures.', flush=True)
# for f in zul_futures:
# result = f.result()
# if result[0] is not None:
# upload_futures.append(result[0])
# to_collate = to_collate[to_collate.object_names != result[1]]
# print(f'Zip {result[1]} created and added to upload queue.', flush=True)
# del f
# else:
# print(f'No files to zip as {result[1]}. Skipping upload.', flush=True)
# del f
# if len(upload_futures) > 0:
# if sum([f.status == 'finished' for f in upload_futures]) > 0:
# for f in upload_futures:
# if 'exception' in f.status and f not in failed:
# f_tuple = f.exception(), f.traceback()
# del f
# if f_tuple not in failed:
# failed.append(f_tuple)
# elif 'finished' in f.status:
# del f
# print('Rebalancing memory.', flush=True)
# client.rebalance()

mem_check(zul_futures)
zul_futures.append(client.submit(
zip_and_upload,
s3_host,
Expand Down
Empty file added csd3-side/test_dask.ipynb
Empty file.

0 comments on commit 6a7250c

Please sign in to comment.