Skip to content

Commit

Permalink
Merge pull request #231 from HSF/dev
Browse files Browse the repository at this point in the history
bulk submitter and poller updates to db
  • Loading branch information
wguanicedew authored Oct 26, 2023
2 parents e0ad9b5 + 13f8dbe commit 006f317
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 66 deletions.
21 changes: 21 additions & 0 deletions common/lib/idds/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@
DATE_FORMAT = '%a, %d %b %Y %H:%M:%S UTC'


def get_log_dir():
    """Return the log directory from the [common] logdir config option,
    or the default '/var/log/idds' when it is not configured."""
    has_custom_logdir = config_has_section('common') and config_has_option('common', 'logdir')
    if not has_custom_logdir:
        return "/var/log/idds"
    return config_get('common', 'logdir')


def setup_logging(name, stream=None, loglevel=None):
"""
Setup logging
Expand Down Expand Up @@ -588,3 +594,18 @@ def pid_exists(pid):
def get_list_chunks(full_list, bulk_size=2000):
    """Split *full_list* into consecutive sublists of at most *bulk_size* items.

    :param full_list: the list to split.
    :param bulk_size: maximum size of each chunk (default 2000).
    :returns: a list of chunks; empty input yields an empty list.
    """
    total = len(full_list)
    return [full_list[start:start + bulk_size] for start in range(0, total, bulk_size)]


def report_availability(availability):
    """Persist the agent availability report as JSON.

    Writes *availability* to <logdir>/idds_availability when a log directory
    is configured; otherwise prints it to stdout. Reporting is best-effort:
    any exception is printed and logged at debug level, never propagated.

    :param availability: JSON-serializable availability data.
    """
    try:
        log_dir = get_log_dir()
        if log_dir:
            filename = os.path.join(log_dir, 'idds_availability')
            with open(filename, 'w') as f:
                json.dump(availability, f)
        else:
            print("availability: %s" % str(availability))
    except Exception as ex:
        # Fixed typo in the error message: 'availablity' -> 'availability'.
        error = "Failed to report availability: %s" % str(ex)
        print(error)
        logging.debug(error)
4 changes: 2 additions & 2 deletions main/config_default/idds.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ coordination_interval_delay = 300


[clerk]
num_threads = 16
max_number_workers = 16
num_threads = 4
max_number_workers = 4
poll_period = 300
new_poll_period = 10
update_poll_period = 300
Expand Down
8 changes: 4 additions & 4 deletions main/lib/idds/agents/carrier/finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ class Finisher(Poller):
Finisher works to submit and running tasks to WFMS.
"""

