Skip to content

Commit

Permalink
implement last_successful_run code and configuration for status
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-jones committed Oct 10, 2024
1 parent 2584b0c commit 85ebff1
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 88 deletions.
13 changes: 8 additions & 5 deletions doajtest/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from portality.lib import paths, dates
from portality.lib.dates import FMT_DATE_STD
from portality.lib.thread_utils import wait_until
from portality.tasks.redis_huey import main_queue, long_running
from portality.tasks.redis_huey import events_queue, scheduled_short_queue, scheduled_long_queue
from portality.util import url_for


Expand Down Expand Up @@ -163,10 +163,13 @@ def setUpClass(cls) -> None:
# always_eager has been replaced by immediate
# for huey version > 2
# https://huey.readthedocs.io/en/latest/guide.html
main_queue.always_eager = True
long_running.always_eager = True
main_queue.immediate = True
long_running.immediate = True
events_queue.always_eager = True
scheduled_short_queue.always_eager = True
scheduled_long_queue.always_eager = True

events_queue.immediate = True
scheduled_short_queue.immediate = True
scheduled_long_queue.immediate = True

dao.DomainObject.save = dao_proxy(dao.DomainObject.save, type="instance")
dao.DomainObject.delete = dao_proxy(dao.DomainObject.delete, type="instance")
Expand Down
17 changes: 9 additions & 8 deletions doajtest/unit/test_background_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
from portality.background import BackgroundTask
from portality.core import app
from portality.tasks.helpers import background_helper
from portality.tasks.redis_huey import long_running, main_queue
from portality.tasks.redis_huey import events_queue, scheduled_short_queue, scheduled_long_queue


class TestBackgroundHelper(TestCase):

def test_get_queue_id_by_task_queue(self):
cases = [
(long_running, constants.BGJOB_QUEUE_ID_LONG),
(main_queue, constants.BGJOB_QUEUE_ID_MAIN),
(events_queue, constants.BGJOB_QUEUE_ID_EVENTS),
(scheduled_long_queue, constants.BGJOB_QUEUE_ID_SCHEDULED_LONG),
(scheduled_short_queue, constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT),
(None, constants.BGJOB_QUEUE_ID_UNKNOWN),
]

Expand Down Expand Up @@ -57,7 +58,7 @@ def tearDownClass(cls) -> None:
helpers.patch_config(app, cls.org_config)

def test_register_schedule(self):
helper = background_helper.RedisHueyTaskHelper(main_queue, fixture_bgtask_class(self.task_name_a))
helper = background_helper.RedisHueyTaskHelper(scheduled_short_queue, fixture_bgtask_class(self.task_name_a))

@helper.register_schedule
def _fn():
Expand All @@ -66,15 +67,15 @@ def _fn():
assert isinstance(_fn, huey.api.TaskWrapper)

