diff --git a/docs/source/introduction/intro.rst b/docs/source/introduction/intro.rst index cf1e1a837ef..08401368c05 100644 --- a/docs/source/introduction/intro.rst +++ b/docs/source/introduction/intro.rst @@ -53,7 +53,7 @@ OpenKAT scans for vulnerabilities. If you find any, it is valid that you deal wi Many organizations have their contact information in ``security.txt`` in the root of their domain, so you get straight to the right people. Not every organization handles it equally professionally, but that's no reason not to want to use that standard yourself. -If you find any vulnerabilities in the software of OpenKAT itself you can report them per e-mail to: security @ rdobeheer.nl (remove the spaces). +If you find any vulnerabilities in the software of OpenKAT itself you can report them per e-mail to: security @ irealisatie.nl (remove the spaces). What are the plans for the future? ================================== diff --git a/log.txt b/log.txt deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/mula/docs/architecture.md b/mula/docs/architecture.md index 81f7d9fd1ca..33a9e05574b 100644 --- a/mula/docs/architecture.md +++ b/mula/docs/architecture.md @@ -2,26 +2,19 @@ ## Purpose -The _scheduler_ is tasked with populating and maintaining a priority queue of -items that are ranked, and can be popped off through HTTP API calls. -The scheduler is designed to be extensible, such that you're able to create -your own rules for the population, and prioritization of tasks. - -The _scheduler_ implements a priority queue for prioritization of tasks to be -performed by the worker(s). In the implementation of the scheduler within KAT -the scheduler is tasked with populating the priority queue with 'boefje' and -'normalizer' tasks. Additionally the scheduler is responsible for maintaining -and updating its internal priority queue. - -A priority queue is used, in as such, that it allows us to determine what tasks -should be picked up first, or more regularly. Because of the use of a priority -queue we can differentiate between tasks that are to be executed first, e.g. -tasks created by the user get precedence over tasks that are created by the -internal rescheduling processes within the scheduler. - -Calculations in order to determine the priority of a task are performed by the -`ranker`. The `ranker` can leverage information from multiple (external) -sources, called `connectors`. +The _scheduler_ is tasked with populating and maintaining a priority queues of +ranked tasks, and can be popped off through HTTP API calls. The scheduler is +designed to be extensible, such that you're able to create your own rules for +the population, scheduling, and prioritization of tasks. + +In the implementation of the scheduler within OpenKAT is tasked with +scheduling and populating the priority queues of 'boefje', 'normalizer' and +`report` tasks. + +Because of the use of a priority queue we can differentiate between tasks that +are to be executed first, e.g. tasks created by the user get precedence over +tasks that are created by the internal rescheduling processes within the +scheduler. In this document we will outline how the scheduler operates within KAT, how internal systems function and how external services use it. @@ -39,157 +32,212 @@ external services. In this overview arrows from external services indicate how and why those services communicate with the scheduler. The `Scheduler` system combines data from the `Octopoes`, `Katalogus`, `Bytes` and `RabbitMQ` systems. -![diagram001](./img/diagram001.svg) +External services used and for what purpose: + +- Octopoes; retrieval of ooi information + +- RabbitMQ; messaging queues to notify the scheduler of scan level changes + and the creation of raw files from bytes -- Octopoes +- Katalogus; retrieval of plugin and organization information -- RabbitMQ +- Bytes; retrieval of raw file information -- Katalogus +- Rocky; interfaces with the scheduler through its rest api -- Bytes +![scheduler_system.svg](./img/scheduler_system.svg) ### C3 Component level When we take a closer look at the `scheduler` system itself we can identify -several components. The 'Scheduler App' directs the creation and maintenance -of a multitude of schedulers. Typically in a KAT installation 2 scheduler will -be created per organisation: a _boefje scheduler_ and a _normalizer scheduler_. +several components. The `SchedulerApp` directs the creation and maintenance +of a multitude of schedulers. + +| Scheduler | Schedulers | +| :-------------------------------- | --------------------------------------: | +| ![scheduler](./img/scheduler.svg) | ![schedulers.svg](./img/schedulers.svg) | + +Typically in a OpenKAT installation 3 scheduler will be created per organisation: + +1. _boefje scheduler_ +2. _normalizer scheduler_ +3. _report scheduler_ -![diagram002](./img/diagram002.svg) +Each scheduler type implements it's own priority queue, and can implement it's +own processes of populating, and prioritization of its tasks. -Each scheduler type implements it's own priority queue, and can implement it's own -way of populating, and prioritization of its queue. The associated queues of an -individual scheduler is persisted in a SQL database. +![queue.svg](./img/queue.svg) -Interaction with the scheduler and access to the internals of the 'Scheduler -App' can be accessed by the `Server` which implements a HTTP REST API interface. +Interaction with the scheduler and access to the internals of the +`SchedulerApp` can be accessed by the `Server` which implements a HTTP REST API +interface. ## Dataflows Within a KAT implementation of the scheduler we can identify several dataflows of how tasks are created and pushed onto the priority queue. In the following -section we review how different dataflows, from the `boefjes` and the -`normalizers` schedulers are implemented within the `Scheduler` system. +section we review how different dataflows, from the `boefjes`, the +`normalizers`, `report` schedulers are implemented within the `Scheduler` +system. -### `BoefjeScheduler` +First let's explore the base classes from which the individual schedulers are +derived. -#### Design +### `Scheduler` -First, we will use the `BoefjeScheduler` as an example. A `BoefjeScheduler` is -tasked with creating tasks that are able to be picked up and processed by a -'Task Runner'. The scheduler creates a `BoefjeTask` to the specification that -the 'Task Runner' can interpret, namely in this instance of a `BoefjeTask`. +#### `PriorityQueue` -![diagram003](./img/diagram003.svg) +Every implementation of a `Scheduler` contains a `PriorityQueue` that is +responsible for maintaining a queue of tasks for `Task Runners` to pick up and +process. A `Scheduler` is responsible for creating `Task` objects and pushing +them onto the queue. -The scheduler wraps this `BoefjeTask` within a `PrioritizedItem` (`p_item`), -this is done such that we can push the task on the queue and add extra -information to this `PrioritizedItem`, like its priority. We uniquely identify -a task that is contained within the `PrioritizedItem` by its hash. +![tasks.svg](./img/tasks.svg) -![diagram004](./img/diagram004.svg) +The `PriorityQueue` derives its state from the state of the `Task` objects that +are persisted in the database. In other words, the current state of the +`PriorityQueue` are the `Task` objects with the status of `QUEUED`. -By doing this, it allows the scheduler to wrap whatever object within a -`PrioritizedItem`, and as a result we're able to create and extend more types -of schedulers that are not specifically bound to a type. Additionally this -allows us to persist its items to a database and makes it so that the queue can -be interchanged with a different technology if so desired. +#### `Task` -And we want to uniquely identify a task because we want to make sure that the -same tasks are not being pushed on the queue (de-duplication), or that they are -being rescheduled too quickly (grace-period). For example with a `BoefjeTask` -we unique identify a task by hashing the values of: the ooi, the boefje id, and -the organization id. So for a `PrioritizedItem` we know what specific -`BoefjeTask` it contains by this hash. +A `Task` object contains the following fields: -Before a `BoefjeTask` is wrapped by a `PrioritizedItem`, and pushed on the queue -we will check the following: +- `scheduler_id` - The id of the scheduler for which this task is created +- `schedule_id` - Optional, the id of the `Schedule` that created the task +- `priority` - The priority of the task +- `status` - The status of the task +- `type` - The type of the task +- `data` - A JSON object containing the task data +- `hash` - A unique hash generated by specific fields from the task data -- `is_task_allowed_to_run()` +Important to note is the `data` field contains the object that a `Task Runner` +will use to execute the task. This field is a JSON field that allows any object +to be persisted. It's schema is enforced by which scheduler its task is pushed +on. For a `BoefjeScheduler` only `BoefjeTask` objects are allowed to be +pushed. This is the same for the other schedulers. - - is boefje enabled - - are scan levels between boefje and ooi correct +By doing this, it allows the scheduler to wrap whatever object within a `Task`, +and as a result we're able to create and extend more types of schedulers that +are not specifically bound to a type. -- `is_task_running()` +This approach ensures that the historical record of each task's execution is +distinct, providing a clear and isolated view of each instance of the task's +lifecycle. This strategy enables maintaining accurate and unambiguous +monitoring and logging of task executions over time. Additionally it enables us +an overview and insights of what tasks have been created in the past and what +tasks are currently running. You might know this overview from Rocky as the +task list. - - is task still running according to the datastore (`TaskStore`)? - - is task still running according to Bytes? +To keep track of the status of this task throughout the system we update its +`status` -- `has_grace_period_passed()` +- When a `Task` has been created by the scheduler it will get the status of + `PENDING` (1) meaning a task has been created but it hasn't been queued yet. - - has the grace period passed according to the datastore (`TaskStore`)? - - has the grace period passed according to Bytes? +- When the `Task` is pushed onto the queue it will get the status of `QUEUED` (2). -- `is_task_stalled()` +- When the `Task Runner` picks up the task by popping the `Task` + from the queue the status will be updated to `DISPATCHED` (3). - - is task status still `DISPATCHED` for longer than the grace-period? +- The `Task Runner` is now able to start executing the `Task` and the status + will be updated to `RUNNING` (4) by the `Task Runner`. -- `is_item_on_queue_by_hash()` +- Whenever the task has been completed, the 'Task Runner' will update the + status by either setting the status to `COMPLETED`, `FAILED` or `CANCELLED`. + (5) - - check if the same task is already on the priority queue +#### `Schedule` -Important to note is that when a `BoefjeTask` is created and pushed onto the -queue as a `PrioritizedItem` a new unique `TaskRun` is generated.[^1] This -ensures that each task has its own dedicated `TaskRun` throughout its entire -lifecycle. This approach maintains a distinct record for each task, providing -an accurate and independent history of task statuses. This means that each -execution of a `BoefjeTask`, regardless of whether it's the same task being -repeated in the future, is tracked independently with its own unique `TaskRun`. +When a `Task` is created for a `Scheduler` it can be defined whether or not +that `Scheduler` can create `Schedule` objects for its `Task` objects. A +`Schedule` object is a way to define when a `Task` should be executed +automatically on a recurring schedule by the `Scheduler`. -This approach ensures that the historical record of each task's execution is -distinct, providing a clear and isolated view of each instance of the task's -lifecycle. This strategy enables maintaining accurate and unambiguous -monitoring and logging of task executions over time. Additionally it enables us -an overview and insights of what tasks have been created in the past and what -tasks are currently running. You might know this overview from Rocky as the -task list. +A `Schedule` will use the 'blueprint' that is defined in its `data` field (this +is the same as the `data` field of a `Task`) to generate a `Task` object to be +pushed on the queue of a `Scheduler`. + +![schedules.svg](./img/schedules.svg) + +A `Schedule` object contains the following fields: + +- `scheduler_id` - The id of the scheduler that created the schedule +- `schedule` - A cron expression that defines when the task should be + executed, this is used to update the value of `deadline_at` +- `deadline_at` - A timestamp that defines when the task should be executed +- `data` - A JSON object containing data for the schedule (this is the same as + the `data` field in the `Task` object) +- `hash` - A unique hash generated by specific fields from the schedule data + +A `Scheduler` can be extended by a process that checks if the `deadline_at` +of a `Schedule` has passed, and if so, creates a `Task` object for the +`Scheduler` to push onto the queue. + +When the `Task` object is pushed onto the queue, the new `deadline_at` value +of the `Schedule` is calculated using the cron expression defined in the +`schedule` field. -![diagram005](./img/diagram005.svg) +### `BoefjeScheduler` -Whenever a task is created in the scheduler it flows through the system, to -keep track of the status of this task throughout the system we update its -`TaskRun` reference. +#### Design -- When a `BoefjeTask` has been created by the scheduler it is packaged within - its `PrioritizedItem` and it will get the status of `PENDING` (1) meaning a - task has been created but it hasn't been queued yet. +A `BoefjeScheduler` is tasked with creating tasks that are able to be picked +up and processed by the "Boefje Runner". The `BoefjeScheduler` creates a +`BoefjeTask` to the specification that the "Boefje Runner" can interpret. -- When the `PrioritizedItem` is pushed onto the queue the `TaskRun` will get a - status update and will get the status of `QUEUED` (2). +The scheduler wraps this `BoefjeTask` within a `Task`, this is done such that we +can push the task on the queue and add extra information to this `Task`, like +its priority, its status, and its type. We uniquely identify a task that is +contained within the `Task` by its hash. -- When the 'Task Runner' picks up the task by popping the `PrioritizedItem` - from the queue the `TaskRun` status will be updated to `DISPATCHED` (3). +For example with a `BoefjeTask` we unique identify a task by hashing the values +of: the ooi, the boefje id, and the organization id. So for a `Task` we know +what specific `BoefjeTask` it contains by this hash. -- The 'Task Runner' is now able to start executing the `BoefjeTask` that was - contained in the `PrioritizedItem` and the status of the `TaskRun` will be - updated to `RUNNING` (4). +Before a `BoefjeTask` and pushed on the queue we will check the following: -- Whenever the task has been completed the 'Task Runner' will update the - `TaskRun` status by either setting the status to `COMPLETED`, `FAILED` or - `CANCELLED`. (5) +- `has_boefje_permission_to_run()` + + - is boefje enabled + - are scan levels between boefje and ooi correct + +- `has_boefje_task_grace_period_passed()` + + - has the grace period passed according to the datastore (`TaskStore`)? + - has the grace period passed according to Bytes? + +- `has_boefje_task_stalled()` + + - is the task status still `DISPATCHED` for longer than the grace-period? + +- `has_boefje_task_started_running()` + + - is task still running according to the datastore (`TaskStore`)? + - is task still running according to Bytes? + +- `is_item_on_queue_by_hash()` + + - check if the same task is already on the priority queue using the `hash` #### Processes +![boefje_scheduler.svg](./img/boefje_scheduler.svg) + In order to create a `BoefjeTask` and trigger the dataflow we described above -we have 4 different processes within a `BoefjeScheduler` that can create boefje -tasks. Namely: +we have 4 different processes running in threads within a `BoefjeScheduler` +that can create boefje tasks. Namely: 1. scan profile mutations 2. enabling of boefjes 3. rescheduling of prior tasks 4. manual scan job -![diagram006](./img/diagram006.svg) - ##### 1. Scan profile mutations -![diagram007](./img/diagram007.svg) - When a scan level is increased on an OOI (`schedulers.boefje.push_tasks_for_scan_profile_mutations`) a message is pushed on the RabbitMQ `{organization_id}__scan_profile_mutations` queue. The scheduler -continuously check if new messages are posted on the queue. The resulting tasks +continuously checks if new messages are posted on the queue. The resulting tasks from this process will get the second highest priority of 2 on the queue. The dataflow is as follows: @@ -198,22 +246,19 @@ The dataflow is as follows: profile mutation from the `RabbitMQ` system. - For the associated OOI of this scan profile mutation, the `Scheduler` system - will get the enabled boefjes for this OOI. (`tasks = ooi * boefjes`) + will get the enabled boefjes for this OOI. (`ooi * boefjes = tasks`) - For each enabled boefje, a `BoefjeTask` will be created and added to the `PriorityQueue` of the `BoefjeScheduler`. A `BoefjeTask` is an object with the correct specification for the task runner to execute a boefje. -- The `BoefjeScheduler` will then create a `PrioritizedItem` and push it to the +- The `BoefjeScheduler` will then create a `Task` and push it to the queue. The `PrioritizedItem` will contain the created `BoefjeTask`. -- A `TaskRun` reference to the task is created to keep track of the status - of the task (`post_push`). +- A `Schedule` is created, or updated for the `Task` (`post_push()`). ##### 2. Enabling of boefjes -![diagram008](./img/diagram008.svg) - When a plugin of type `boefje` is enabled or disabled in Rocky. The dataflow is triggered when the plugin cache of an organisation is flushed. @@ -228,46 +273,29 @@ The dataflow is as follows: - New `BoefjeTask` tasks will be created for enabled boefjes and on which type of ooi it can be used. -- The `BoefjeScheduler` will then create a `PrioritizedItem` and push it to the - queue. The `PrioritizedItem` will contain the created `BoefjeTask`. +- The `BoefjeScheduler` will then create a `Task` and push it to the queue. The + `Task` will contain the created `BoefjeTask`. -- A `TaskRun` reference to the task is created to keep track of the status - of the task (`post_push`). +- A `Schedule` is created, or updated for the `Task` (`post_push()`). ##### 3. Rescheduling of prior tasks -![diagram009](./img/diagram009.svg) - -In order to re-run tasks that have been executed in the past we try to create -new tasks on ooi's. We continuously get a batch of random ooi's from octopoes -(`schedulers.boefje.push_tasks_for_random_objects`). The `BoefjeTask` tasks -from these these ooi's (`tasks = ooi * boefjes`) will get the priority that has -been calculated by the ranker. - -At the moment a task will get the priority of 3, when 7 days have gone by (e.g. -how longer it _hasn't_ executed again, the higher the priority it will get). -For everything before those 7 days it will scale the priority appropriately. +In order to re-run tasks that have been executed in the past we reference +`Schedule` objects whose `deadline_at` has passed. The `BoefjeScheduler` will +create a `BoefjeTask` for the `Task` that is associated with the `Schedule`. The dataflow is as follows: -- From Octopoes we get `n` random ooi's (`get_random_objects`) - -- For each OOI, the `Scheduler` will get the enabled boefjes for this OOI. - (`tasks = ooi * boefjes`) +- From the database we get the `Schedule` objects whose `deadline_at` has passed. -- For each enabled boefje, a `BoefjeTask` will be created and added to the - `PriorityQueue` of the `BoefjeScheduler`. +- For each `Schedule` we create a new `Task` containing a `BoefjeTask` -- The `BoefjeScheduler` will then create a `PrioritizedItem` and push it to the - queue. The `PrioritizedItem` will contain the created `BoefjeTask`. +- The `BoefjeScheduler` will push it to the queue. -- A `TaskRun` reference to the task is created to keep track of the status - of the task (`post_push`). +- The `Schedule` is updated for the `Task` (`post_push()`). ##### 4. Manual scan job -![diagram010](./img/diagram010.svg) - Scan jobs created by the user in Rocky (`server.push_queue`), will get the highest priority of 1. Note, that this will circumvent all the checks that are present in the `BoefjeScheduler`. @@ -277,37 +305,39 @@ The dataflow is as follows: - Rocky will create a `BoefjeTask` that will be pushed directly to the specified queue. -- The `BoefjeScheduler` will then create a `PrioritizedItem` and push it to the - queue. The `PrioritizedItem` will contain the created `BoefjeTask`. +- The `BoefjeScheduler` will then create a `Task` and push it to the + queue. The `Task` will contain the created `BoefjeTask`. -- A `TaskRun` reference to the task is created to keep track of the status - of the task (`post_push`). +- A `Schedule` is created, or updated for the `Task` (`post_push()`). ### `NormalizerScheduler` #### Design The `NormalizerScheduler` is tasked with creating tasks that are able to be -picked up and processed by a 'Task Runner'. The scheduler creates a -`NormalizerTask` to the specification that the 'Task Runner' can interpret, +picked up and processed by a normalizer task runner. The scheduler creates a +`NormalizerTask` to the specification that the task runner can interpret, namely the instance of a `NormalizerTask`. -The of queueing and processing a `NormalizerTask` task is the same as for -the `BoefjeScheduler`. Reference that section for a more in-depth explanation. - -Before `NormalizerTask` is wrapped by a `PrioritizedItem`, and pushed to the +Before `NormalizerTask` is wrapped by a `Task`, and pushed to the queue we will check the following: -- `is_task_allowed_to_run()` +- `has_normalizer_permission_to_run()` - is the normalizer enabled -- `is_task_running()` +- `has_normalizer_task_started_running()` - is task still running according to the datastore (`TaskStore`)? +- `is_item_on_queue_by_hash()` + + - check if the same task is already on the priority queue using the `hash` + #### Processes +![normalizer_scheduler.svg](./img/normalizer_scheduler.svg) + The following processes within a `NormalizerScheduler` will create a `NormalizerTask` tasks: @@ -321,182 +351,41 @@ When a raw file is created (`schedulers.normalizer.create_tasks_for_raw_data`) from a message queue. - For every mime type of the raw file, the `NormalizerScheduler` will retrieve - the enabled normalizers for this mime type. (`create_tasks_for_raw_data()`) + the enabled normalizers for this mime type. - For every enabled normalizer, a `NormalizerTask` will be created and added to the `PriorityQueue` of the `NormalizerScheduler`. -## Class Diagram +### `ReportScheduler` -The following diagram we can explore the code level of the scheduler -application, and its class structure. +#### Design -The following describes the main components of the scheduler application: +The `ReportScheduler` is tasked with creating report tasks that are able to be +picked up and processed by the report task runner. -- `App` - The main application class, which is responsible for starting the - schedulers. It also contains the server, which is responsible for handling - the rest api requests. The `App` implements multiple `Scheduler` instances. - The `run()` method starts the schedulers, the listeners, the monitors, and - the server in threads. The `run()` method is the main thread of the - application. +#### Processes -- `Scheduler` - And implementation of a `Scheduler` class is responsible for - populating the queue with tasks. Contains a `PriorityQueue`. The `run()` - method starts executes threads and listeners, which fill up the queue with - tasks. +![report_scheduler.svg](./img/report_scheduler.svg) -- `PriorityQueue` - The queue class, which is responsible for storing the - tasks. +The `ReportScheduler` will create a `ReportTask` for the `Task` that is +associated with a `Schedule` object. -- `Server` - The server class, which is responsible for handling the HTTP - requests. +1. Manual creation of `Schedule` for `ReportTask` +2. Rescheduling of `ReportTask` based on `Schedule` objects -```mermaid -classDiagram - class App { - +AppContext ctx - +Dict[str, Scheduler] schedulers - +Server server - - monitor_organisations() - collect_metrics() - start_schedulers() - start_monitors() - start_collectors() - start_server() - run() - shutdown() - stop_threads() - } - - class Scheduler { - <> - +AppContext ctx - +PriorityQueue queue - +Dict[str, Listener] listeners - +Dict[str, ThreadRunner] threads - - run() - run_in_thread() - - push_items_to_queue() - push_item_to_queue_with_timeout() - push_item_to_queue() - post_push() - - pop_item_from_queue() - post_pop() - - enable() - disable() - stop() - stop_listeners() - stop_threads() - - is_enabled() - is_space_on_queue() - is_item_on_queue_by_hash() - - last_activity() - dict() - } - - class Server { - +AppContext ctx - +Dict[str, Scheduler] schedulers - - health() - metrics() - get_schedulers() - get_scheduler() - patch_scheduler() - list_tasks() - get_task() - patch_task() - get_task_stats() - get_queues() - get_queue() - pop_queue() - push_queue() - run() - } - - class PriorityQueue{ - <> - +PriorityQueueStore pq_store - pop() - push() - peek() - remove() - empty() - qsize() - full() - is_item_on_queue() - is_item_on_queue_by_hash() - get_p_item_by_identifier() - create_hash() - dict() - _is_valid_item() - } - - class PriorityQueueStore{ - +Datastore datastore - pop() - push() - peek() - update() - remove() - get() - empty() - qsize() - get_item_by_hash() - get_items_by_scheduler_id() - } - - class Listener { - listen() - } - - class ThreadRunner { - run_forever() - run_once() - run() - join() - stop() - } - - App --|> "many" Scheduler : Implements - App --|> "one" Server: Has - Scheduler --|> "1" PriorityQueue : Has - Scheduler --|> "many" ThreadRunner : Has - Scheduler --|> "many" Listener : Has - PriorityQueue --|> PriorityQueueStore: References -``` +##### 1. Manual creation of `Schedule` for `ReportTask` -## Database Entity Relationship Diagram - -```mermaid -erDiagram - items { - uuid id PK - character_varying scheduler_id - character_varying hash - integer priority - jsonb data - timestamp_with_time_zone created_at - timestamp_with_time_zone modified_at - } - - tasks { - uuid id PK - character_varying scheduler_id - taskstatus status - timestamp_with_time_zone created_at - timestamp_with_time_zone modified_at - jsonb p_item - character_varying type - } -``` +A user can create a "Report Recipe" within Rocky, and define a recurrence +schedule of this report to be executed. A `Schedule` is created for this +"Report Recipe" and posted to the `Scheduler` API. The `ReportScheduler` will +continuously check for `Schedule` object whose `deadline_at` has passed. + +##### 2. Rescheduling of `ReportTask` based on `Schedule` objects + +The `ReportScheduler` will create a `ReportTask` for the `Task` that is +associated with a `Schedule` object. The `ReportScheduler` will continuously +check for `Schedule` objects whose `deadline_at` has passed and will push the +`ReportTask` tasks to the queue. ## Project structure @@ -512,15 +401,19 @@ $ tree -L 3 --dirsfirst │   │   └── __init__.py │   ├── context/ # shared application context │   ├── models/ # internal model definitions -│   ├── queues/ # priority queue +│   ├── queues/ # priority queue definition │   ├── rankers/ # priority/score calculations │   ├── storage/ # data abstraction layer │   ├── schedulers/ # schedulers +│   │   ├── boefje.py # boefje scheduler implementation +│   │   ├── normalizer.py # normalizer scheduler implementation +│   │   ├── report.py # report scheduler implementation +│   │   └── scheduler.py # abstract base class for schedulers │   ├── server/ # http rest api server │   ├── utils/ # common utility functions │   ├── __init__.py │   ├── __main__.py -│   ├── app.py # kat scheduler app implementation +│   ├── app.py # openkat scheduler app implementation │   └── version.py # version information └─── tests/    ├── factories/ @@ -533,7 +426,22 @@ $ tree -L 3 --dirsfirst    └── __init__.py ``` -[^1]: - As of writing a `TaskRun` is known within the scheduler as a `Task`. In the - future the naming of this model will change to accurate describe its role - and functionality. +The following describes the main components of the scheduler application: + +- `App` - The main application class, which is responsible for starting the + schedulers. It also contains the server, which is responsible for handling + the rest api requests. The `App` implements multiple `Scheduler` instances. + The `run()` method starts the schedulers, the listeners, the monitors, and + the server in threads. The `run()` method is the main thread of the + application. + +- `Scheduler` - And implementation of a `Scheduler` class is responsible for + populating the queue with tasks. Contains a `PriorityQueue`. The `run()` + method starts executes threads and listeners, which fill up the queue with + tasks. + +- `PriorityQueue` - The queue class, which is responsible for storing the + tasks. + +- `Server` - The server class, which is responsible for handling the HTTP + requests. diff --git a/mula/docs/img/boefje_scheduler.svg b/mula/docs/img/boefje_scheduler.svg new file mode 100644 index 00000000000..9f854ad21bf --- /dev/null +++ b/mula/docs/img/boefje_scheduler.svg @@ -0,0 +1,4 @@ + + + +
BoefjeScheduler
queue
mutations
new boefjes
rescheduling
manual
diff --git a/mula/docs/img/diagram001.svg b/mula/docs/img/diagram001.svg deleted file mode 100644 index 2ccc3de4674..00000000000 --- a/mula/docs/img/diagram001.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
  Scheduler App
  [software system]
 Task Runner
 [software system]
 Rocky
 [software system]
 Octopoes
 [graph database]
 RabbitMQ
 [message broker]
 Katalogus
 [software system]
 Bytes
 [software system]