def __init__(self, num_threads=1, finisher_max_number_workers=3, max_number_workers=3, poll_time_period=10, retries=3, retrieve_bulk_size=2,
def __init__(self, num_threads=1, finisher_max_number_workers=None, max_number_workers=3, poll_time_period=10, retries=3, retrieve_bulk_size=2,
message_bulk_size=1000, **kwargs):
if finisher_max_number_workers > num_threads:
self.max_number_workers = finisher_max_number_workers
if finisher_max_number_workers:
self.max_number_workers = int(finisher_max_number_workers)
else:
self.max_number_workers = max_number_workers
self.max_number_workers = int(max_number_workers)
self.set_max_workers()

num_threads = int(self.max_number_workers)
Expand Down
6 changes: 5 additions & 1 deletion main/lib/idds/agents/carrier/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Poller(BaseAgent):
"""

def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
name='Poller', message_bulk_size=1000, **kwargs):
max_updates_per_round=2000, name='Poller', message_bulk_size=1000, **kwargs):
self.max_number_workers = max_number_workers
if int(num_threads) < int(self.max_number_workers):
num_threads = int(self.max_number_workers)
Expand Down Expand Up @@ -83,6 +83,9 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=
else:
self.max_number_workers = int(self.max_number_workers)

self.max_updates_per_round = max_updates_per_round
self.logger.info("max_updates_per_round: %s" % self.max_updates_per_round)

self.show_queue_size_time = None

def is_ok_to_run_more_processings(self):
Expand Down Expand Up @@ -279,6 +282,7 @@ def handle_update_processing(self, processing):
log_prefix = self.get_log_prefix(processing)
ret_handle_update_processing = handle_update_processing(processing,
self.agent_attributes,
max_updates_per_round=self.max_updates_per_round,
logger=self.logger,
log_prefix=log_prefix)

Expand Down
1 change: 1 addition & 0 deletions main/lib/idds/agents/carrier/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def handle_new_processing(self, processing):
ret_new_processing = handle_new_processing(processing,
self.agent_attributes,
func_site_to_cloud=self.get_site_to_cloud,
max_updates_per_round=self.max_updates_per_round,
logger=self.logger,
log_prefix=log_prefix)
status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing
Expand Down
19 changes: 11 additions & 8 deletions main/lib/idds/agents/carrier/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ class Trigger(Poller):
Trigger works to trigger to release jobs
"""

def __init__(self, num_threads=1, trigger_max_number_workers=3, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
def __init__(self, num_threads=1, trigger_max_number_workers=None, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
name='Trigger', message_bulk_size=1000, max_updates_per_round=2000, **kwargs):
if trigger_max_number_workers > num_threads:
self.max_number_workers = trigger_max_number_workers
if trigger_max_number_workers:
self.max_number_workers = int(trigger_max_number_workers)
else:
self.max_number_workers = max_number_workers
self.max_number_workers = int(max_number_workers)

self.set_max_workers()

num_threads = int(self.max_number_workers)
super(Trigger, self).__init__(num_threads=num_threads, name=name, max_number_workers=self.max_number_workers,
retrieve_bulk_size=retrieve_bulk_size, **kwargs)
max_updates_per_round=max_updates_per_round, retrieve_bulk_size=retrieve_bulk_size, **kwargs)
self.logger.info("num_threads: %s" % num_threads)

self.max_updates_per_round = max_updates_per_round
Expand Down Expand Up @@ -103,7 +103,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False):
max_updates_per_round=self.max_updates_per_round,
logger=self.logger,
log_prefix=log_prefix)
process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents, ret_update_transforms = ret_trigger_processing
process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents, ret_update_transforms, has_updates = ret_trigger_processing

self.logger.debug(log_prefix + "handle_trigger_processing: ret_update_transforms: %s" % str(ret_update_transforms))

Expand All @@ -127,7 +127,8 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False):
'new_update_contents': new_update_contents,
'update_transforms': ret_update_transforms,
'update_dep_contents': (processing['request_id'], update_dep_contents_status_name, update_dep_contents_status),
'processing_status': new_process_status}
'processing_status': new_process_status,
'has_updates': has_updates}
except exceptions.ProcessFormatNotSupported as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
Expand Down Expand Up @@ -199,6 +200,7 @@ def process_trigger_processing_real(self, event):
self.update_processing(ret, pr)

update_transforms = ret.get('update_transforms', None)
has_updates = ret.get('has_updates', None)
if update_transforms:
# self.logger.info(log_pre + "update_contents_to_others_by_dep_id")
# core_catalog.update_contents_to_others_by_dep_id(request_id=pr['request_id'], transform_id=pr['transform_id'])
Expand Down Expand Up @@ -231,7 +233,8 @@ def process_trigger_processing_real(self, event):
if ((event._content and 'has_updates' in event._content and event._content['has_updates'])
or ('update_contents' in ret and ret['update_contents']) # noqa W503
or ('new_contents' in ret and ret['new_contents']) # noqa W503
or ('messages' in ret and ret['messages'])): # noqa E129
or ('messages' in ret and ret['messages']) # noqa W503
or has_updates): # noqa E129
self.logger.info(log_pre + "SyncProcessingEvent(processing_id: %s)" % pr['processing_id'])
event = SyncProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'],
content=event._content,
Expand Down
Loading

0 comments on commit 006f317

Please sign in to comment.