def test_register_schedule__schedule_not_found(self):
helper = background_helper.RedisHueyTaskHelper(main_queue,
helper = background_helper.RedisHueyTaskHelper(scheduled_short_queue,
fixture_bgtask_class(self.task_name_schedule_not_exist))
with self.assertRaises(RuntimeError):
@helper.register_schedule
def _fn():
print('fake fn')

def test_register_execute(self):
helper = background_helper.RedisHueyTaskHelper(main_queue, fixture_bgtask_class(self.task_name_b))
helper = background_helper.RedisHueyTaskHelper(scheduled_short_queue, fixture_bgtask_class(self.task_name_b))

@helper.register_execute(is_load_config=True)
def _fn():
Expand All @@ -84,7 +85,7 @@ def _fn():
assert _fn.retries == self.expected_retries

def test_register_execute__config_not_found(self):
helper = background_helper.RedisHueyTaskHelper(main_queue,
helper = background_helper.RedisHueyTaskHelper(events_queue,
fixture_bgtask_class(self.task_name_schedule_not_exist))

with self.assertRaises(RuntimeError):
Expand All @@ -93,7 +94,7 @@ def _fn():
print('fake fn')

def test_register_execute__without_load_config(self):
helper = background_helper.RedisHueyTaskHelper(main_queue,
helper = background_helper.RedisHueyTaskHelper(events_queue,
fixture_bgtask_class(self.task_name_schedule_not_exist))

@helper.register_execute(is_load_config=False)
Expand Down
52 changes: 27 additions & 25 deletions doajtest/unit/test_background_task_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@
# config BG_MONITOR_LAST_COMPLETED
bg_monitor_last_completed__now = {
'BG_MONITOR_LAST_COMPLETED': {
'main_queue': 0,
'long_running': 0,
'events': 0,
'scheduled_long': 0,
'scheduled_short': 0
}
}

bg_monitor_last_completed__a = {
'BG_MONITOR_LAST_COMPLETED': {
'main_queue': 10000,
'long_running': 10000,
'events': 10000,
'scheduled_long': 10000,
'scheduled_short': 10000
}
}

Expand Down Expand Up @@ -95,43 +97,43 @@ def assert_unstable_dict(val):
assert len(val.get('err_msgs'))

@apply_test_case_config(bg_monitor_last_completed__now)
def test_create_background_status__invalid_last_completed__main_queue(self):
def test_create_background_status__invalid_last_completed__events_queue(self):
save_mock_bgjob(JournalCSVBackgroundTask.__action__,
queue_id=constants.BGJOB_QUEUE_ID_MAIN,
queue_id=constants.BGJOB_QUEUE_ID_EVENTS,
status=constants.BGJOB_STATUS_COMPLETE, )

status_dict = background_task_status.create_background_status()

assert not is_stable(status_dict['status'])
self.assert_unstable_dict(status_dict['queues'].get('main_queue', {}))
self.assert_stable_dict(status_dict['queues'].get('long_running', {}))
self.assert_unstable_dict(status_dict['queues'].get('events', {}))
self.assert_stable_dict(status_dict['queues'].get('scheduled_long', {}))

@apply_test_case_config(bg_monitor_last_completed__now)
def test_create_background_status__invalid_last_completed__long_running(self):
def test_create_background_status__invalid_last_completed__scheduled_long(self):
save_mock_bgjob(AnonExportBackgroundTask.__action__,
queue_id=constants.BGJOB_QUEUE_ID_LONG,
queue_id=constants.BGJOB_QUEUE_ID_SCHEDULED_LONG,
status=constants.BGJOB_STATUS_COMPLETE, )

status_dict = background_task_status.create_background_status()

assert not is_stable(status_dict['status'])
self.assert_stable_dict(status_dict['queues'].get('main_queue', {}))
self.assert_unstable_dict(status_dict['queues'].get('long_running', {}))
self.assert_stable_dict(status_dict['queues'].get('events', {}))
self.assert_unstable_dict(status_dict['queues'].get('scheduled_long', {}))

@apply_test_case_config(bg_monitor_last_completed__a)
def test_create_background_status__valid_last_completed(self):
save_mock_bgjob(JournalCSVBackgroundTask.__action__,
queue_id=constants.BGJOB_QUEUE_ID_MAIN,
queue_id=constants.BGJOB_QUEUE_ID_EVENTS,
status=constants.BGJOB_STATUS_COMPLETE, )
save_mock_bgjob(AnonExportBackgroundTask.__action__,
queue_id=constants.BGJOB_QUEUE_ID_LONG,
queue_id=constants.BGJOB_QUEUE_ID_SCHEDULED_LONG,
status=constants.BGJOB_STATUS_COMPLETE, )

status_dict = background_task_status.create_background_status()

assert is_stable(status_dict['status'])
self.assert_stable_dict(status_dict['queues'].get('main_queue', {}))
self.assert_stable_dict(status_dict['queues'].get('long_running', {}))
self.assert_stable_dict(status_dict['queues'].get('events', {}))
self.assert_stable_dict(status_dict['queues'].get('scheduled_long', {}))

@apply_test_case_config(bg_monitor_last_completed__now)
def test_create_background_status__valid_last_completed__no_record(self):
Expand All @@ -145,13 +147,13 @@ def test_create_background_status__empty_errors_config(self):

status_dict = background_task_status.create_background_status()

journal_csv_dict = status_dict['queues']['main_queue']['errors'].get(JournalCSVBackgroundTask.__action__, {})
journal_csv_dict = status_dict['queues']['scheduled_short']['errors'].get(JournalCSVBackgroundTask.__action__, {})

assert not is_stable(status_dict['status'])
assert journal_csv_dict
# unstable action should be on top of the list after sorting
first_key = next(iter(status_dict['queues']['main_queue']['errors']))
assert not is_stable(status_dict['queues']['main_queue']['errors'][first_key]['status'])
first_key = next(iter(status_dict['queues']['scheduled_short']['errors']))
assert not is_stable(status_dict['queues']['scheduled_short']['errors'][first_key]['status'])

@apply_test_case_config(bg_monitor_errors_config__a)
def test_create_background_status__error_in_period_found(self):
Expand All @@ -160,7 +162,7 @@ def test_create_background_status__error_in_period_found(self):

status_dict = background_task_status.create_background_status()

journal_csv_dict = status_dict['queues']['main_queue']['errors'].get(JournalCSVBackgroundTask.__action__, {})
journal_csv_dict = status_dict['queues']['scheduled_short']['errors'].get(JournalCSVBackgroundTask.__action__, {})

assert not is_stable(status_dict['status'])
self.assert_unstable_dict(journal_csv_dict)
Expand All @@ -174,7 +176,7 @@ def test_create_background_status__error_in_period_not_found(self):

status_dict = background_task_status.create_background_status()

journal_csv_dict = status_dict['queues']['main_queue']['errors'].get(JournalCSVBackgroundTask.__action__, {})
journal_csv_dict = status_dict['queues']['scheduled_short']['errors'].get(JournalCSVBackgroundTask.__action__, {})

assert is_stable(status_dict['status'])
self.assert_stable_dict(journal_csv_dict)
Expand All @@ -187,7 +189,7 @@ def test_create_background_status__queued_invalid_total(self):

status_dict = background_task_status.create_background_status()

journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {})
journal_csv_dict = status_dict['queues']['scheduled_short']['queued'].get(JournalCSVBackgroundTask.__action__, {})

assert not is_stable(status_dict['status'])
assert journal_csv_dict.get('total', 0)
Expand All @@ -200,7 +202,7 @@ def test_create_background_status__queued_valid_total(self):

status_dict = background_task_status.create_background_status()

journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {})
journal_csv_dict = status_dict['queues']['scheduled_short']['queued'].get(JournalCSVBackgroundTask.__action__, {})

assert is_stable(status_dict['status'])
assert journal_csv_dict.get('total', 0) == 0
Expand All @@ -214,7 +216,7 @@ def test_create_background_status__queued_invalid_oldest(self):

status_dict = background_task_status.create_background_status()

journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {})
journal_csv_dict = status_dict['queues']['scheduled_short']['queued'].get(JournalCSVBackgroundTask.__action__, {})