PostgreSQL
diagram001
diff --git a/mula/docs/img/diagram002.svg b/mula/docs/img/diagram002.svg deleted file mode 100644 index 7254bd55862..00000000000 --- a/mula/docs/img/diagram002.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
diagram002
 Schedulers
 Server
Priority Queue
 Scheduler
Scheduler App
[software system]
 Task Runner
 [software system]
 Rocky
 [software system]
TaskStore
[table]
PQStore
[table]
diff --git a/mula/docs/img/diagram003.svg b/mula/docs/img/diagram003.svg deleted file mode 100644 index 9491f3fba36..00000000000 --- a/mula/docs/img/diagram003.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
diagram003
Priority Queue
 BoefjeScheduler
p_item
Push
p_item
 Task Runner
Pop
p_item
p_item
p_item
BoefjeTask
Scheduler App
diff --git a/mula/docs/img/diagram004.svg b/mula/docs/img/diagram004.svg deleted file mode 100644 index 59e958b196c..00000000000 --- a/mula/docs/img/diagram004.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
diagram004
Priority Queue
 BoefjeScheduler
p_item
push
p_item
 Task Runner
pop
p_item
p_item
p_item
BoefjeTask
BoefjeTask
 PrioritizedItem
diff --git a/mula/docs/img/diagram005.svg b/mula/docs/img/diagram005.svg deleted file mode 100644 index 3f344511e0f..00000000000 --- a/mula/docs/img/diagram005.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
diagram005
Priority Queue
 BoefjeScheduler
