Update scheduler folder structure #3883

Merged
merged 13 commits into from
Nov 27, 2024
78 changes: 46 additions & 32 deletions docs/source/developer-documentation/mula.md
Expand Up @@ -6,8 +6,8 @@ The scheduler is responsible for scheduling the execution of tasks. The
execution of those tasks is prioritized / scored by a ranker. The tasks
are then pushed onto a priority queue.

Within the project of KAT, the scheduler is tasked with scheduling boefje and
normalizer tasks.
Within the project of KAT, the scheduler is tasked with scheduling boefje,
normalizer, and report tasks.
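
As a rough illustration of the ranker and priority queue described above, the following minimal Python sketch scores tasks and pops them in priority order. The `rank` function, the `QueuedTask` class, and the scores per task type are hypothetical and are not taken from the scheduler's code base; the sketch also assumes that a lower score means the task is handled earlier.

```python
import heapq
import itertools
from dataclasses import dataclass, field


@dataclass(order=True)
class QueuedTask:
    priority: int                          # score assigned by the ranker
    sequence: int                          # tie-breaker that preserves insertion order
    task_type: str = field(compare=False)
    payload: str = field(compare=False)


def rank(task_type: str) -> int:
    """Hypothetical ranker: a lower score means the task is popped earlier."""
    return {"boefje": 1, "normalizer": 2, "report": 3}.get(task_type, 10)


class PriorityQueue:
    """Minimal heap-backed priority queue (illustrative only)."""

    def __init__(self) -> None:
        self._heap: list = []
        self._counter = itertools.count()

    def push(self, task_type: str, payload: str) -> None:
        item = QueuedTask(rank(task_type), next(self._counter), task_type, payload)
        heapq.heappush(self._heap, item)

    def pop(self) -> QueuedTask:
        return heapq.heappop(self._heap)

    def __len__(self) -> int:
        return len(self._heap)
```

Pushing a report, a boefje, and a normalizer task in that order and then popping three times would yield the boefje task first and the report task last under these assumed scores.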

## Architecture

Expand All @@ -20,12 +20,12 @@ rankers.

### Stack, packages and libraries

| Name | Version | Description |
| ---------- | -------- | ------------------- |
| Python | ^3.8 | |
| FastAPI | ^0.109.0 | Used for api server |
| SQLAlchemy | ^2.0.23 | |
| pydantic | ^2.5.2 | |
| Name | Version | Description |
| ---------- | --------- | ------------------- |
| Python | ^3.10 | |
| FastAPI    | ^0.115.2  | Used for API server |
| SQLAlchemy | ^2.0.23 | |
| pydantic | ^2.7.2 | |

### External services

Expand All @@ -41,36 +41,34 @@ The scheduler interfaces with the following services:
### Project structure

```
$ tree -L 3 --dirsfirst
.
├── docs/ # additional documentation
├── scheduler/ # scheduler python module
│   ├── config # application settings configuration
│   ├── connectors # external service connectors
│   │   ├── listeners # channel/socket listeners
│   │   ├── services # rest api connectors
│   │   └── __init__.py
│   ├── context/ # shared application context
│   ├── models/ # internal model definitions
│   ├── queues/ # priority queue
│   ├── rankers/ # priority/score calculations
│   ├── storage/ # data abstraction layer
│   ├── schedulers/ # schedulers
│   ├── server/ # http rest api server
│   ├── utils/ # common utility functions
│   ├── clients/ # external service clients
│   │   ├── amqp/ # rabbitmq clients
│   │   └── http/ # http clients
│   ├── context/ # shared application context and configuration
│   ├── models/ # internal model definitions
│   ├── schedulers/ # schedulers
│   │   ├── queue/ # priority queue
│   │   ├── rankers/ # priority/score calculations
│   │   └── schedulers/ # scheduler implementations
│   ├── server/ # http rest api server
│   ├── storage/ # data abstraction layer
│   │   ├── migrations/ # database migrations
│   │   └── stores/ # data stores
│   ├── utils/ # common utility functions
│   ├── __init__.py
│   ├── __main__.py
│   ├── app.py # kat scheduler app implementation
│   ├── app.py # OpenKAT scheduler app implementation
│   └── version.py # version information
└─── tests/
   ├── factories/
   ├── integration/
   ├── mocks/
   ├── scripts/
   ├── simulation/
   ├── unit/
   ├── utils/
   └── __init__.py
└─── tests/ # test suite
   ├── factories/ # factories for test data
   ├── integration/ # integration tests
   ├── mocks/ # mocks for testing
   ├── simulation/ # simulation tests
   ├── unit/ # unit tests
   └── utils/ # utility functions for tests
```

## Running / Developing
Expand All @@ -93,6 +91,22 @@ scheduler. See the environment settings section under Installation and Deploymen
$ docker compose up --build -d scheduler
```

### Migrations

Creating a migration:

```
# Create a migration
make revid=0008 m="add_task_schedule" migrations
```

Sometimes it is helpful to run the migrations in a clean environment. Note that
the following commands remove unused Docker data, including all unused volumes:

```
docker system prune
docker volume prune --force --all
```
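
To make the `migrations` make target easier to follow, here is a small Python sketch that mirrors what the Makefile in this change appears to do: it refuses to proceed without a message and a revision id, and otherwise assembles the `alembic revision --autogenerate` command line. The function name `migration_command` and the `ALEMBIC_INI` constant are hypothetical; the sketch only builds the argument list and does not run anything.

```python
import shlex

# Path of the alembic configuration inside the scheduler container,
# as used by the Makefile targets in this change.
ALEMBIC_INI = "/app/scheduler/scheduler/storage/migrations/alembic.ini"


def migration_command(m: str, revid: str) -> list:
    """Build the command that creates a new migration (hypothetical helper)."""
    if not m or not revid:
        raise ValueError(
            "Specify a message with m={message} and a rev-id with revid={revid} (e.g. 0001 etc.)"
        )
    return [
        "docker", "compose", "run", "--rm", "scheduler",
        "alembic", "--config", ALEMBIC_INI,
        "revision", "--autogenerate",
        "-m", m, "--rev-id", revid,
    ]


def as_shell(command: list) -> str:
    """Render the argument list as a single, safely quoted shell line."""
    return shlex.join(command)
```

Calling `as_shell(migration_command("add_task_schedule", "0008"))` gives a shell line equivalent to the one the make target would run for the example above.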

## Testing

```
6 changes: 3 additions & 3 deletions mula/Makefile
Expand Up @@ -74,7 +74,7 @@ cov: ## Generate a test coverage report

sql: ## Generate raw sql for the migrations.
	docker compose run --rm scheduler \
		alembic --config /app/scheduler/scheduler/alembic.ini \
		alembic --config /app/scheduler/scheduler/storage/migrations/alembic.ini \
		upgrade $(rev1):$(rev2) --sql

migrations: ## Create migration.
Expand All @@ -84,14 +84,14 @@ else ifeq ($(revid),)
	$(HIDE) (echo "ERROR: Specify a message with m={message} and a rev-id with revid={revid} (e.g. 0001 etc.)"; exit 1)
else
	docker compose run --rm scheduler \
		alembic --config /app/scheduler/scheduler/alembic.ini \
		alembic --config /app/scheduler/scheduler/storage/migrations/alembic.ini \
		revision --autogenerate \
		-m "$(m)" --rev-id "$(revid)"
endif

migrate: ## Run migrations using alembic.
	docker compose run scheduler \
		alembic --config /app/scheduler/scheduler/alembic.ini \
		alembic --config /app/scheduler/scheduler/storage/migrations/alembic.ini \
		upgrade head

##
2 changes: 1 addition & 1 deletion mula/entrypoint.sh
Expand Up @@ -5,7 +5,7 @@ set -e
shopt -s nocasematch

if [ "$DATABASE_MIGRATION" = "1" ] || [[ $DATABASE_MIGRATION == "true" ]]; then
    python -m alembic --config /app/scheduler/scheduler/alembic.ini upgrade head
    python -m alembic --config /app/scheduler/scheduler/storage/migrations/alembic.ini upgrade head
fi

exec "$@"
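
The condition in the entrypoint can be read as "run the migrations when `DATABASE_MIGRATION` is `1`, or when it equals `true` in any letter case" (the `nocasematch` option makes the `[[ ... == ... ]]` comparison case-insensitive). A minimal Python sketch of that check, assuming this reading of the script; the function name `migrations_enabled` is hypothetical:

```python
import os
from collections.abc import Mapping


def migrations_enabled(environ: Mapping = os.environ) -> bool:
    """Return True when DATABASE_MIGRATION is "1" or a case-insensitive "true"."""
    value = environ.get("DATABASE_MIGRATION", "")
    return value == "1" or value.lower() == "true"
```

Passing the environment as a parameter keeps the check easy to exercise with a plain dictionary instead of the real process environment.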
9 changes: 4 additions & 5 deletions mula/scheduler/app.py
Expand Up @@ -4,8 +4,7 @@
import structlog
from opentelemetry import trace

from scheduler import context, schedulers, server
from scheduler.connectors.errors import ExternalServiceError
from scheduler import clients, context, schedulers, server
from scheduler.utils import thread

tracer = trace.get_tracer(__name__)
Expand Down Expand Up @@ -83,7 +82,7 @@ def monitor_organisations(self) -> None:
        }
        try:
            orgs = self.ctx.services.katalogus.get_organisations()
        except ExternalServiceError:
        except clients.errors.ExternalServiceError:
            self.logger.exception("Failed to get organisations from Katalogus")
            return

Expand Down Expand Up @@ -117,7 +116,7 @@ def monitor_organisations(self) -> None:
        for org_id in additions:
            try:
                org = self.ctx.services.katalogus.get_organisation(org_id)
            except ExternalServiceError as e:
            except clients.errors.ExternalServiceError as e:
                self.logger.error("Failed to get organisation from Katalogus", error=e, org_id=org_id)
                continue

Expand Down Expand Up @@ -166,7 +165,7 @@ def start_schedulers(self) -> None:
        # Initialize the schedulers
        try:
            orgs = self.ctx.services.katalogus.get_organisations()
        except ExternalServiceError as e:
        except clients.errors.ExternalServiceError as e:
            self.logger.error("Failed to get organisations from Katalogus", error=e)
            return

6 changes: 6 additions & 0 deletions mula/scheduler/clients/__init__.py
@@ -0,0 +1,6 @@
from .amqp.raw_data import RawData
from .amqp.scan_profile import ScanProfileMutation
from .http.external.bytes import Bytes
from .http.external.katalogus import Katalogus
from .http.external.octopoes import Octopoes
from .http.external.rocky import Rocky
2 changes: 2 additions & 0 deletions mula/scheduler/clients/http/__init__.py
@@ -0,0 +1,2 @@
from .client import HTTPClient
from .service import HTTPService
77 changes: 77 additions & 0 deletions mula/scheduler/clients/http/client.py
@@ -0,0 +1,77 @@
import socket
import time
from collections.abc import Callable

import httpx
import structlog


class HTTPClient:
    """A class that provides methods to check if a host is available and healthy."""

    def __init__(self):
        self.logger = structlog.get_logger(self.__class__.__name__)

    def is_host_available(self, hostname: str, port: int) -> bool:
        """Check if the host is available.

        Args:
            hostname: A string representing the hostname.
            port: An integer representing the port number.

        Returns:
            A boolean
        """
        try:
            socket.create_connection((hostname, port))
            return True
        except OSError:
            return False

    def is_host_healthy(self, host: str, health_endpoint: str) -> bool:
        """Check if host is healthy by inspecting the host's health endpoint.

        Args:
            host: A string representing the hostname.
            health_endpoint: A string representing the health endpoint.

        Returns:
            A boolean
        """
        try:
            url = f"{host}/{health_endpoint}"
            response = httpx.get(url, timeout=5)
            healthy = response.json().get("healthy")
            return healthy
        except httpx.HTTPError as exc:
            self.logger.warning("Exception: %s", exc)
            return False

    def retry(self, func: Callable, *args, **kwargs) -> bool:
        """Retry a function until it returns True.

        Args:
            func: A python callable that needs to be retried.

        Returns:
            A boolean signifying whether or not the func was executed successfully.
        """
        for i in range(10):
            if func(*args, **kwargs):
                self.logger.info(
                    "Function %s, executed successfully. Retry count: %d",
                    func.__name__,
                    i,
                    name=func.__name__,
                    args=args,
                    kwargs=kwargs,
                )
                return True

            self.logger.warning(
                "Function %s, failed. Retry count: %d", func.__name__, i, name=func.__name__, args=args, kwargs=kwargs
            )

            time.sleep(10)

        return False
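
For readers who want to experiment with the availability check and the retry loop outside the scheduler, the following standalone sketch shows one possible variant using only the standard library. It is not the code from this pull request: the `attempts`, `delay`, and `timeout` parameters are assumptions added for illustration, the socket is closed explicitly through a context manager, and, unlike the method above, the sketch does not sleep after the final failed attempt.

```python
import socket
import time
from collections.abc import Callable


def is_host_available(hostname: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection can be opened; the socket is closed again."""
    try:
        with socket.create_connection((hostname, port), timeout=timeout):
            return True
    except OSError:
        return False


def retry(
    func: Callable[..., bool],
    *args,
    attempts: int = 10,   # assumed default, matching the fixed count of 10 above
    delay: float = 10.0,  # assumed default, matching the fixed 10 second sleep above
    **kwargs,
) -> bool:
    """Call func until it returns a truthy value or the attempts run out."""
    for attempt in range(attempts):
        if func(*args, **kwargs):
            return True
        # Only wait when another attempt will follow.
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

A call such as `retry(is_host_available, "localhost", 5432, attempts=3, delay=1.0)` would probe the given port up to three times; the host name and port here are placeholders.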
Expand Up @@ -6,11 +6,10 @@

import httpx

from scheduler.connectors.errors import ExternalServiceResponseError, exception_handler
from scheduler.clients.errors import ExternalServiceResponseError, exception_handler
from scheduler.clients.http import HTTPService
from scheduler.models import BoefjeMeta

from .services import HTTPService

ClientSessionMethod = Callable[..., Any]