assert not is_stable(status_dict['status'])
self.assert_unstable_dict(journal_csv_dict)
Expand All @@ -228,7 +230,7 @@ def test_create_background_status__queued_valid_oldest(self):
status_dict = background_task_status.create_background_status()
print(json.dumps(status_dict, indent=4))

journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {})
journal_csv_dict = status_dict['queues']['scheduled_short']['queued'].get(JournalCSVBackgroundTask.__action__, {})

assert is_stable(status_dict['status'])
self.assert_stable_dict(journal_csv_dict)
Expand Down
5 changes: 3 additions & 2 deletions doajtest/unit_tester/bgtask_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@

def test_queue_id_assigned(bgtask_class: Type[BackgroundTask]):
job = bgtask_class.prepare('just a username')
assert job.queue_id in {constants.BGJOB_QUEUE_ID_MAIN,
constants.BGJOB_QUEUE_ID_LONG}
assert job.queue_id in {constants.BGJOB_QUEUE_ID_EVENTS,
constants.BGJOB_QUEUE_ID_SCHEDULED_LONG,
constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT}
8 changes: 4 additions & 4 deletions docs/dev/how-to-implement.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ How to create a background job
* choose a task queue; details of the task queues can be found in `portality/tasks/redis_huey.py`

```python
huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(main_queue)
huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(queue)
```

* add execute function below BackgroundTask class

```python
huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(main_queue)
huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(queue)


@huey_helper.register_execute(is_load_config=False)
Expand Down Expand Up @@ -76,5 +76,5 @@ HUEY_SCHEDULE = {

### Register your task

* add your execute and schedule function in `portality/tasks/consumer_long_running.py`
or `portality/tasks/consumer_main_queue.py`
* add your execute and/or schedule function in `portality/tasks/consumer_scheduled_long.py`,
`portality/tasks/consumer_scheduled_short.py`, or `portality/tasks/consumer_events_queue.py`
4 changes: 2 additions & 2 deletions docs/dev/user-guide/user-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ HUEY_SCHEDULE = {
}
```

* run your `main` background job consumer
* run your `scheduled_short` background job consumer
```
~/venv/doaj/bin/huey_consumer.py portality.tasks.consumer_main_queue.main_queue
~/venv/doaj/bin/huey_consumer.py portality.tasks.consumer_scheduled_short_queue.scheduled_short_queue
```

* wait 10-30 minutes for some background jobs to be generated
Expand Down
50 changes: 48 additions & 2 deletions portality/bll/services/background_task_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import itertools
from typing import Iterable