p_item
push
p_item
 Task Runner
pop
p_item
p_item
p_item
BoefjeTask
TaskRun (Task List)
id
scheduler_id
type
status
created_at
modified_at
p_item
1
boefje_org_1
boefje
COMPLETED
2024-01-01
abcd-0123
2
boefje_org_1
boefje
RUNNING
2024-01-05
abcd-0123
3
post_push
update
1
1
2
2
3
3
4
4
5
5
status: PENDING
status: QUEUED
status: DISPATCHED
status: RUNNING
status: COMPLETED / FAILED / CANCELLED
diff --git a/mula/docs/img/diagram006.svg b/mula/docs/img/diagram006.svg deleted file mode 100644 index acfb4572e4c..00000000000 --- a/mula/docs/img/diagram006.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
diagram006
Priority Queue
 BoefjeScheduler
p_item
push
p_item
 Task Runner
pop
p_item
p_item
p_item
BoefjeTask
scan profile mutations
enabled boefjes
rescheduling
1
2
3
manual scan job
4
diff --git a/mula/docs/img/diagram007.svg b/mula/docs/img/diagram007.svg deleted file mode 100644 index 59b5244e1e1..00000000000 --- a/mula/docs/img/diagram007.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
diagram007
Priority Queue
p_item
push
p_item
 Task Runner
