Commit
unindent for as_futures - potentially no need for stream
davedavemckay committed Dec 17, 2024
1 parent a5e0369 commit 46cd5e4
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions csd3-side/scripts/lsst-backup.py
@@ -1381,14 +1381,15 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
                     processing_start,
                     mem_per_worker,
                 ))
-                for ulf in as_completed(upload_futures):
-                    if 'exception' in ulf.status or 'error' in ulf.status:
-                        f_tuple = ulf.exception(), ulf.traceback()
-                        failed.append(f_tuple)
-                        upload_futures.remove(ulf)
-                    else:
-                        upload_futures.remove(ulf)
-                        to_collate.loc[to_collate['id'] == id, 'upload'] = False
+
+            for ulf in as_completed(upload_futures):
+                if 'exception' in ulf.status or 'error' in ulf.status:
+                    f_tuple = ulf.exception(), ulf.traceback()
+                    failed.append(f_tuple)
+                    upload_futures.remove(ulf)
+                else:
+                    upload_futures.remove(ulf)
+                    to_collate.loc[to_collate['id'] == id, 'upload'] = False



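The hunk above lifts the as_completed() loop out by one indentation level, so failures are collected in a single pass after the submission loop rather than inside it, with each finished future dropped from upload_futures as it resolves. Below is a minimal, self-contained sketch of that dask.distributed pattern, assuming futures come from client.submit(); upload_one() and the file names are hypothetical stand-ins, not code from lsst-backup.py.

    # Minimal sketch of the failure-collection pattern, assuming dask.distributed.
    # upload_one() is a hypothetical stand-in for the script's real upload callable.
    from dask.distributed import Client, as_completed

    def upload_one(path):
        if path.endswith('.bad'):                # simulate a failing upload
            raise IOError(f'upload failed for {path}')
        return path

    if __name__ == '__main__':
        client = Client(processes=False)
        paths = ['a.dat', 'b.bad', 'c.dat']
        upload_futures = [client.submit(upload_one, p) for p in paths]

        failed = []
        for ulf in as_completed(upload_futures):
            if 'exception' in ulf.status or 'error' in ulf.status:
                failed.append((ulf.exception(), ulf.traceback()))
            upload_futures.remove(ulf)           # drop the handle whether it failed or not
        print(f'{len(failed)} of {len(paths)} uploads failed')
        client.close()

Because this loop blocks until every upload future has resolved, it is presumably what makes the later scheduler-side wait redundant (see the second hunk).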
@@ -1840,14 +1841,14 @@ def error(self, message):
     elif api == 'swift':
         process_files(s3, bucket_name, api, current_objects, exclude, local_dir, destination_dir, dryrun, log, global_collate, use_compression, client, mem_per_worker, collate_list_file, save_collate_list, file_count_stop)
 
-    def get_all_tasks(dask_scheduler=None):
-        return dask_scheduler.tasks
+    # def get_all_tasks(dask_scheduler=None):
+    #     return dask_scheduler.tasks
 
-    stream = client.run_on_scheduler(get_all_tasks)
-    print(stream)
-    if len(stream) > 0:
-        print('Waiting for tasks to complete.')
-        client.wait(stream)
+    # stream = client.run_on_scheduler(get_all_tasks)
+    # print(stream)
+    # if len(stream) > 0:
+    #     print('Waiting for tasks to complete.')
+    #     client.wait(stream)
     # success = True
     # except Exception as e:
     #     print(e)

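The second hunk comments out the scheduler-side wait: get_all_tasks() fetched the scheduler's task table via client.run_on_scheduler(), and the script then printed it and tried to wait on it. Since the as_completed() loop in the first hunk already blocks until every upload future has resolved, that extra wait is, as the commit title puts it, "potentially no need for stream". Below is a rough sketch of the same scheduler query, assuming dask.distributed; it returns only the task keys (TaskState objects are scheduler-internal) and blocks with distributed.wait() on the client-side futures rather than on the scheduler dump.

    # Rough sketch of the commented-out scheduler query, assuming dask.distributed.
    from dask.distributed import Client, wait

    def get_all_tasks(dask_scheduler=None):
        # run_on_scheduler injects the Scheduler object as dask_scheduler;
        # return just the keys, since TaskState objects live on the scheduler
        return list(dask_scheduler.tasks)

    if __name__ == '__main__':
        client = Client(processes=False)
        futures = [client.submit(pow, n, 2) for n in range(4)]
        print(client.run_on_scheduler(get_all_tasks))  # keys the scheduler currently tracks
        wait(futures)                                  # block on the real futures
        print(client.gather(futures))
        client.close()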