from portality.constants import BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN, BGJOB_STATUS_ERROR, BGJOB_STATUS_QUEUED, \
from portality.constants import BGJOB_STATUS_COMPLETE
from portality.constants import BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN, BGJOB_QUEUE_ID_EVENTS, BGJOB_QUEUE_ID_SCHEDULED_LONG, BGJOB_QUEUE_ID_SCHEDULED_SHORT, BGJOB_STATUS_ERROR, BGJOB_STATUS_QUEUED, \
BG_STATUS_STABLE, BG_STATUS_UNSTABLE
from portality.core import app
from portality.lib import dates
Expand Down Expand Up @@ -33,6 +34,47 @@ def all_stable(self, items: Iterable, field_name='status') -> bool:
def all_stable_str(self, items: Iterable, field_name='status') -> str:
return self.to_bg_status_str(self.all_stable(items, field_name))

def create_last_successfully_run_status(self, action, last_successful_run=0, **_) -> dict:
    """
    Build a monitoring status dict describing whether `action` has run
    successfully within the last `last_successful_run` seconds.

    :param action: background job action name to check
    :param last_successful_run: monitoring window in seconds; 0 disables the
        check and unconditionally reports stable
    :return: dict with keys `status`, `last_run`, `last_run_status`, `err_msgs`
    """
    # A window of 0 means "not monitored" - always report stable.
    if last_successful_run == 0:
        return dict(
            status=BG_STATUS_STABLE,
            last_run=None,
            last_run_status=None,
            err_msgs=[]
        )

    # Most recent job for this action created within the monitoring window.
    lr_query = (BackgroundJobQueryBuilder().action(action)
                .since(dates.before_now(last_successful_run))
                .size(1)
                .order_by('created_date', 'desc')
                .build_query_dict())

    lr_results = BackgroundJob.q2obj(q=lr_query)
    # NOTE: `lr_results and lr_results[0]` would yield [] (not None) when
    # there are no hits, making the `is not None` check below pass and then
    # raise AttributeError on the list - index explicitly instead.
    lr_job = lr_results[0] if lr_results else None

    # Default to unstable; upgraded below if a completed job is found.
    status = BG_STATUS_UNSTABLE
    lr = None
    last_run_status = None
    msg = ["No background jobs run in the time period"]

    if lr_job is not None:
        lr = lr_job.created_date
        last_run_status = lr_job.status
        if lr_job.status == BGJOB_STATUS_COMPLETE:
            status = BG_STATUS_STABLE
            msg = []
        else:
            msg = ["Last job did not complete successfully"]

    return dict(
        status=status,
        last_run=lr,
        last_run_status=last_run_status,
        err_msgs=msg
    )



def create_errors_status(self, action, check_sec=3600, allowed_num_err=0, **_) -> dict:
in_monitoring_query = SimpleBgjobQueue(action, status=BGJOB_STATUS_ERROR, since=dates.before_now(check_sec))
num_err_in_monitoring = BackgroundJob.hit_count(query=in_monitoring_query.query())
Expand Down Expand Up @@ -92,6 +134,9 @@ def create_queues_status(self, queue_name) -> dict:
queued = {action: self.create_queued_status(action, **config) for action, config
in self.get_config_dict_by_queue_name('BG_MONITOR_QUEUED_CONFIG', queue_name).items()}

last_run = {action: self.create_last_successfully_run_status(action, **config) for action, config
in self.get_config_dict_by_queue_name('BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG', queue_name).items()}

# prepare for err_msgs
limited_sec = app.config.get('BG_MONITOR_LAST_COMPLETED', {}).get(queue_name)
if limited_sec is None:
Expand All @@ -111,6 +156,7 @@ def create_queues_status(self, queue_name) -> dict:
last_completed_job=last_completed_date and dates.format(last_completed_date),
errors=errors,
queued=queued,
last_successfully_run=last_run,
err_msgs=err_msgs,
)
return result_dict
Expand All @@ -129,7 +175,7 @@ def get_config_dict_by_queue_name(config_name, queue_name):
def create_background_status(self) -> dict:
queues = {
queue_name: self.create_queues_status(queue_name)
for queue_name in [BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN]
for queue_name in [BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN, BGJOB_QUEUE_ID_EVENTS, BGJOB_QUEUE_ID_SCHEDULED_LONG, BGJOB_QUEUE_ID_SCHEDULED_SHORT]
}

result_dict = dict(
Expand Down
3 changes: 3 additions & 0 deletions portality/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@
BGJOB_QUEUE_ID_LONG = 'long_running'
BGJOB_QUEUE_ID_MAIN = 'main_queue'
BGJOB_QUEUE_ID_UNKNOWN = 'unknown'
BGJOB_QUEUE_ID_EVENTS = "events"
BGJOB_QUEUE_ID_SCHEDULED_SHORT = "scheduled_short"
BGJOB_QUEUE_ID_SCHEDULED_LONG = "scheduled_long"

# Background monitor status
BG_STATUS_STABLE = 'stable'
Expand Down
Loading

0 comments on commit 85ebff1

Please sign in to comment.