pop
p_item
p_item
p_item
BoefjeTask
 BoefjeScheduler
scan profile mutations
subscribe
 RabbitMQ
 [message broker]
1
diff --git a/mula/docs/img/diagram008.svg b/mula/docs/img/diagram008.svg deleted file mode 100644 index a61b882fe6f..00000000000 --- a/mula/docs/img/diagram008.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
diagram008
Priority Queue
p_item
push
p_item
 Task Runner
pop
p_item
p_item
p_item
BoefjeTask
 BoefjeScheduler
enabled boefjes
cached
 Katalogus
 [external software system]
2
diff --git a/mula/docs/img/diagram009.svg b/mula/docs/img/diagram009.svg deleted file mode 100644 index 2c4f7e39761..00000000000 --- a/mula/docs/img/diagram009.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
diagram009
Priority Queue
p_item
push
p_item
 Task Runner
pop
p_item
p_item
p_item
BoefjeTask
 BoefjeScheduler
rescheduling
GET
 Octopoes
 [external software system]
3
diff --git a/mula/docs/img/diagram010.svg b/mula/docs/img/diagram010.svg deleted file mode 100644 index 917e3ca172a..00000000000 --- a/mula/docs/img/diagram010.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
diagram010
Priority Queue
p_item
push
p_item
 Task Runner
