Skip to content

Commit

Permalink
Merge pull request #264 from wguanicedew/dev
Browse files Browse the repository at this point in the history
improve min_request_id setting, to avoid missed requests
  • Loading branch information
wguanicedew authored Jan 9, 2024
2 parents f198073 + bb8cf03 commit 7d40e1b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
67 changes: 65 additions & 2 deletions main/lib/idds/agents/clerk/clerk.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def __init__(self, num_threads=1, max_number_workers=8, poll_period=10, retrieve
self.poll_period = int(poll_period)
self.retrieve_bulk_size = int(retrieve_bulk_size)
self.config_section = Sections.Clerk
self.start_at = time.time()

if pending_time:
self.pending_time = float(pending_time)
else:
Expand Down Expand Up @@ -152,9 +154,24 @@ def get_new_requests(self):

self.show_queue_size()

if time.time() < self.start_at + 3600:
if BaseAgent.poll_new_min_request_id_times % 30 == 0:
# get_new_requests is called every 10 seconds. 30 * 10 = 300 seconds, which is 5 minutes.
min_request_id = BaseAgent.min_request_id - 1000
else:
min_request_id = BaseAgent.min_request_id
else:
if BaseAgent.poll_new_min_request_id_times % 180 == 0:
# get_new_requests is called every 10 seconds. 180 * 10 = 300 seconds, which is 30 minutes.
min_request_id = BaseAgent.min_request_id - 1000
else:
min_request_id = BaseAgent.min_request_id

BaseAgent.poll_new_min_request_id_times += 1

req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built, RequestStatus.Throttling]
reqs_new = core_requests.get_requests_by_status_type(status=req_status, locking=True,
not_lock=True, min_request_id=BaseAgent.min_request_id,
not_lock=True, min_request_id=min_request_id,
new_poll=True, only_return_id=True,
bulk_size=self.retrieve_bulk_size)

Expand All @@ -164,6 +181,7 @@ def get_new_requests(self):

events = []
for req_id in reqs_new:
BaseAgent.min_request_id_cache[req_id] = time.time()
if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
BaseAgent.min_request_id = req_id
core_requests.set_min_request_id(BaseAgent.min_request_id)
Expand Down Expand Up @@ -192,14 +210,29 @@ def get_running_requests(self):

self.show_queue_size()

if time.time() < self.start_at + 3600:
if BaseAgent.poll_running_min_request_id_times % 30 == 0:
# get_new_requests is called every 10 seconds. 30 * 10 = 300 seconds, which is 5 minutes.
min_request_id = BaseAgent.min_request_id - 1000
else:
min_request_id = BaseAgent.min_request_id
else:
if BaseAgent.poll_running_min_request_id_times % 180 == 0:
# get_new_requests is called every 10 seconds. 180 * 10 = 1800 seconds, which is 30 minutes.
min_request_id = BaseAgent.min_request_id - 1000
else:
min_request_id = BaseAgent.min_request_id

BaseAgent.poll_running_min_request_id_times += 1

req_status = [RequestStatus.Transforming, RequestStatus.ToCancel, RequestStatus.Cancelling,
RequestStatus.ToSuspend, RequestStatus.Suspending,
RequestStatus.ToExpire, RequestStatus.Expiring,
RequestStatus.ToFinish, RequestStatus.ToForceFinish,
RequestStatus.ToResume, RequestStatus.Resuming,
RequestStatus.Building]
reqs = core_requests.get_requests_by_status_type(status=req_status, time_period=None,
min_request_id=BaseAgent.min_request_id,
min_request_id=min_request_id,
locking=True, bulk_size=self.retrieve_bulk_size,
not_lock=True, update_poll=True, only_return_id=True)

Expand All @@ -209,6 +242,7 @@ def get_running_requests(self):

events = []
for req_id in reqs:
BaseAgent.min_request_id_cache[req_id] = time.time()
if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
BaseAgent.min_request_id = req_id
core_requests.set_min_request_id(BaseAgent.min_request_id)
Expand Down Expand Up @@ -261,6 +295,7 @@ def get_operation_requests(self):

if BaseAgent.min_request_id is None or BaseAgent.min_request_id > request_id:
BaseAgent.min_request_id = request_id
BaseAgent.min_request_id_cache[request_id] = time.time()
core_requests.set_min_request_id(BaseAgent.min_request_id)

event = None
Expand Down Expand Up @@ -295,6 +330,32 @@ def get_operation_requests(self):
self.logger.error(traceback.format_exc())
return []

def clean_min_request_id(self):
try:
if BaseAgent.checking_min_request_id_times <= 0:
old_min_request_id = core_requests.get_min_request_id()
self.logger.info("old_min_request_id: %s" % old_min_request_id)
if not old_min_request_id:
min_request_id = 0
else:
min_request_id = old_min_request_id - 1000
BaseAgent.min_request_id = min_request_id
else:
for req_id in BaseAgent.min_request_id_cache:
time_stamp = BaseAgent.min_request_id_cache[req_id]
if time_stamp < time.time() - 12 * 3600: # older than 12 hours
del BaseAgent.min_request_id_cache[req_id]

if BaseAgent.min_request_id_cache:
min_request_id = min(list(BaseAgent.min_request_id_cache.keys()))
BaseAgent.min_request_id = min_request_id
core_requests.set_min_request_id(BaseAgent.min_request_id)

BaseAgent.checking_min_request_id_times += 1
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())

def get_request(self, request_id, status=None, locking=False):
try:
return core_requests.get_request_by_id_status(request_id=request_id, status=status, locking=locking)
Expand Down Expand Up @@ -1311,6 +1372,8 @@ def run(self):
self.add_task(task)
task = self.create_task(task_func=self.get_operation_requests, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
self.add_task(task)
task = self.create_task(task_func=self.clean_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=3600, priority=1)
self.add_task(task)
task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1800, priority=1)
self.add_task(task)

Expand Down
6 changes: 5 additions & 1 deletion main/lib/idds/agents/common/baseagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2019 - 2023
# - Wen Guan, <[email protected]>, 2019 - 2024

import os
import socket
Expand Down Expand Up @@ -37,6 +37,10 @@ class BaseAgent(TimerScheduler, PluginBase):
"""

min_request_id = None
min_request_id_cache = {}
checking_min_request_id_times = 0
poll_new_min_request_id_times = 0
poll_running_min_request_id_times = 0

def __init__(self, num_threads=1, name=None, logger=None, **kwargs):
super(BaseAgent, self).__init__(num_threads, name=name)
Expand Down

0 comments on commit 7d40e1b

Please sign in to comment.