pop
p_item
p_item
p_item
BoefjeTask
 BoefjeScheduler
manual scan job
POST
 Rocky
 [external software system]
4
diff --git a/mula/docs/img/normalizer_scheduler.svg b/mula/docs/img/normalizer_scheduler.svg new file mode 100644 index 00000000000..18b53d70fe8 --- /dev/null +++ b/mula/docs/img/normalizer_scheduler.svg @@ -0,0 +1,4 @@ + + + +
queue
raw data
received
NormalizerScheduler
diff --git a/mula/docs/img/queue.svg b/mula/docs/img/queue.svg new file mode 100644 index 00000000000..1f7fdbfcdee --- /dev/null +++ b/mula/docs/img/queue.svg @@ -0,0 +1,4 @@ + + + +
Scheduler
API
queue
(process)
diff --git a/mula/docs/img/report_scheduler.svg b/mula/docs/img/report_scheduler.svg new file mode 100644 index 00000000000..c6a78c79e97 --- /dev/null +++ b/mula/docs/img/report_scheduler.svg @@ -0,0 +1,4 @@ + + + +
queue
rescheduling
ReportScheduler
diff --git a/mula/docs/img/scheduler.svg b/mula/docs/img/scheduler.svg new file mode 100644 index 00000000000..87fc74ee30a --- /dev/null +++ b/mula/docs/img/scheduler.svg @@ -0,0 +1,4 @@ + + + +
Scheduler
API
SchedulerApp
diff --git a/mula/docs/img/scheduler_system.svg b/mula/docs/img/scheduler_system.svg new file mode 100644 index 00000000000..ac511569ad2 --- /dev/null +++ b/mula/docs/img/scheduler_system.svg @@ -0,0 +1,4 @@ + + + +
Scheduler System
API
SchedulerApp
Rocky
RabbitMQ
KAT-alogus
Octopoes
Bytes
Task Runners
diff --git a/mula/docs/img/schedulers.svg b/mula/docs/img/schedulers.svg new file mode 100644 index 00000000000..d804fc43df1 --- /dev/null +++ b/mula/docs/img/schedulers.svg @@ -0,0 +1,4 @@ + + + +
Scheduler
API
boefje-org1
boefje-org2
diff --git a/mula/docs/img/schedules.svg b/mula/docs/img/schedules.svg new file mode 100644 index 00000000000..2d4cf387854 --- /dev/null +++ b/mula/docs/img/schedules.svg @@ -0,0 +1,4 @@ + + + +
(process)
task
task
task
task
Schedule
Task
diff --git a/mula/docs/img/schematic-drawing.svg b/mula/docs/img/schematic-drawing.svg deleted file mode 100644 index 027054eba25..00000000000 --- a/mula/docs/img/schematic-drawing.svg +++ /dev/null @@ -1,4 +0,0 @@ - - - -
  Scheduler App
  [software system]
Scheduler App...
 Task Runner
 [software system]
Task Runner...
 Rocky
 [software system]
Rocky...
 Octopoes
 [graph database]
Octopoes...
 RabbitMQ
 [message broker]
RabbitMQ...
 Katalogus
 [software system]
Katalogus...
 Bytes
 [software system]
Bytes...
PostgreSQL
PostgreSQL
diagram001
diagram001
diagram002
diagram002
 Schedulers
 Schedulers
 Server
 Server
Priority Queue
Priority Queue
 Scheduler
 Scheduler
Scheduler App
[software system]
Scheduler App...
 Task Runner
 [software system]
Task Runner...
 Rocky
 [software system]
Rocky...
TaskStore
[table]
TaskStore...
PQStore
[table]
PQStore...
diagram010
diagram010
Priority Queue
Priority Queue
p_item
p_item
push
push
p_item
p_item
 Task Runner
 Task Runner
pop
pop
p_item
p_item
p_item
p_item
p_item
p_item
BoefjeTask
BoefjeTask
 BoefjeScheduler
 BoefjeScheduler
manual scan job
manual scan job
POST
POST
 Rocky
 [external software system]
Rocky...
4
4
diagram003
diagram003
Priority Queue
Priority Queue
 BoefjeScheduler
 BoefjeScheduler
p_item
p_item
Push
Push
p_item
p_item
 Task Runner
 Task Runner
Pop
Pop
p_item
p_item
p_item
p_item
p_item
p_item
BoefjeTask
BoefjeTask
Scheduler App
Scheduler App
diagram004
diagram004
Priority Queue
Priority Queue
 BoefjeScheduler
 BoefjeScheduler
p_item
p_item
push
push
p_item
p_item
 Task Runner
 Task Runner
pop
pop
p_item
p_item
p_item
p_item
p_item
p_item
BoefjeTask
BoefjeTask
BoefjeTask
BoefjeTask
 PrioritizedItem
 PrioritizedItem
diagram005
diagram005
Priority Queue
Priority Queue
 BoefjeScheduler
 BoefjeScheduler
p_item
p_item
push
push
p_item
p_item
 Task Runner
 Task Runner
pop
pop
p_item
p_item
p_item
p_item
p_item
p_item
BoefjeTask
BoefjeTask
TaskRun (Task List)
id
id
scheduler_id
scheduler_id
type
type
status
status
created_at
created_at
modified_at
modified_at
p_item
p_item
1
1
boefje_org_1
boefje_org_1
boefje
boefje
COMPLETED
COMPLETED
2024-01-01
2024-01-01
abcd-0123
abcd-0123
2
2
boefje_org_1
boefje_org_1
boefje
boefje
RUNNING
RUNNING
2024-01-05
2024-01-05
abcd-0123
abcd-0123
3
3
post_push
post_push
update
update
1
1
1
1
2
2
2
2
3
3
3
3
4
4
4
4
5
5
5
5
status: PENDING
status: PENDING
status: QUEUED
status: QUEUED
status: DISPATCHED
status: DISPATCHED
status: RUNNING
status: RUNNING
status: COMPLETED / FAILED / CANCELLED
status: COMPLETED / FAILED / CANCELLED
diagram006
diagram006
Priority Queue
Priority Queue
 BoefjeScheduler
 BoefjeScheduler
p_item
p_item
push
push
p_item
p_item
 Task Runner
 Task Runner
pop
pop
p_item
p_item
p_item
p_item
p_item
p_item
BoefjeTask
BoefjeTask
scan profile mutations
scan profile mutations
enabled boefjes
enabled boefjes
rescheduling
rescheduling
1
1
2
2
3
3
manual scan job
manual scan job
4
4
diagram007
diagram007
Priority Queue
Priority Queue
p_item
p_item
push
push
p_item
p_item
 Task Runner
 Task Runner
pop
pop
p_item
p_item
p_item
p_item
p_item
p_item
BoefjeTask
BoefjeTask
 BoefjeScheduler
 BoefjeScheduler
scan profile mutations
scan profile mutations
subscribe
subscribe
 RabbitMQ
 [message broker]
RabbitMQ...
1
1
diagram008
diagram008
Priority Queue
Priority Queue
p_item
p_item
push
push
p_item
p_item
 Task Runner
 Task Runner
pop
pop
p_item
p_item
p_item
p_item
p_item
p_item
BoefjeTask
BoefjeTask
 BoefjeScheduler
 BoefjeScheduler
enabled boefjes
enabled boefjes
cached
cached
 Katalogus
 [external software system]
Katalogus...
2
2
diagram009
diagram009
Priority Queue
Priority Queue
p_item
p_item
push
push
p_item
p_item
 Task Runner
 Task Runner
pop
pop
p_item
p_item
p_item
p_item
p_item
p_item
BoefjeTask
BoefjeTask
 BoefjeScheduler
 BoefjeScheduler
rescheduling
rescheduling
GET
GET
 Octopoes
 [external software system]
Octopoes...
3
3
Text is not SVG - cannot display
diff --git a/mula/docs/img/tasks.svg b/mula/docs/img/tasks.svg new file mode 100644 index 00000000000..0b686533366 --- /dev/null +++ b/mula/docs/img/tasks.svg @@ -0,0 +1,4 @@ + + + +
(process)
task
task
task
task
diff --git a/mula/scheduler/server/handlers/schedules.py b/mula/scheduler/server/handlers/schedules.py index 3f4a9070466..895a50c9b24 100644 --- a/mula/scheduler/server/handlers/schedules.py +++ b/mula/scheduler/server/handlers/schedules.py @@ -108,6 +108,9 @@ def list( return utils.paginate(request, results, count, offset, limit) def create(self, schedule: serializers.ScheduleCreate) -> Any: + if not (schedule.deadline_at or schedule.schedule): + raise BadRequestError("Either deadline_at or schedule must be provided") + try: new_schedule = models.Schedule(**schedule.model_dump()) except ValueError: diff --git a/mula/scheduler/server/serializers/schedule.py b/mula/scheduler/server/serializers/schedule.py index acbd1d7ab05..5e3c0a0bbb9 100644 --- a/mula/scheduler/server/serializers/schedule.py +++ b/mula/scheduler/server/serializers/schedule.py @@ -10,7 +10,7 @@ class ScheduleCreate(BaseModel): data: dict - schedule: str + schedule: str | None = None deadline_at: datetime | None = None diff --git a/mula/tests/integration/test_api.py b/mula/tests/integration/test_api.py index fa689eb1aee..a487f765dba 100644 --- a/mula/tests/integration/test_api.py +++ b/mula/tests/integration/test_api.py @@ -901,6 +901,31 @@ def test_post_schedule(self): datetime.fromisoformat(response.json().get("deadline_at")[:-1]).astimezone(timezone.utc), ) + def test_post_schedule_explicit_deadline_at(self): + """When a schedule is created, the deadline_at should be set if it is provided.""" + item = functions.create_item(self.scheduler.scheduler_id, 1) + now = datetime.now(timezone.utc) + response = self.client.post( + "/schedules", json={"scheduler_id": item.scheduler_id, "data": item.data, "deadline_at": now.isoformat()} + ) + self.assertEqual(201, response.status_code) + self.assertIsNone(response.json().get("schedule")) + self.assertEqual( + # NOTE: Remove Z from the end of the string. Until 3.11 + # datetime.fromisoformat does not accept Z at the end of the string + datetime.fromisoformat(response.json().get("deadline_at")[:-1]).astimezone(timezone.utc), + now, + ) + + def test_post_schedule_schedule_and_deadline_at_none(self): + """When a schedule is created, both schedule and deadline_at should not be None.""" + item = functions.create_item(self.scheduler.scheduler_id, 1) + response = self.client.post("/schedules", json={"scheduler_id": item.scheduler_id, "data": item.data}) + self.assertEqual(400, response.status_code) + self.assertEqual( + {"detail": "Bad request error occurred: Either deadline_at or schedule must be provided"}, response.json() + ) + def test_post_schedule_invalid_schedule(self): item = functions.create_item(self.scheduler.scheduler_id, 1) response = self.client.post( diff --git a/mula/tests/integration/test_schedule_store.py b/mula/tests/integration/test_schedule_store.py index e04adb26ee9..369af4c6101 100644 --- a/mula/tests/integration/test_schedule_store.py +++ b/mula/tests/integration/test_schedule_store.py @@ -52,12 +52,6 @@ def test_create_schedule_deadline_at_takes_precedence(self): self.assertEqual(schedule.deadline_at, now) - def test_create_schedule_not_provided_schedule(self): - """When a schedule is created, the deadline_at should be None if schedule is not provided.""" - schedule = models.Schedule(scheduler_id="test_scheduler_id", data={}) - - self.assertIsNone(schedule.deadline_at) - def test_create_schedule(self): # Arrange scheduler_id = "test_scheduler_id"