diff --git a/.env-dist b/.env-dist index daca58ec3ff..3d38e57838e 100644 --- a/.env-dist +++ b/.env-dist @@ -65,9 +65,6 @@ BYTES_DB_URI=postgresql://${BYTES_DB_USER}:${BYTES_DB_PASSWORD}@postgres:5432/${ # --- Octopoes --- # # See `octopoes/octopoes/config/settings.py` -# Number of Celery workers (for the Octopoes API worker) that need to be started -CELERY_WORKER_CONCURRENCY=${CELERY_WORKER_CONCURRENCY:-4} - # --- Mula --- # # See `mula/scheduler/config/settings.py` diff --git a/boefjes/boefjes/dependencies/plugins.py b/boefjes/boefjes/dependencies/plugins.py index dea646e52b3..6f216c6fe78 100644 --- a/boefjes/boefjes/dependencies/plugins.py +++ b/boefjes/boefjes/dependencies/plugins.py @@ -17,11 +17,11 @@ from boefjes.storage.interfaces import ( ConfigStorage, DuplicatePlugin, - IntegrityError, NotFound, PluginNotFound, PluginStorage, SettingsNotConformingToSchema, + UniqueViolation, ) logger = structlog.get_logger(__name__) @@ -49,9 +49,9 @@ def get_all(self, organisation_id: str) -> list[PluginType]: return [self._set_plugin_enabled(plugin, organisation_id) for plugin in all_plugins.values()] def _get_all_without_enabled(self) -> dict[str, PluginType]: - all_plugins = {plugin.id: plugin for plugin in self.local_repo.get_all()} + all_plugins = {plugin.id: plugin for plugin in self.plugin_storage.get_all()} - for plugin in self.plugin_storage.get_all(): + for plugin in self.local_repo.get_all(): # Local plugins take precedence all_plugins[plugin.id] = plugin return all_plugins @@ -94,7 +94,7 @@ def clone_settings_to_organisation(self, from_organisation: str, to_organisation self.set_enabled_by_id(plugin_id, to_organisation, enabled=True) def upsert_settings(self, settings: dict, organisation_id: str, plugin_id: str): - self._assert_settings_match_schema(settings, plugin_id) + self._assert_settings_match_schema(settings, plugin_id, organisation_id) self._put_boefje(plugin_id) return self.config_storage.upsert(organisation_id, plugin_id, settings=settings) @@ -113,29 +113,25 @@ def create_boefje(self, boefje: Boefje) -> None: try: with self.plugin_storage as storage: storage.create_boefje(boefje) - except IntegrityError as error: - raise DuplicatePlugin(self._translate_duplicate_plugin(error.message)) + except UniqueViolation as error: + raise DuplicatePlugin(error.field) except KeyError: try: with self.plugin_storage as storage: storage.create_boefje(boefje) - except IntegrityError as error: - raise DuplicatePlugin(self._translate_duplicate_plugin(error.message)) - - def _translate_duplicate_plugin(self, error_message): - translations = {"boefje_plugin_id": "id", "boefje_name": "name"} - return next((value for key, value in translations.items() if key in error_message), None) + except UniqueViolation as error: + raise DuplicatePlugin(error.field) def create_normalizer(self, normalizer: Normalizer) -> None: try: self.local_repo.by_id(normalizer.id) - raise DuplicatePlugin("id") + raise DuplicatePlugin(field="id") except KeyError: try: plugin = self.local_repo.by_name(normalizer.name) if plugin.types == "normalizer": - raise DuplicatePlugin("name") + raise DuplicatePlugin(field="name") else: self.plugin_storage.create_normalizer(normalizer) except KeyError: @@ -177,12 +173,12 @@ def delete_settings(self, organisation_id: str, plugin_id: str): # We don't check the schema anymore because we can provide entries through the global environment as well def schema(self, plugin_id: str) -> dict | None: - try: - boefje = self.plugin_storage.boefje_by_id(plugin_id) + plugin = 
self._get_all_without_enabled().get(plugin_id) - return boefje.boefje_schema - except PluginNotFound: - return self.local_repo.schema(plugin_id) + if plugin is None or not isinstance(plugin, Boefje): + return None + + return plugin.boefje_schema def cover(self, plugin_id: str) -> Path: try: @@ -212,8 +208,8 @@ def set_enabled_by_id(self, plugin_id: str, organisation_id: str, enabled: bool) self.config_storage.upsert(organisation_id, plugin_id, enabled=enabled) - def _assert_settings_match_schema(self, all_settings: dict, plugin_id: str): - schema = self.schema(plugin_id) + def _assert_settings_match_schema(self, all_settings: dict, plugin_id: str, organisation_id: str): + schema = self.by_plugin_id(plugin_id, organisation_id).boefje_schema if schema: # No schema means that there is nothing to assert try: diff --git a/boefjes/boefjes/plugins/kat_answer_parser/normalize.py b/boefjes/boefjes/plugins/kat_answer_parser/normalize.py index acef37ad53f..8e4d817c471 100644 --- a/boefjes/boefjes/plugins/kat_answer_parser/normalize.py +++ b/boefjes/boefjes/plugins/kat_answer_parser/normalize.py @@ -10,4 +10,4 @@ def run(input_ooi: dict, raw: bytes) -> Iterable[NormalizerOutput]: bit_id = data["schema"].removeprefix("/bit/") - yield Config(ooi=input_ooi["primary_key"], bit_id=bit_id, config=data["answer"]) + yield Config(ooi=data["answer_ooi"], bit_id=bit_id, config=data["answer"]) diff --git a/boefjes/boefjes/plugins/kat_fierce/boefje.json b/boefjes/boefjes/plugins/kat_fierce/boefje.json index 9e1ab8182ef..9f15dbcb544 100644 --- a/boefjes/boefjes/plugins/kat_fierce/boefje.json +++ b/boefjes/boefjes/plugins/kat_fierce/boefje.json @@ -1,9 +1,9 @@ { "id": "fierce", "name": "Fierce", - "description": "Perform DNS reconnaissance using Fierce. Helps to locate non-contiguous IP space and hostnames against specified hostnames. No exploitation is performed.", + "description": "Perform DNS reconnaissance using Fierce. Helps to locate non-contiguous IP space and hostnames against specified hostnames. No exploitation is performed. Beware if your DNS is managed by an external party: this boefje performs a brute-force attack against the name server.", "consumes": [ "Hostname" ], - "scan_level": 1 + "scan_level": 3 } diff --git a/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json b/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json index a23c8d04f2f..2262f34d4be 100644 --- a/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json +++ b/boefjes/boefjes/plugins/kat_kat_finding_types/kat_finding_types.json @@ -132,6 +132,13 @@ "impact": "System administrator ports should only be reachable from safe and known locations to reduce attack surface.", "recommendation": "Determine if this port should be reachable from the identified location. Limit access to reduce the attack surface if necessary." }, + "KAT-REMOTE-DESKTOP-PORT": { + "description": "An open Microsoft Remote Desktop Protocol (RDP) port was detected.", + "source": "https://www.cloudflare.com/en-gb/learning/access-management/rdp-security-risks/", + "risk": "medium", + "impact": "Remote desktop ports are often the root cause of ransomware attacks, due to weak passwords, outdated software or insecure configurations.", + "recommendation": "Disable the Microsoft RDP service on port 3389 if it is publicly reachable. If these ports must remain enabled, add additional security layers, such as VPN access, to limit the attack surface."
+ }, "KAT-OPEN-DATABASE-PORT": { "description": "A database port is open.", "source": "https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers", diff --git a/boefjes/boefjes/plugins/kat_shodan_internetdb/main.py b/boefjes/boefjes/plugins/kat_shodan_internetdb/main.py index 5c39e821581..281f89e0312 100644 --- a/boefjes/boefjes/plugins/kat_shodan_internetdb/main.py +++ b/boefjes/boefjes/plugins/kat_shodan_internetdb/main.py @@ -1,6 +1,6 @@ from ipaddress import ip_address -import requests +import httpx from boefjes.job_models import BoefjeMeta @@ -12,7 +12,8 @@ def run(boefje_meta: BoefjeMeta) -> list[tuple[set, bytes | str]]: ip = boefje_meta.arguments["input"]["address"] if ip_address(ip).is_private: return [({"info/boefje"}, "Skipping private IP address")] - response = requests.get(f"https://internetdb.shodan.io/{ip}", timeout=REQUEST_TIMEOUT) - response.raise_for_status() + response = httpx.get(f"https://internetdb.shodan.io/{ip}", timeout=REQUEST_TIMEOUT) + if response.status_code != httpx.codes.NOT_FOUND: + response.raise_for_status() return [(set(), response.content)] diff --git a/boefjes/boefjes/plugins/pdio_subfinder/boefje.json b/boefjes/boefjes/plugins/pdio_subfinder/boefje.json index fd69ae598c1..abb75748aa0 100644 --- a/boefjes/boefjes/plugins/pdio_subfinder/boefje.json +++ b/boefjes/boefjes/plugins/pdio_subfinder/boefje.json @@ -1,7 +1,7 @@ { "id": "pdio-subfinder", "name": "Subfinder", - "description": "A subdomain discovery tool. (projectdiscovery.io)", + "description": "A subdomain discovery tool (projectdiscovery.io). Returns valid subdomains for websites using passive online sources. Beware that many of the online sources require their own API key to get more accurate data.", "consumes": [ "Hostname" ], @@ -9,5 +9,5 @@ "SUBFINDER_RATE_LIMIT", "SUBFINDER_VERSION" ], - "scan_level": 2 + "scan_level": 1 } diff --git a/boefjes/boefjes/sql/session.py b/boefjes/boefjes/sql/session.py index a48f238d410..3cffa901927 100644 --- a/boefjes/boefjes/sql/session.py +++ b/boefjes/boefjes/sql/session.py @@ -4,7 +4,7 @@ from sqlalchemy.orm import Session from typing_extensions import Self -from boefjes.storage.interfaces import IntegrityError, StorageError +from boefjes.storage.interfaces import IntegrityError, StorageError, UniqueViolation logger = structlog.get_logger(__name__) @@ -40,7 +40,7 @@ def __exit__(self, exc_type: type[Exception], exc_value: str, exc_traceback: str self.session.commit() except exc.IntegrityError as e: if isinstance(e.orig, errors.UniqueViolation): - raise IntegrityError(str(e.orig)) + raise UniqueViolation(str(e.orig)) raise IntegrityError("An integrity error occurred") from e except exc.DatabaseError as e: raise StorageError("A storage error occurred") from e diff --git a/boefjes/boefjes/storage/interfaces.py b/boefjes/boefjes/storage/interfaces.py index 24a7d77f9df..ccb9831b319 100644 --- a/boefjes/boefjes/storage/interfaces.py +++ b/boefjes/boefjes/storage/interfaces.py @@ -1,3 +1,4 @@ +import re from abc import ABC from boefjes.models import Boefje, Normalizer, Organisation, PluginType @@ -17,6 +18,20 @@ def __init__(self, message: str): self.message = message +class UniqueViolation(IntegrityError): + def __init__(self, message: str): + self.field = self._get_field_name(message) + self.message = message + + def _get_field_name(self, message: str) -> str | None: + matches = re.findall(r"Key \((.*)\)=", message) + + if matches: + return matches[0] + + return None + + class SettingsNotConformingToSchema(StorageError): def __init__(self, plugin_id: str,
validation_error: str): super().__init__(f"Settings for plugin {plugin_id} are not conform the plugin schema: {validation_error}") @@ -56,8 +71,8 @@ def __init__(self, plugin_id: str): class DuplicatePlugin(NotAllowed): - def __init__(self, key: str): - super().__init__(f"Duplicate plugin {key}") + def __init__(self, field: str | None): + super().__init__(f"Duplicate plugin: a plugin with this {field} already exists") class OrganisationStorage(ABC): diff --git a/boefjes/tests/integration/test_api.py b/boefjes/tests/integration/test_api.py index 9596578a5ee..327274ce937 100644 --- a/boefjes/tests/integration/test_api.py +++ b/boefjes/tests/integration/test_api.py @@ -45,12 +45,12 @@ def test_cannot_add_plugin_reserved_id(test_client, organisation): boefje = Boefje(id="dns-records", name="My test boefje", static=False) response = test_client.post(f"/v1/organisations/{organisation.id}/plugins", content=boefje.model_dump_json()) assert response.status_code == 400 - assert response.json() == {"detail": "Duplicate plugin id"} + assert response.json() == {"detail": "Duplicate plugin: a plugin with this id already exists"} normalizer = Normalizer(id="kat_nmap_normalize", name="My test normalizer") response = test_client.post(f"/v1/organisations/{organisation.id}/plugins", content=normalizer.model_dump_json()) assert response.status_code == 400 - assert response.json() == {"detail": "Duplicate plugin id"} + assert response.json() == {"detail": "Duplicate plugin: a plugin with this id already exists"} def test_add_boefje(test_client, organisation): @@ -80,7 +80,7 @@ def test_cannot_add_static_plugin_with_duplicate_name(test_client, organisation) boefje = Boefje(id="test_plugin", name="DNS records", static=False) response = test_client.post(f"/v1/organisations/{organisation.id}/plugins", content=boefje.model_dump_json()) assert response.status_code == 400 - assert response.json() == {"detail": "Duplicate plugin name"} + assert response.json() == {"detail": "Duplicate plugin: a plugin with this name already exists"} def test_cannot_add_plugin_with_duplicate_name(test_client, organisation): @@ -91,7 +91,7 @@ def test_cannot_add_plugin_with_duplicate_name(test_client, organisation): boefje = Boefje(id="test_plugin_2", name="My test boefje", static=False) response = test_client.post(f"/v1/organisations/{organisation.id}/plugins", content=boefje.model_dump_json()) assert response.status_code == 400 - assert response.json() == {"detail": "Duplicate plugin name"} + assert response.json() == {"detail": "Duplicate plugin: a plugin with this name already exists"} normalizer = Normalizer(id="test_normalizer", name="My test normalizer", static=False) response = test_client.post(f"/v1/organisations/{organisation.id}/plugins", content=normalizer.model_dump_json()) @@ -169,6 +169,16 @@ def test_cannot_create_boefje_with_invalid_schema(test_client, organisation): assert r.status_code == 422 +def test_schema_is_taken_from_disk(test_client, organisation, session): + # creates a database record of dns-records + test_client.patch(f"/v1/organisations/{organisation.id}/plugins/dns-records", json={"enabled": True}) + session.execute("UPDATE boefje set schema = null where plugin_id = 'dns-records'") + session.commit() + + response = test_client.get(f"/v1/organisations/{organisation.id}/plugins/dns-records").json() + assert response["boefje_schema"] is not None + + def test_cannot_set_invalid_cron(test_client, organisation): boefje = Boefje(id="test_plugin", name="My test boefje", description="123").model_dump(mode="json") 
boefje["cron"] = "bad format" diff --git a/boefjes/tests/plugins/test_answer_parser.py b/boefjes/tests/plugins/test_answer_parser.py index 9762a2683ad..74e98fdb459 100644 --- a/boefjes/tests/plugins/test_answer_parser.py +++ b/boefjes/tests/plugins/test_answer_parser.py @@ -13,10 +13,10 @@ def test_config_yielded(normalizer_runner): normalizer_runner.run(meta, bytes(raw, "UTF-8")) with pytest.raises(ValidationError): - raw = '{"schema": "/bit/port-classification-ip", "answer": [{"key": "test"}]}' + raw = '{"schema": "/bit/port-classification-ip", "answer": [{"key": "test"}], "answer_ooi": "Network|internet"}' normalizer_runner.run(meta, bytes(raw, "UTF-8")) - raw = '{"schema": "/bit/port-classification-ip", "answer": {"key": "test"}}' + raw = '{"schema": "/bit/port-classification-ip", "answer": {"key": "test"}, "answer_ooi": "Network|internet"}' output = normalizer_runner.run(meta, bytes(raw, "UTF-8")) assert len(output.observations) == 1 diff --git a/docker-compose.yml b/docker-compose.yml index 7ad1333c859..d56efc4baa0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,6 +23,9 @@ services: test: ["CMD", "gosu", "postgres", "pg_isready"] interval: 10s retries: 10 + # Django runserver does not limit the number of threads. We need to increase + # the maximum number of connection to make sure that we don't hit the limit. + command: -c max_connections=500 volumes: - postgres-data:/var/lib/postgresql/data - ./init-user-db.sh:/docker-entrypoint-initdb.d/init-user-db.sh diff --git a/docs/source/developer-documentation/mula.md b/docs/source/developer-documentation/mula.md index 9005e197a52..492caebc825 100644 --- a/docs/source/developer-documentation/mula.md +++ b/docs/source/developer-documentation/mula.md @@ -6,8 +6,8 @@ The scheduler is responsible for scheduling the execution of tasks. The execution of those tasks are being prioritized / scored by a ranker. The tasks are then pushed onto a priority queue. -Within the project of KAT, the scheduler is tasked with scheduling boefje and -normalizer tasks. +Within the project of KAT, the scheduler is tasked with scheduling boefje, +normalizer, and report tasks. ## Architecture @@ -20,12 +20,12 @@ rankers. ### Stack, packages and libraries -| Name | Version | Description | -| ---------- | -------- | ------------------- | -| Python | ^3.8 | | -| FastAPI | ^0.109.0 | Used for api server | -| SQLAlchemy | ^2.0.23 | | -| pydantic | ^2.5.2 | | +| Name | Version | Description | +| ---------- | --------- | ------------------- | +| Python | ^3.10 | | +| FastAPI | ^0.1115.2 | Used for api server | +| SQLAlchemy | ^2.0.23 | | +| pydantic | ^2.7.2 | | ### External services @@ -41,36 +41,34 @@ The scheduler interfaces with the following services: ### Project structure ``` -$ tree -L 3 --dirsfirst . 
├── docs/ # additional documentation ├── scheduler/ # scheduler python module -│ ├── config # application settings configuration -│ ├── connectors # external service connectors -│ │ ├── listeners # channel/socket listeners -│ │ ├── services # rest api connectors -│ │ └── __init__.py -│ ├── context/ # shared application context -│ ├── models/ # internal model definitions -│ ├── queues/ # priority queue -│ ├── rankers/ # priority/score calculations -│ ├── storage/ # data abstraction layer -│ ├── schedulers/ # schedulers -│ ├── server/ # http rest api server -│ ├── utils/ # common utility functions +│ ├── clients/ # external service clients +│ │ ├── amqp/ # rabbitmq clients +│ │ └── http/ # http clients +│ ├── context/ # shared application context and configuration +│ ├── models/ # internal model definitions +│ ├── schedulers/ # schedulers +│ │ ├── queue/ # priority queue +│ │ ├── rankers/ # priority/score calculations +│ │ └── schedulers/ # schedulers implementations +│ ├── server/ # http rest api server +│ ├── storage/ # data abstraction layer +│ │ ├── migrations/ # database migrations +│ │ └── stores/ # data stores +│ ├── utils/ # common utility functions │ ├── __init__.py │ ├── __main__.py -│ ├── app.py # kat scheduler app implementation +│ ├── app.py # OpenKAT scheduler app implementation │ └── version.py # version information -└─── tests/ - ├── factories/ - ├── integration/ - ├── mocks/ - ├── scripts/ - ├── simulation/ - ├── unit/ - ├── utils/ - └── __init__.py +└─── tests/ # test suite + ├── factories/ # factories for test data + ├── integration/ # integration tests + ├── mocks/ # mocks for testing + ├── simulation/ # simulation tests + ├── unit/ # unit tests + └── utils/ # utility functions for tests ``` ## Running / Developing @@ -93,6 +91,22 @@ scheduler. See the environment settings section under Installation and Deployment $ docker compose up --build -d scheduler ``` +### Migrations + +Creating a migration: + +``` +# Create a migration +make revid=0008 m="add_task_schedule" migrations +``` + +Sometimes it is helpful to run the migrations in a clean environment: + +``` +docker system prune +docker volume prune --force --all +``` + ## Testing ``` diff --git a/mula/Makefile b/mula/Makefile index c152665e536..7d9724afed5 100644 --- a/mula/Makefile +++ b/mula/Makefile @@ -74,7 +74,7 @@ cov: ## Generate a test coverage report sql: ## Generate raw sql for the migrations. docker compose run --rm scheduler \ - alembic --config /app/scheduler/scheduler/alembic.ini \ + alembic --config /app/scheduler/scheduler/storage/migrations/alembic.ini \ upgrade $(rev1):$(rev2) --sql migrations: ## Create migration. @@ -84,14 +84,14 @@ else ifeq ($(revid),) $(HIDE) (echo "ERROR: Specify a message with m={message} and a rev-id with revid={revid} (e.g. 0001 etc.)"; exit 1) else docker compose run --rm scheduler \ - alembic --config /app/scheduler/scheduler/alembic.ini \ + alembic --config /app/scheduler/scheduler/storage/migrations/alembic.ini \ revision --autogenerate \ -m "$(m)" --rev-id "$(revid)" endif migrate: ## Run migrations using alembic.
docker compose run scheduler \ - alembic --config /app/scheduler/scheduler/alembic.ini \ + alembic --config /app/scheduler/scheduler/storage/migrations/alembic.ini \ upgrade head ## diff --git a/mula/entrypoint.sh b/mula/entrypoint.sh index 7e52612a54a..cdaf19c6b8c 100755 --- a/mula/entrypoint.sh +++ b/mula/entrypoint.sh @@ -5,7 +5,7 @@ set -e shopt -s nocasematch if [ "$DATABASE_MIGRATION" = "1" ] || [[ $DATABASE_MIGRATION == "true" ]]; then - python -m alembic --config /app/scheduler/scheduler/alembic.ini upgrade head + python -m alembic --config /app/scheduler/scheduler/storage/migrations/alembic.ini upgrade head fi exec "$@" diff --git a/mula/scheduler/app.py b/mula/scheduler/app.py index 58a6e0890ac..d8770730762 100644 --- a/mula/scheduler/app.py +++ b/mula/scheduler/app.py @@ -4,8 +4,7 @@ import structlog from opentelemetry import trace -from scheduler import context, schedulers, server -from scheduler.connectors.errors import ExternalServiceError +from scheduler import clients, context, schedulers, server from scheduler.utils import thread tracer = trace.get_tracer(__name__) @@ -83,7 +82,7 @@ def monitor_organisations(self) -> None: } try: orgs = self.ctx.services.katalogus.get_organisations() - except ExternalServiceError: + except clients.errors.ExternalServiceError: self.logger.exception("Failed to get organisations from Katalogus") return @@ -117,7 +116,7 @@ def monitor_organisations(self) -> None: for org_id in additions: try: org = self.ctx.services.katalogus.get_organisation(org_id) - except ExternalServiceError as e: + except clients.errors.ExternalServiceError as e: self.logger.error("Failed to get organisation from Katalogus", error=e, org_id=org_id) continue @@ -166,7 +165,7 @@ def start_schedulers(self) -> None: # Initialize the schedulers try: orgs = self.ctx.services.katalogus.get_organisations() - except ExternalServiceError as e: + except clients.errors.ExternalServiceError as e: self.logger.error("Failed to get organisations from Katalogus", error=e) return diff --git a/mula/scheduler/clients/__init__.py b/mula/scheduler/clients/__init__.py new file mode 100644 index 00000000000..0a306fe0e41 --- /dev/null +++ b/mula/scheduler/clients/__init__.py @@ -0,0 +1,6 @@ +from .amqp.raw_data import RawData +from .amqp.scan_profile import ScanProfileMutation +from .http.external.bytes import Bytes +from .http.external.katalogus import Katalogus +from .http.external.octopoes import Octopoes +from .http.external.rocky import Rocky diff --git a/mula/scheduler/connectors/listeners/__init__.py b/mula/scheduler/clients/amqp/__init__.py similarity index 100% rename from mula/scheduler/connectors/listeners/__init__.py rename to mula/scheduler/clients/amqp/__init__.py diff --git a/mula/scheduler/connectors/listeners/listeners.py b/mula/scheduler/clients/amqp/listeners.py similarity index 100% rename from mula/scheduler/connectors/listeners/listeners.py rename to mula/scheduler/clients/amqp/listeners.py diff --git a/mula/scheduler/connectors/listeners/raw_data.py b/mula/scheduler/clients/amqp/raw_data.py similarity index 100% rename from mula/scheduler/connectors/listeners/raw_data.py rename to mula/scheduler/clients/amqp/raw_data.py diff --git a/mula/scheduler/connectors/listeners/scan_profile.py b/mula/scheduler/clients/amqp/scan_profile.py similarity index 100% rename from mula/scheduler/connectors/listeners/scan_profile.py rename to mula/scheduler/clients/amqp/scan_profile.py diff --git a/mula/scheduler/connectors/connector.py b/mula/scheduler/clients/connector.py similarity index 
100% rename from mula/scheduler/connectors/connector.py rename to mula/scheduler/clients/connector.py diff --git a/mula/scheduler/connectors/errors.py b/mula/scheduler/clients/errors.py similarity index 100% rename from mula/scheduler/connectors/errors.py rename to mula/scheduler/clients/errors.py diff --git a/mula/scheduler/clients/http/__init__.py b/mula/scheduler/clients/http/__init__.py new file mode 100644 index 00000000000..52343f6deae --- /dev/null +++ b/mula/scheduler/clients/http/__init__.py @@ -0,0 +1,2 @@ +from .client import HTTPClient +from .service import HTTPService diff --git a/mula/scheduler/clients/http/client.py b/mula/scheduler/clients/http/client.py new file mode 100644 index 00000000000..24202afb211 --- /dev/null +++ b/mula/scheduler/clients/http/client.py @@ -0,0 +1,77 @@ +import socket +import time +from collections.abc import Callable + +import httpx +import structlog + + +class HTTPClient: + """A class that provides methods to check if a host is available and healthy.""" + + def __init__(self): + self.logger = structlog.get_logger(self.__class__.__name__) + + def is_host_available(self, hostname: str, port: int) -> bool: + """Check if the host is available. + + Args: + hostname: A string representing the hostname. + port: An integer representing the port number. + + Returns: + A boolean + """ + try: + socket.create_connection((hostname, port)) + return True + except OSError: + return False + + def is_host_healthy(self, host: str, health_endpoint: str) -> bool: + """Check if host is healthy by inspecting the host's health endpoint. + + Args: + host: A string representing the hostname. + health_endpoint: A string representing the health endpoint. + + Returns: + A boolean + """ + try: + url = f"{host}/{health_endpoint}" + response = httpx.get(url, timeout=5) + healthy = response.json().get("healthy") + return healthy + except httpx.HTTPError as exc: + self.logger.warning("Exception: %s", exc) + return False + + def retry(self, func: Callable, *args, **kwargs) -> bool: + """Retry a function until it returns True. + + Args: + func: A python callable that needs to be retried. + + Returns: + A boolean signifying whether or not the func was executed successfully. + """ + for i in range(10): + if func(*args, **kwargs): + self.logger.info( + "Function %s, executed successfully. Retry count: %d", + func.__name__, + i, + name=func.__name__, + args=args, + kwargs=kwargs, + ) + return True + + self.logger.warning( + "Function %s, failed. 
Retry count: %d", func.__name__, i, name=func.__name__, args=args, kwargs=kwargs + ) + + time.sleep(10) + + return False diff --git a/mula/scheduler/alembic/__init__.py b/mula/scheduler/clients/http/external/__init__.py similarity index 100% rename from mula/scheduler/alembic/__init__.py rename to mula/scheduler/clients/http/external/__init__.py diff --git a/mula/scheduler/connectors/services/bytes.py b/mula/scheduler/clients/http/external/bytes.py similarity index 96% rename from mula/scheduler/connectors/services/bytes.py rename to mula/scheduler/clients/http/external/bytes.py index 86a5e434433..bfceab892a8 100644 --- a/mula/scheduler/connectors/services/bytes.py +++ b/mula/scheduler/clients/http/external/bytes.py @@ -6,11 +6,10 @@ import httpx -from scheduler.connectors.errors import ExternalServiceResponseError, exception_handler +from scheduler.clients.errors import ExternalServiceResponseError, exception_handler +from scheduler.clients.http import HTTPService from scheduler.models import BoefjeMeta -from .services import HTTPService - ClientSessionMethod = Callable[..., Any] diff --git a/mula/scheduler/connectors/services/katalogus.py b/mula/scheduler/clients/http/external/katalogus.py similarity index 99% rename from mula/scheduler/connectors/services/katalogus.py rename to mula/scheduler/clients/http/external/katalogus.py index 8a44543a326..ba174259de5 100644 --- a/mula/scheduler/connectors/services/katalogus.py +++ b/mula/scheduler/clients/http/external/katalogus.py @@ -2,12 +2,11 @@ import httpx -from scheduler.connectors.errors import exception_handler +from scheduler.clients.errors import exception_handler +from scheduler.clients.http import HTTPService from scheduler.models import Boefje, Organisation, Plugin from scheduler.utils import dict_utils -from .services import HTTPService - class Katalogus(HTTPService): """A class that provides methods to interact with the Katalogus API.""" diff --git a/mula/scheduler/connectors/services/octopoes.py b/mula/scheduler/clients/http/external/octopoes.py similarity index 97% rename from mula/scheduler/connectors/services/octopoes.py rename to mula/scheduler/clients/http/external/octopoes.py index 4632f8414a9..6d3538b0e46 100644 --- a/mula/scheduler/connectors/services/octopoes.py +++ b/mula/scheduler/clients/http/external/octopoes.py @@ -3,11 +3,10 @@ import httpx from pydantic import BaseModel -from scheduler.connectors.errors import exception_handler +from scheduler.clients.errors import exception_handler +from scheduler.clients.http import HTTPService from scheduler.models import OOI, Organisation -from .services import HTTPService - class ListObjectsResponse(BaseModel): count: int diff --git a/mula/scheduler/connectors/services/rocky.py b/mula/scheduler/clients/http/external/rocky.py similarity index 50% rename from mula/scheduler/connectors/services/rocky.py rename to mula/scheduler/clients/http/external/rocky.py index b4c0fea8df1..558882e2791 100644 --- a/mula/scheduler/connectors/services/rocky.py +++ b/mula/scheduler/clients/http/external/rocky.py @@ -1,4 +1,4 @@ -from .services import HTTPService +from scheduler.clients.http import HTTPService class Rocky(HTTPService): diff --git a/mula/scheduler/connectors/services/services.py b/mula/scheduler/clients/http/service.py similarity index 100% rename from mula/scheduler/connectors/services/services.py rename to mula/scheduler/clients/http/service.py diff --git a/mula/scheduler/config/__init__.py b/mula/scheduler/config/__init__.py index e69de29bb2d..906791fa29d 100644 --- 
a/mula/scheduler/config/__init__.py +++ b/mula/scheduler/config/__init__.py @@ -0,0 +1 @@ +from .settings import Settings diff --git a/mula/scheduler/connectors/services/__init__.py b/mula/scheduler/connectors/services/__init__.py deleted file mode 100644 index 2082f2c95e6..00000000000 --- a/mula/scheduler/connectors/services/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .bytes import Bytes -from .katalogus import Katalogus -from .octopoes import Octopoes -from .rocky import Rocky -from .services import HTTPService diff --git a/mula/scheduler/context/context.py b/mula/scheduler/context/context.py index 6fb5ea80105..00b4d8f16f4 100644 --- a/mula/scheduler/context/context.py +++ b/mula/scheduler/context/context.py @@ -7,9 +7,9 @@ from prometheus_client import CollectorRegistry, Gauge, Info import scheduler -from scheduler import storage +from scheduler import clients, storage from scheduler.config import settings -from scheduler.connectors import services +from scheduler.storage import stores from scheduler.utils import remove_trailing_slash @@ -116,7 +116,7 @@ def __init__(self) -> None: self.logger: structlog.BoundLogger = structlog.get_logger(__name__) # Services - katalogus_service = services.Katalogus( + katalogus_service = clients.Katalogus( host=remove_trailing_slash(str(self.config.host_katalogus)), source=f"scheduler/{scheduler.__version__}", timeout=self.config.katalogus_request_timeout, @@ -124,7 +124,7 @@ def __init__(self) -> None: cache_ttl=self.config.katalogus_cache_ttl, ) - bytes_service = services.Bytes( + bytes_service = clients.Bytes( host=remove_trailing_slash(str(self.config.host_bytes)), source=f"scheduler/{scheduler.__version__}", user=self.config.host_bytes_user, @@ -133,7 +133,7 @@ def __init__(self) -> None: pool_connections=self.config.bytes_pool_connections, ) - octopoes_service = services.Octopoes( + octopoes_service = clients.Octopoes( host=remove_trailing_slash(str(self.config.host_octopoes)), source=f"scheduler/{scheduler.__version__}", timeout=self.config.octopoes_request_timeout, @@ -145,9 +145,9 @@ def __init__(self) -> None: # notation self.services: SimpleNamespace = SimpleNamespace( **{ - services.Katalogus.name: katalogus_service, - services.Octopoes.name: octopoes_service, - services.Bytes.name: bytes_service, + clients.Katalogus.name: katalogus_service, + clients.Octopoes.name: octopoes_service, + clients.Bytes.name: bytes_service, } ) @@ -165,9 +165,9 @@ def __init__(self) -> None: # Datastores, SimpleNamespace allows us to use dot notation self.datastores: SimpleNamespace = SimpleNamespace( **{ - storage.ScheduleStore.name: storage.ScheduleStore(dbconn), - storage.TaskStore.name: storage.TaskStore(dbconn), - storage.PriorityQueueStore.name: storage.PriorityQueueStore(dbconn), + stores.ScheduleStore.name: stores.ScheduleStore(dbconn), + stores.TaskStore.name: stores.TaskStore(dbconn), + stores.PriorityQueueStore.name: stores.PriorityQueueStore(dbconn), } ) diff --git a/mula/scheduler/schedulers/__init__.py b/mula/scheduler/schedulers/__init__.py index 4c82914aee5..0ea877f4541 100644 --- a/mula/scheduler/schedulers/__init__.py +++ b/mula/scheduler/schedulers/__init__.py @@ -1,4 +1,2 @@ -from .boefje import BoefjeScheduler -from .normalizer import NormalizerScheduler -from .report import ReportScheduler from .scheduler import Scheduler +from .schedulers import BoefjeScheduler, NormalizerScheduler, ReportScheduler diff --git a/mula/scheduler/queues/__init__.py b/mula/scheduler/schedulers/queue/__init__.py similarity index 100% rename from 
mula/scheduler/queues/__init__.py rename to mula/scheduler/schedulers/queue/__init__.py diff --git a/mula/scheduler/queues/errors.py b/mula/scheduler/schedulers/queue/errors.py similarity index 100% rename from mula/scheduler/queues/errors.py rename to mula/scheduler/schedulers/queue/errors.py diff --git a/mula/scheduler/queues/pq.py b/mula/scheduler/schedulers/queue/pq.py similarity index 98% rename from mula/scheduler/queues/pq.py rename to mula/scheduler/schedulers/queue/pq.py index 7cb160c7c2e..4a1914451a6 100644 --- a/mula/scheduler/queues/pq.py +++ b/mula/scheduler/schedulers/queue/pq.py @@ -58,7 +58,7 @@ def __init__( pq_id: str, maxsize: int, item_type: Any, - pq_store: storage.PriorityQueueStore, + pq_store: storage.stores.PriorityQueueStore, allow_replace: bool = False, allow_updates: bool = False, allow_priority_updates: bool = False, @@ -94,7 +94,7 @@ def __init__( self.allow_replace: bool = allow_replace self.allow_updates: bool = allow_updates self.allow_priority_updates: bool = allow_priority_updates - self.pq_store: storage.PriorityQueueStore = pq_store + self.pq_store: storage.stores.PriorityQueueStore = pq_store self.lock: threading.Lock = threading.Lock() def pop(self, filters: storage.filters.FilterRequest | None = None) -> models.Task | None: diff --git a/mula/scheduler/rankers/__init__.py b/mula/scheduler/schedulers/rankers/__init__.py similarity index 54% rename from mula/scheduler/rankers/__init__.py rename to mula/scheduler/schedulers/rankers/__init__.py index 49a565268ec..9fd14d2b21e 100644 --- a/mula/scheduler/rankers/__init__.py +++ b/mula/scheduler/schedulers/rankers/__init__.py @@ -1,3 +1,3 @@ -from .boefje import BoefjeRanker +from .boefje import BoefjeRanker, BoefjeRankerTimeBased from .normalizer import NormalizerRanker from .ranker import Ranker diff --git a/mula/scheduler/rankers/boefje.py b/mula/scheduler/schedulers/rankers/boefje.py similarity index 94% rename from mula/scheduler/rankers/boefje.py rename to mula/scheduler/schedulers/rankers/boefje.py index 942ef55a93a..f951aea4f84 100644 --- a/mula/scheduler/rankers/boefje.py +++ b/mula/scheduler/schedulers/rankers/boefje.py @@ -31,11 +31,11 @@ def rank(self, obj: Any) -> int: grace_period = timedelta(seconds=self.ctx.config.pq_grace_period) # New tasks that have not yet run before - if obj.prior_tasks is None or not obj.prior_tasks: + if obj.latest_task is None or not obj.latest_task: return 2 # Make sure that we don't have tasks that are still in the grace period - time_since_grace_period = ((datetime.now(timezone.utc) - obj.prior_tasks[0].modified_at) - grace_period).seconds + time_since_grace_period = ((datetime.now(timezone.utc) - obj.latest_task.modified_at) - grace_period).seconds if time_since_grace_period < 0: return -1 diff --git a/mula/scheduler/rankers/normalizer.py b/mula/scheduler/schedulers/rankers/normalizer.py similarity index 100% rename from mula/scheduler/rankers/normalizer.py rename to mula/scheduler/schedulers/rankers/normalizer.py diff --git a/mula/scheduler/rankers/ranker.py b/mula/scheduler/schedulers/rankers/ranker.py similarity index 100% rename from mula/scheduler/rankers/ranker.py rename to mula/scheduler/schedulers/rankers/ranker.py diff --git a/mula/scheduler/schedulers/scheduler.py b/mula/scheduler/schedulers/scheduler.py index b8cf74e1434..b81e1efacf8 100644 --- a/mula/scheduler/schedulers/scheduler.py +++ b/mula/scheduler/schedulers/scheduler.py @@ -9,7 +9,9 @@ import structlog from opentelemetry import trace -from scheduler import connectors, context, models, queues, 
storage, utils +from scheduler import clients, context, models, storage, utils +from scheduler.schedulers.queue import PriorityQueue +from scheduler.schedulers.queue.errors import InvalidItemError, NotAllowedError, QueueEmptyError, QueueFullError from scheduler.utils import cron, thread tracer = trace.get_tracer(__name__) @@ -26,7 +28,7 @@ class Scheduler(abc.ABC): Application context of shared data (e.g. configuration, external services connections). queue: - A queues.PriorityQueue instance + A queue.PriorityQueue instance callback: A callback function to call when the scheduler is stopped. scheduler_id: @@ -57,10 +59,11 @@ def __init__( self, ctx: context.AppContext, scheduler_id: str, - queue: queues.PriorityQueue | None = None, + queue: PriorityQueue | None = None, callback: Callable[..., None] | None = None, max_tries: int = -1, create_schedule: bool = False, + auto_calculate_deadline: bool = True, ): """Initialize the Scheduler. @@ -71,7 +74,7 @@ def __init__( scheduler_id: The id of the scheduler. queue: - A queues.PriorityQueue instance + A queue.PriorityQueue instance callback: A callback function to call when the scheduler is stopped. max_tries: @@ -88,10 +91,11 @@ def __init__( self.max_tries: int = max_tries self.enabled: bool = True self.create_schedule: bool = create_schedule + self.auto_calculate_deadline: bool = auto_calculate_deadline self._last_activity: datetime | None = None # Queue - self.queue = queue or queues.PriorityQueue( + self.queue = queue or PriorityQueue( pq_id=scheduler_id, maxsize=self.ctx.config.pq_maxsize, item_type=self.ITEM_TYPE, @@ -99,7 +103,7 @@ def __init__( ) # Listeners - self.listeners: dict[str, connectors.listeners.Listener] = {} + self.listeners: dict[str, clients.amqp.Listener] = {} # Threads self.lock: threading.Lock = threading.Lock() @@ -139,7 +143,7 @@ def push_items_to_queue(self, items: list[models.Task]) -> None: for item in items: try: self.push_item_to_queue(item) - except (queues.errors.NotAllowedError, queues.errors.QueueFullError, queues.errors.InvalidItemError) as exc: + except (NotAllowedError, QueueFullError, InvalidItemError) as exc: self.logger.debug( "Unable to push item %s to queue %s (%s)", item.id, @@ -186,7 +190,7 @@ def push_item_to_queue_with_timeout(self, item: models.Task, max_tries: int = 5, tries += 1 if tries >= max_tries and max_tries != -1: - raise queues.errors.QueueFullError() + raise QueueFullError() self.push_item_to_queue(item) @@ -209,14 +213,14 @@ def push_item_to_queue(self, item: models.Task) -> models.Task: queue_id=self.queue.pq_id, scheduler_id=self.scheduler_id, ) - raise queues.errors.NotAllowedError("Scheduler is disabled") + raise NotAllowedError("Scheduler is disabled") try: if item.type is None: item.type = self.ITEM_TYPE.type item.status = models.TaskStatus.QUEUED item = self.queue.push(item) - except queues.errors.NotAllowedError as exc: + except NotAllowedError as exc: self.logger.warning( "Not allowed to push to queue %s (%s)", self.queue.pq_id, @@ -226,7 +230,7 @@ def push_item_to_queue(self, item: models.Task) -> models.Task: scheduler_id=self.scheduler_id, ) raise exc - except queues.errors.QueueFullError as exc: + except QueueFullError as exc: self.logger.warning( "Queue %s is full, not pushing new items (%s)", self.queue.pq_id, @@ -237,7 +241,7 @@ def push_item_to_queue(self, item: models.Task) -> models.Task: scheduler_id=self.scheduler_id, ) raise exc - except queues.errors.InvalidItemError as exc: + except InvalidItemError as exc: self.logger.warning( "Invalid item %s", item.id, @@ 
-327,7 +331,7 @@ def post_push(self, item: models.Task) -> models.Task: # based on the item. if schedule_db.schedule is not None: schedule_db.deadline_at = cron.next_run(schedule_db.schedule) - else: + elif self.auto_calculate_deadline: schedule_db.deadline_at = self.calculate_deadline(item) self.ctx.datastores.schedule_store.update_schedule(schedule_db) @@ -355,11 +359,11 @@ def pop_item_from_queue(self, filters: storage.filters.FilterRequest | None = No queue_qsize=self.queue.qsize(), scheduler_id=self.scheduler_id, ) - raise queues.errors.NotAllowedError("Scheduler is disabled") + raise NotAllowedError("Scheduler is disabled") try: item = self.queue.pop(filters) - except queues.QueueEmptyError as exc: + except QueueEmptyError as exc: raise exc if item is not None: diff --git a/mula/scheduler/schedulers/schedulers/__init__.py b/mula/scheduler/schedulers/schedulers/__init__.py new file mode 100644 index 00000000000..66ab1de1a08 --- /dev/null +++ b/mula/scheduler/schedulers/schedulers/__init__.py @@ -0,0 +1,3 @@ +from .boefje import BoefjeScheduler +from .normalizer import NormalizerScheduler +from .report import ReportScheduler diff --git a/mula/scheduler/schedulers/boefje.py b/mula/scheduler/schedulers/schedulers/boefje.py similarity index 97% rename from mula/scheduler/schedulers/boefje.py rename to mula/scheduler/schedulers/schedulers/boefje.py index 0b42b3e5549..c13b62f194e 100644 --- a/mula/scheduler/schedulers/boefje.py +++ b/mula/scheduler/schedulers/schedulers/boefje.py @@ -8,9 +8,8 @@ import structlog from opentelemetry import trace -from scheduler import context, queues, rankers, storage, utils -from scheduler.connectors import listeners -from scheduler.connectors.errors import ExternalServiceError +from scheduler import clients, context, storage, utils +from scheduler.clients.errors import ExternalServiceError from scheduler.models import ( OOI, Boefje, @@ -22,10 +21,11 @@ Task, TaskStatus, ) +from scheduler.schedulers import Scheduler +from scheduler.schedulers.queue import PriorityQueue, QueueFullError +from scheduler.schedulers.rankers import BoefjeRanker from scheduler.storage import filters -from .scheduler import Scheduler - tracer = trace.get_tracer(__name__) @@ -45,7 +45,7 @@ def __init__( ctx: context.AppContext, scheduler_id: str, organisation: Organisation, - queue: queues.PriorityQueue | None = None, + queue: PriorityQueue | None = None, callback: Callable[..., None] | None = None, ): """Initializes the BoefjeScheduler. @@ -60,7 +60,7 @@ def __init__( self.logger: structlog.BoundLogger = structlog.getLogger(__name__) self.organisation: Organisation = organisation - self.queue = queue or queues.PriorityQueue( + self.queue = queue or PriorityQueue( pq_id=scheduler_id, maxsize=ctx.config.pq_maxsize, item_type=self.ITEM_TYPE, @@ -68,10 +68,17 @@ def __init__( pq_store=ctx.datastores.pq_store, ) - super().__init__(ctx=ctx, queue=self.queue, scheduler_id=scheduler_id, callback=callback, create_schedule=True) + super().__init__( + ctx=ctx, + queue=self.queue, + scheduler_id=scheduler_id, + callback=callback, + create_schedule=True, + auto_calculate_deadline=True, + ) # Priority ranker - self.priority_ranker = rankers.BoefjeRanker(self.ctx) + self.priority_ranker = BoefjeRanker(self.ctx) def run(self) -> None: """The run method is called when the scheduler is started. It will @@ -90,7 +97,7 @@ def run(self) -> None: reschedule it. 
""" # Scan profile mutations - self.listeners["scan_profile_mutations"] = listeners.ScanProfileMutation( + self.listeners["scan_profile_mutations"] = clients.ScanProfileMutation( dsn=str(self.ctx.config.host_raw_data), queue=f"{self.organisation.id}__scan_profile_mutations", func=self.push_tasks_for_scan_profile_mutations, @@ -562,8 +569,8 @@ def push_boefje_task(self, boefje_task: BoefjeTask, caller: str = "") -> None: ) return - prior_tasks = self.ctx.datastores.task_store.get_tasks_by_hash(boefje_task.hash) - score = self.priority_ranker.rank(SimpleNamespace(prior_tasks=prior_tasks, task=boefje_task)) + latest_task = self.ctx.datastores.task_store.get_latest_task_by_hash(boefje_task.hash) + score = self.priority_ranker.rank(SimpleNamespace(latest_task=latest_task, task=boefje_task)) task = Task( id=boefje_task.id, @@ -576,7 +583,7 @@ def push_boefje_task(self, boefje_task: BoefjeTask, caller: str = "") -> None: try: self.push_item_to_queue_with_timeout(task, self.max_tries) - except queues.QueueFullError: + except QueueFullError: self.logger.warning( "Could not add task to queue, queue was full: %s", boefje_task.hash, diff --git a/mula/scheduler/schedulers/normalizer.py b/mula/scheduler/schedulers/schedulers/normalizer.py similarity index 95% rename from mula/scheduler/schedulers/normalizer.py rename to mula/scheduler/schedulers/schedulers/normalizer.py index e193a3b8398..47db57e7c97 100644 --- a/mula/scheduler/schedulers/normalizer.py +++ b/mula/scheduler/schedulers/schedulers/normalizer.py @@ -7,12 +7,12 @@ import structlog from opentelemetry import trace -from scheduler import context, models, queues, rankers -from scheduler.connectors import listeners -from scheduler.connectors.errors import ExternalServiceError +from scheduler import clients, context, models +from scheduler.clients.errors import ExternalServiceError from scheduler.models import Normalizer, NormalizerTask, Organisation, Plugin, RawDataReceivedEvent, Task, TaskStatus - -from .scheduler import Scheduler +from scheduler.schedulers import Scheduler +from scheduler.schedulers.queue import PriorityQueue, QueueFullError +from scheduler.schedulers.rankers import NormalizerRanker tracer = trace.get_tracer(__name__) @@ -33,14 +33,13 @@ def __init__( ctx: context.AppContext, scheduler_id: str, organisation: Organisation, - queue: queues.PriorityQueue | None = None, + queue: PriorityQueue | None = None, callback: Callable[..., None] | None = None, ): self.logger: structlog.BoundLogger = structlog.getLogger(__name__) self.organisation: Organisation = organisation - self.create_schedule = False - self.queue = queue or queues.PriorityQueue( + self.queue = queue or PriorityQueue( pq_id=scheduler_id, maxsize=ctx.config.pq_maxsize, item_type=self.ITEM_TYPE, @@ -48,9 +47,16 @@ def __init__( pq_store=ctx.datastores.pq_store, ) - super().__init__(ctx=ctx, queue=self.queue, scheduler_id=scheduler_id, callback=callback) + super().__init__( + ctx=ctx, + queue=self.queue, + scheduler_id=scheduler_id, + callback=callback, + create_schedule=False, + auto_calculate_deadline=False, + ) - self.ranker = rankers.NormalizerRanker(ctx=self.ctx) + self.ranker = NormalizerRanker(ctx=self.ctx) def run(self) -> None: """The run method is called when the scheduler is started. It will @@ -62,7 +68,7 @@ def run(self) -> None: for each normalizer that is registered for the mime type of the raw file. 
""" - listener = listeners.RawData( + listener = clients.RawData( dsn=str(self.ctx.config.host_raw_data), queue=f"{self.organisation.id}__raw_file_received", func=self.push_tasks_for_received_raw_data, @@ -249,7 +255,7 @@ def push_normalizer_task(self, normalizer_task: models.NormalizerTask, caller: s try: self.push_item_to_queue_with_timeout(task, self.max_tries) - except queues.QueueFullError: + except QueueFullError: self.logger.warning( "Could not add task to queue, queue was full: %s", task.id, diff --git a/mula/scheduler/schedulers/report.py b/mula/scheduler/schedulers/schedulers/report.py similarity index 89% rename from mula/scheduler/schedulers/report.py rename to mula/scheduler/schedulers/schedulers/report.py index 321dad01034..099c21642d9 100644 --- a/mula/scheduler/schedulers/report.py +++ b/mula/scheduler/schedulers/schedulers/report.py @@ -6,12 +6,12 @@ import structlog from opentelemetry import trace -from scheduler import context, queues, storage +from scheduler import context, storage from scheduler.models import Organisation, ReportTask, Task +from scheduler.schedulers import Scheduler +from scheduler.schedulers.queue import PriorityQueue, QueueFullError from scheduler.storage import filters -from .scheduler import Scheduler - tracer = trace.get_tracer(__name__) @@ -23,12 +23,12 @@ def __init__( ctx: context.AppContext, scheduler_id: str, organisation: Organisation, - queue: queues.PriorityQueue | None = None, + queue: PriorityQueue | None = None, callback: Callable[..., None] | None = None, ): self.logger: structlog.BoundLogger = structlog.get_logger(__name__) self.organisation = organisation - self.queue = queue or queues.PriorityQueue( + self.queue = queue or PriorityQueue( pq_id=scheduler_id, maxsize=ctx.config.pq_maxsize, item_type=self.ITEM_TYPE, @@ -36,7 +36,14 @@ def __init__( pq_store=ctx.datastores.pq_store, ) - super().__init__(ctx=ctx, queue=self.queue, scheduler_id=scheduler_id, callback=callback, create_schedule=True) + super().__init__( + ctx=ctx, + queue=self.queue, + scheduler_id=scheduler_id, + callback=callback, + create_schedule=True, + auto_calculate_deadline=False, + ) def run(self) -> None: # Rescheduling @@ -111,7 +118,7 @@ def push_report_task(self, report_task: ReportTask, caller: str = "") -> None: try: self.push_item_to_queue_with_timeout(task, self.max_tries) - except queues.QueueFullError: + except QueueFullError: self.logger.warning( "Could not add task %s to queue, queue was full", report_task.hash, diff --git a/mula/scheduler/server/handlers/queues.py b/mula/scheduler/server/handlers/queues.py index 6507a15a2cc..461c897c5e9 100644 --- a/mula/scheduler/server/handlers/queues.py +++ b/mula/scheduler/server/handlers/queues.py @@ -4,7 +4,8 @@ import structlog from fastapi import status -from scheduler import context, models, queues, schedulers, storage +from scheduler import context, models, schedulers, storage +from scheduler.schedulers.queue import NotAllowedError, QueueEmptyError, QueueFullError from scheduler.server import serializers from scheduler.server.errors import BadRequestError, ConflictError, NotFoundError, TooManyRequestsError @@ -70,7 +71,7 @@ def pop(self, queue_id: str, filters: storage.filters.FilterRequest | None = Non try: item = s.pop_item_from_queue(filters) - except queues.QueueEmptyError: + except QueueEmptyError: return None if item is None: @@ -94,9 +95,9 @@ def push(self, queue_id: str, item_in: serializers.Task) -> Any: pushed_item = s.push_item_to_queue(new_item) except ValueError: raise BadRequestError("malformed 
item") - except queues.QueueFullError: + except QueueFullError: raise TooManyRequestsError("queue is full") - except queues.errors.NotAllowedError: + except NotAllowedError: raise ConflictError("queue is not allowed to push items") return pushed_item diff --git a/mula/scheduler/storage/__init__.py b/mula/scheduler/storage/__init__.py index 7ef0497775c..2891e0f29d3 100644 --- a/mula/scheduler/storage/__init__.py +++ b/mula/scheduler/storage/__init__.py @@ -1,6 +1,3 @@ +from .connection import DBConn from .filters import apply_filter -from .pq_store import PriorityQueueStore -from .schedule_store import ScheduleStore -from .storage import DBConn -from .task_store import TaskStore from .utils import retry diff --git a/mula/scheduler/storage/connection.py b/mula/scheduler/storage/connection.py new file mode 100644 index 00000000000..dc381191528 --- /dev/null +++ b/mula/scheduler/storage/connection.py @@ -0,0 +1,51 @@ +import json +from functools import partial + +import sqlalchemy +import structlog + +from scheduler.config import settings +from scheduler.storage.errors import StorageError + + +class DBConn: + def __init__(self, dsn: str, pool_size: int = 25): + self.logger: structlog.BoundLogger = structlog.get_logger(__name__) + + self.dsn = dsn + self.pool_size = pool_size + + def connect(self) -> None: + db_uri_redacted = sqlalchemy.engine.make_url(name_or_url=self.dsn).render_as_string(hide_password=True) + + pool_size = settings.Settings().db_connection_pool_size + + self.logger.debug( + "Connecting to database %s with pool size %s...", + self.dsn, + pool_size, + dsn=db_uri_redacted, + pool_size=pool_size, + ) + + try: + serializer = partial(json.dumps, default=str) + self.engine = sqlalchemy.create_engine( + self.dsn, + pool_pre_ping=True, + pool_size=pool_size, + pool_recycle=300, + json_serializer=serializer, + connect_args={"options": "-c timezone=utc"}, + ) + except sqlalchemy.exc.SQLAlchemyError as e: + self.logger.error("Failed to connect to database %s: %s", self.dsn, e, dsn=db_uri_redacted) + raise StorageError("Failed to connect to database.") + + self.logger.debug("Connected to database %s.", db_uri_redacted, dsn=db_uri_redacted) + + try: + self.session = sqlalchemy.orm.sessionmaker(bind=self.engine) + except sqlalchemy.exc.SQLAlchemyError as e: + self.logger.error("Failed to create session: %s", e) + raise StorageError("Failed to create session.") diff --git a/mula/scheduler/alembic/versions/__init__.py b/mula/scheduler/storage/migrations/__init__.py similarity index 100% rename from mula/scheduler/alembic/versions/__init__.py rename to mula/scheduler/storage/migrations/__init__.py diff --git a/mula/scheduler/alembic.ini b/mula/scheduler/storage/migrations/alembic.ini similarity index 98% rename from mula/scheduler/alembic.ini rename to mula/scheduler/storage/migrations/alembic.ini index 76c3ea8eab2..13a39bc3f11 100644 --- a/mula/scheduler/alembic.ini +++ b/mula/scheduler/storage/migrations/alembic.ini @@ -2,7 +2,7 @@ [alembic] # path to migration scripts -script_location = scheduler/alembic +script_location = scheduler/storage/migrations # template used to generate migration files file_template = %%(rev)s_%%(slug)s diff --git a/mula/scheduler/alembic/env.py b/mula/scheduler/storage/migrations/env.py similarity index 100% rename from mula/scheduler/alembic/env.py rename to mula/scheduler/storage/migrations/env.py diff --git a/mula/scheduler/alembic/script.py.mako b/mula/scheduler/storage/migrations/script.py.mako similarity index 100% rename from 
mula/scheduler/alembic/script.py.mako rename to mula/scheduler/storage/migrations/script.py.mako diff --git a/mula/scheduler/alembic/versions/0001_initial_migration.py b/mula/scheduler/storage/migrations/versions/0001_initial_migration.py similarity index 100% rename from mula/scheduler/alembic/versions/0001_initial_migration.py rename to mula/scheduler/storage/migrations/versions/0001_initial_migration.py diff --git a/mula/scheduler/alembic/versions/0002_update_tasks.py b/mula/scheduler/storage/migrations/versions/0002_update_tasks.py similarity index 100% rename from mula/scheduler/alembic/versions/0002_update_tasks.py rename to mula/scheduler/storage/migrations/versions/0002_update_tasks.py diff --git a/mula/scheduler/alembic/versions/0003_add_type_field_to_tasks.py b/mula/scheduler/storage/migrations/versions/0003_add_type_field_to_tasks.py similarity index 100% rename from mula/scheduler/alembic/versions/0003_add_type_field_to_tasks.py rename to mula/scheduler/storage/migrations/versions/0003_add_type_field_to_tasks.py diff --git a/mula/scheduler/alembic/versions/0004_add_server_default.py b/mula/scheduler/storage/migrations/versions/0004_add_server_default.py similarity index 100% rename from mula/scheduler/alembic/versions/0004_add_server_default.py rename to mula/scheduler/storage/migrations/versions/0004_add_server_default.py diff --git a/mula/scheduler/alembic/versions/0005_size_limit_hash.py b/mula/scheduler/storage/migrations/versions/0005_size_limit_hash.py similarity index 100% rename from mula/scheduler/alembic/versions/0005_size_limit_hash.py rename to mula/scheduler/storage/migrations/versions/0005_size_limit_hash.py diff --git a/mula/scheduler/alembic/versions/0006_add_jsonb_fields_add_jsonb_fields.py b/mula/scheduler/storage/migrations/versions/0006_add_jsonb_fields_add_jsonb_fields.py similarity index 100% rename from mula/scheduler/alembic/versions/0006_add_jsonb_fields_add_jsonb_fields.py rename to mula/scheduler/storage/migrations/versions/0006_add_jsonb_fields_add_jsonb_fields.py diff --git a/mula/scheduler/alembic/versions/0007_add_cancelled_status_for_tasks.py b/mula/scheduler/storage/migrations/versions/0007_add_cancelled_status_for_tasks.py similarity index 100% rename from mula/scheduler/alembic/versions/0007_add_cancelled_status_for_tasks.py rename to mula/scheduler/storage/migrations/versions/0007_add_cancelled_status_for_tasks.py diff --git a/mula/scheduler/alembic/versions/0008_add_task_schedule.py b/mula/scheduler/storage/migrations/versions/0008_add_task_schedule.py similarity index 100% rename from mula/scheduler/alembic/versions/0008_add_task_schedule.py rename to mula/scheduler/storage/migrations/versions/0008_add_task_schedule.py diff --git a/mula/scheduler/connectors/__init__.py b/mula/scheduler/storage/migrations/versions/__init__.py similarity index 100% rename from mula/scheduler/connectors/__init__.py rename to mula/scheduler/storage/migrations/versions/__init__.py diff --git a/mula/scheduler/storage/stores/__init__.py b/mula/scheduler/storage/stores/__init__.py new file mode 100644 index 00000000000..244e81eb087 --- /dev/null +++ b/mula/scheduler/storage/stores/__init__.py @@ -0,0 +1,3 @@ +from .pq import PriorityQueueStore +from .schedule import ScheduleStore +from .task import TaskStore diff --git a/mula/scheduler/storage/pq_store.py b/mula/scheduler/storage/stores/pq.py similarity index 97% rename from mula/scheduler/storage/pq_store.py rename to mula/scheduler/storage/stores/pq.py index 9e67997f072..feb62bd01c7 100644 --- 
a/mula/scheduler/storage/pq_store.py +++ b/mula/scheduler/storage/stores/pq.py @@ -1,11 +1,10 @@ from uuid import UUID from scheduler import models - -from .errors import exception_handler -from .filters import FilterRequest, apply_filter -from .storage import DBConn -from .utils import retry +from scheduler.storage import DBConn +from scheduler.storage.errors import exception_handler +from scheduler.storage.filters import FilterRequest, apply_filter +from scheduler.storage.utils import retry class PriorityQueueStore: diff --git a/mula/scheduler/storage/schedule_store.py b/mula/scheduler/storage/stores/schedule.py similarity index 93% rename from mula/scheduler/storage/schedule_store.py rename to mula/scheduler/storage/stores/schedule.py index d26c10d5f5e..a91b680d03b 100644 --- a/mula/scheduler/storage/schedule_store.py +++ b/mula/scheduler/storage/stores/schedule.py @@ -3,11 +3,10 @@ from sqlalchemy import exc from scheduler import models - -from .errors import StorageError, exception_handler -from .filters import FilterRequest, apply_filter -from .storage import DBConn -from .utils import retry +from scheduler.storage import DBConn +from scheduler.storage.errors import StorageError, exception_handler +from scheduler.storage.filters import FilterRequest, apply_filter +from scheduler.storage.utils import retry class ScheduleStore: @@ -22,7 +21,7 @@ def get_schedules( self, scheduler_id: str | None = None, schedule_hash: str | None = None, - enabled: bool | None = None, + enabled: bool | None = True, # FIXME: None? min_deadline_at: datetime | None = None, max_deadline_at: datetime | None = None, min_created_at: datetime | None = None, diff --git a/mula/scheduler/storage/task_store.py b/mula/scheduler/storage/stores/task.py similarity index 91% rename from mula/scheduler/storage/task_store.py rename to mula/scheduler/storage/stores/task.py index 266b6764e8e..437e10ca538 100644 --- a/mula/scheduler/storage/task_store.py +++ b/mula/scheduler/storage/stores/task.py @@ -3,11 +3,10 @@ from sqlalchemy import exc, func from scheduler import models - -from .errors import StorageError, exception_handler -from .filters import FilterRequest, apply_filter -from .storage import DBConn -from .utils import retry +from scheduler.storage import DBConn +from scheduler.storage.errors import StorageError, exception_handler +from scheduler.storage.filters import FilterRequest, apply_filter +from scheduler.storage.utils import retry class TaskStore: @@ -74,14 +73,14 @@ def get_task(self, task_id: str) -> models.Task | None: @retry() @exception_handler - def get_tasks_by_hash(self, task_hash: str) -> list[models.Task] | None: + def get_tasks_by_hash(self, task_hash: str, limit: int | None = None) -> list[models.Task] | None: with self.dbconn.session.begin() as session: - tasks_orm = ( - session.query(models.TaskDB) - .filter(models.TaskDB.hash == task_hash) - .order_by(models.TaskDB.created_at.desc()) - .all() - ) + query = session.query(models.TaskDB).filter(models.TaskDB.hash == task_hash).order_by(models.TaskDB.created_at.desc()) + + if limit is not None: + query = query.limit(limit) + + tasks_orm = query.all() if tasks_orm is None: return None diff --git a/mula/tests/integration/test_api.py b/mula/tests/integration/test_api.py index a487f765dba..6eaa82086c2 100644 --- a/mula/tests/integration/test_api.py +++ b/mula/tests/integration/test_api.py @@ -9,6 +9,7 @@ from fastapi.testclient import TestClient from scheduler import config, models, server, storage, utils from scheduler.server import serializers +from
scheduler.storage import stores from tests.factories import OrganisationFactory from tests.mocks import queue as mock_queue @@ -31,9 +32,9 @@ def setUp(self): self.mock_ctx.datastores = SimpleNamespace( **{ - storage.TaskStore.name: storage.TaskStore(self.dbconn), - storage.PriorityQueueStore.name: storage.PriorityQueueStore(self.dbconn), - storage.ScheduleStore.name: storage.ScheduleStore(self.dbconn), + stores.TaskStore.name: stores.TaskStore(self.dbconn), + stores.PriorityQueueStore.name: stores.PriorityQueueStore(self.dbconn), + stores.ScheduleStore.name: stores.ScheduleStore(self.dbconn), } ) diff --git a/mula/tests/integration/test_app.py b/mula/tests/integration/test_app.py index 99e79009efb..b75f0576883 100644 --- a/mula/tests/integration/test_app.py +++ b/mula/tests/integration/test_app.py @@ -5,6 +5,7 @@ import scheduler from scheduler import config, models, server, storage +from scheduler.storage import stores from tests.factories import OrganisationFactory from tests.mocks import MockKatalogusService @@ -25,8 +26,8 @@ def setUp(self): self.mock_ctx.datastores = SimpleNamespace( **{ - storage.TaskStore.name: storage.TaskStore(self.dbconn), - storage.PriorityQueueStore.name: storage.PriorityQueueStore(self.dbconn), + stores.TaskStore.name: stores.TaskStore(self.dbconn), + stores.PriorityQueueStore.name: stores.PriorityQueueStore(self.dbconn), } ) diff --git a/mula/tests/integration/test_boefje_scheduler.py b/mula/tests/integration/test_boefje_scheduler.py index a925bde5619..19abaeb555b 100644 --- a/mula/tests/integration/test_boefje_scheduler.py +++ b/mula/tests/integration/test_boefje_scheduler.py @@ -3,7 +3,8 @@ from types import SimpleNamespace from unittest import mock -from scheduler import config, connectors, models, schedulers, storage +from scheduler import clients, config, models, schedulers, storage +from scheduler.storage import stores from structlog.testing import capture_logs from tests.factories import ( @@ -24,21 +25,19 @@ def setUp(self): self.mock_ctx.config = config.settings.Settings() # Mock connectors: octopoes - self.mock_octopoes = mock.create_autospec(spec=connectors.services.Octopoes, spec_set=True) + self.mock_octopoes = mock.create_autospec(spec=clients.Octopoes, spec_set=True) self.mock_ctx.services.octopoes = self.mock_octopoes # Mock connectors: Scan profile mutation - self.mock_scan_profile_mutation = mock.create_autospec( - spec=connectors.listeners.ScanProfileMutation, spec_set=True - ) + self.mock_scan_profile_mutation = mock.create_autospec(spec=clients.ScanProfileMutation, spec_set=True) self.mock_ctx.services.scan_profile_mutation = self.mock_scan_profile_mutation # Mock connectors: Katalogus - self.mock_katalogus = mock.create_autospec(spec=connectors.services.Katalogus, spec_set=True) + self.mock_katalogus = mock.create_autospec(spec=clients.Katalogus, spec_set=True) self.mock_ctx.services.katalogus = self.mock_katalogus # Mock connectors: Bytes - self.mock_bytes = mock.create_autospec(spec=connectors.services.Bytes, spec_set=True) + self.mock_bytes = mock.create_autospec(spec=clients.Bytes, spec_set=True) self.mock_ctx.services.bytes = self.mock_bytes # Database @@ -49,9 +48,9 @@ def setUp(self): self.mock_ctx.datastores = SimpleNamespace( **{ - storage.ScheduleStore.name: storage.ScheduleStore(self.dbconn), - storage.TaskStore.name: storage.TaskStore(self.dbconn), - storage.PriorityQueueStore.name: storage.PriorityQueueStore(self.dbconn), + stores.ScheduleStore.name: stores.ScheduleStore(self.dbconn), + stores.TaskStore.name: 
stores.TaskStore(self.dbconn), + stores.PriorityQueueStore.name: stores.PriorityQueueStore(self.dbconn), } ) @@ -577,10 +576,10 @@ def test_push_task_no_ooi(self): @mock.patch("scheduler.schedulers.BoefjeScheduler.has_boefje_permission_to_run") @mock.patch("scheduler.schedulers.BoefjeScheduler.has_boefje_task_grace_period_passed") @mock.patch("scheduler.schedulers.BoefjeScheduler.is_item_on_queue_by_hash") - @mock.patch("scheduler.context.AppContext.datastores.task_store.get_tasks_by_hash") + @mock.patch("scheduler.context.AppContext.datastores.task_store.get_latest_task_by_hash") def test_push_task_queue_full( self, - mock_get_tasks_by_hash, + mock_get_latest_task_by_hash, mock_is_item_on_queue_by_hash, mock_has_boefje_task_grace_period_passed, mock_has_boefje_permission_to_run, @@ -606,7 +605,7 @@ def test_push_task_queue_full( mock_has_boefje_task_started_running.return_value = False mock_has_boefje_task_grace_period_passed.return_value = True mock_is_item_on_queue_by_hash.return_value = False - mock_get_tasks_by_hash.return_value = None + mock_get_latest_task_by_hash.return_value = None self.mock_get_plugin.return_value = PluginFactory(scan_level=0, consumes=[ooi.object_type]) # Act @@ -1333,8 +1332,8 @@ def test_push_tasks_for_new_boefjes_request_exception(self): # Mocks self.mock_get_objects_by_object_types.side_effect = [ - connectors.errors.ExternalServiceError("External service is not available."), - connectors.errors.ExternalServiceError("External service is not available."), + clients.errors.ExternalServiceError("External service is not available."), + clients.errors.ExternalServiceError("External service is not available."), ] self.mock_get_new_boefjes_by_org_id.return_value = [boefje] @@ -1414,8 +1413,8 @@ def test_push_tasks_for_new_boefjes_get_objects_request_exception(self): # Mocks self.mock_get_objects_by_object_types.side_effect = [ - connectors.errors.ExternalServiceError("External service is not available."), - connectors.errors.ExternalServiceError("External service is not available."), + clients.errors.ExternalServiceError("External service is not available."), + clients.errors.ExternalServiceError("External service is not available."), ] self.mock_get_new_boefjes_by_org_id.return_value = [boefje] diff --git a/mula/tests/integration/test_services.py b/mula/tests/integration/test_clients.py similarity index 91% rename from mula/tests/integration/test_services.py rename to mula/tests/integration/test_clients.py index 460d5f6b115..3012dfad972 100644 --- a/mula/tests/integration/test_services.py +++ b/mula/tests/integration/test_clients.py @@ -3,8 +3,7 @@ import unittest from unittest import mock -from scheduler import config, models, storage -from scheduler.connectors import services +from scheduler import clients, config, models, storage from scheduler.utils import remove_trailing_slash from tests.factories import PluginFactory @@ -13,7 +12,7 @@ class BytesTestCase(unittest.TestCase): def setUp(self) -> None: self.config = config.settings.Settings() - self.service_bytes = services.Bytes( + self.service_bytes = clients.Bytes( host=remove_trailing_slash(str(self.config.host_bytes)), user=self.config.host_bytes_user, password=self.config.host_bytes_password, @@ -48,7 +47,7 @@ def setUp(self) -> None: self.dbconn = storage.DBConn(str(self.config.db_uri)) self.dbconn.connect() - self.service_katalogus = services.Katalogus( + self.service_katalogus = clients.Katalogus( host=remove_trailing_slash(str(self.config.host_katalogus)), source="scheduler_test", 
timeout=self.config.katalogus_request_timeout, @@ -61,7 +60,7 @@ def tearDown(self) -> None: self.service_katalogus.boefje_cache.reset() self.service_katalogus.normalizer_cache.reset() - @mock.patch("scheduler.connectors.services.Katalogus.get_organisations") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_organisations") def test_flush_plugin_cache(self, mock_get_organisations): # Mock mock_get_organisations.return_value = [ @@ -75,7 +74,7 @@ def test_flush_plugin_cache(self, mock_get_organisations): # Assert self.assertCountEqual(self.service_katalogus.plugin_cache.cache.keys(), ("org-1", "org-2")) - @mock.patch("scheduler.connectors.services.Katalogus.get_organisations") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_organisations") def test_flush_plugin_cache_empty(self, mock_get_organisations): # Mock mock_get_organisations.return_value = [] @@ -86,8 +85,8 @@ def test_flush_plugin_cache_empty(self, mock_get_organisations): # Assert self.assertDictEqual(self.service_katalogus.plugin_cache.cache, {}) - @mock.patch("scheduler.connectors.services.Katalogus.get_plugins_by_organisation") - @mock.patch("scheduler.connectors.services.Katalogus.get_organisations") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_plugins_by_organisation") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_organisations") def test_flush_boefje_cache(self, mock_get_organisations, mock_get_plugins_by_organisation): # Mock mock_get_organisations.return_value = [ @@ -114,8 +113,8 @@ def test_flush_boefje_cache(self, mock_get_organisations, mock_get_plugins_by_or self.assertIsNotNone(self.service_katalogus.boefje_cache.get("org-2").get("Hostname")) self.assertEqual(len(self.service_katalogus.boefje_cache.get("org-2").get("Hostname")), 2) - @mock.patch("scheduler.connectors.services.Katalogus.get_plugins_by_organisation") - @mock.patch("scheduler.connectors.services.Katalogus.get_organisations") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_plugins_by_organisation") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_organisations") def test_flush_normalizer_cache(self, mock_get_organisations, mock_get_plugins_by_organisation): # Mock mock_get_organisations.return_value = [ @@ -142,7 +141,7 @@ def test_flush_normalizer_cache(self, mock_get_organisations, mock_get_plugins_b self.assertIsNotNone(self.service_katalogus.normalizer_cache.get("org-2").get("Hostname")) self.assertEqual(len(self.service_katalogus.normalizer_cache.get("org-2").get("Hostname")), 2) - @mock.patch("scheduler.connectors.services.Katalogus.get_plugins_by_organisation") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_plugins_by_organisation") def test_get_new_boefjes_by_org_id(self, mock_get_plugins_by_organisation): # Mock mock_get_plugins_by_organisation.side_effect = [ @@ -197,8 +196,8 @@ def test_get_new_boefjes_by_org_id(self, mock_get_plugins_by_organisation): self.assertEqual(len(new_boefjes), 1) self.assertEqual(new_boefjes[0].id, "plugin-5") - @mock.patch("scheduler.connectors.services.Katalogus.get_plugins_by_organisation") - @mock.patch("scheduler.connectors.services.Katalogus.get_organisations") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_plugins_by_organisation") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_organisations") def test_plugin_cache_thread_safety(self, mock_get_organisations, mock_get_plugins_by_organisation): # Mock 
mock_get_organisations.return_value = [models.Organisation(id="org-1", name="org-1")] @@ -241,8 +240,8 @@ def write_to_cache(event): self.assertEqual(len(self.service_katalogus.plugin_cache.get("org-1")), 3) - @mock.patch("scheduler.connectors.services.Katalogus.get_plugins_by_organisation") - @mock.patch("scheduler.connectors.services.Katalogus.get_organisations") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_plugins_by_organisation") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_organisations") def test_boefje_cache_thread_safety(self, mock_get_organisations, mock_get_plugins_by_organisation): # Mock mock_get_organisations.return_value = [models.Organisation(id="org-1", name="org-1")] @@ -287,8 +286,8 @@ def write_to_cache(event): self.assertEqual(len(self.service_katalogus.boefje_cache.get("org-1").get("Hostname")), 2) - @mock.patch("scheduler.connectors.services.Katalogus.get_plugins_by_organisation") - @mock.patch("scheduler.connectors.services.Katalogus.get_organisations") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_plugins_by_organisation") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_organisations") def test_normalizer_cache_thread_safety(self, mock_get_organisations, mock_get_plugins_by_organisation): # Mock mock_get_organisations.return_value = [models.Organisation(id="org-1", name="org-1")] @@ -331,7 +330,7 @@ def write_to_cache(event): self.assertEqual(len(self.service_katalogus.normalizer_cache.get("org-1").get("Hostname")), 2) - @mock.patch("scheduler.connectors.services.Katalogus.get_plugins_by_organisation") + @mock.patch("scheduler.clients.http.external.katalogus.Katalogus.get_plugins_by_organisation") def test_new_boefjes_cache_thread_safety(self, mock_get_plugins_by_organisation): mock_get_plugins_by_organisation.side_effect = [ [ diff --git a/mula/tests/integration/test_listeners.py b/mula/tests/integration/test_listeners.py index 5c316d590e1..fd56a43cf78 100644 --- a/mula/tests/integration/test_listeners.py +++ b/mula/tests/integration/test_listeners.py @@ -4,7 +4,7 @@ from unittest import mock import pika -from scheduler import connectors, utils +from scheduler import clients, utils from tests.mocks import listener @@ -13,7 +13,7 @@ class RabbitMQTestCase(unittest.TestCase): DSN = "amqp://guest:guest@ci_rabbitmq:5672/%2Fkat" def setUp(self): - self.listeners: list[connectors.listeners.Listener] = [] + self.listeners: list[clients.listeners.Listener] = [] threading.excepthook = self.unhandled_exception diff --git a/mula/tests/integration/test_normalizer_scheduler.py b/mula/tests/integration/test_normalizer_scheduler.py index 78a25287d28..493b4bd3f54 100644 --- a/mula/tests/integration/test_normalizer_scheduler.py +++ b/mula/tests/integration/test_normalizer_scheduler.py @@ -3,7 +3,8 @@ from types import SimpleNamespace from unittest import mock -from scheduler import config, connectors, models, schedulers, storage +from scheduler import clients, config, models, schedulers, storage +from scheduler.storage import stores from structlog.testing import capture_logs from tests.factories import ( @@ -33,9 +34,9 @@ def setUp(self): self.mock_ctx.datastores = SimpleNamespace( **{ - storage.TaskStore.name: storage.TaskStore(self.dbconn), - storage.PriorityQueueStore.name: storage.PriorityQueueStore(self.dbconn), - storage.ScheduleStore.name: storage.ScheduleStore(self.dbconn), + stores.TaskStore.name: stores.TaskStore(self.dbconn), + stores.PriorityQueueStore.name: 
stores.PriorityQueueStore(self.dbconn), + stores.ScheduleStore.name: stores.ScheduleStore(self.dbconn), } ) @@ -160,8 +161,8 @@ def test_get_normalizers_for_mime_type(self, mock_get_normalizers_by_org_id_and_ def test_get_normalizers_for_mime_type_request_exception(self, mock_get_normalizers_by_org_id_and_type): # Mocks mock_get_normalizers_by_org_id_and_type.side_effect = [ - connectors.errors.ExternalServiceError("External service is not available."), - connectors.errors.ExternalServiceError("External service is not available."), + clients.errors.ExternalServiceError("External service is not available."), + clients.errors.ExternalServiceError("External service is not available."), ] # Act diff --git a/mula/tests/integration/test_pq_store.py b/mula/tests/integration/test_pq_store.py index 6ebefce88ce..0ace0867758 100644 --- a/mula/tests/integration/test_pq_store.py +++ b/mula/tests/integration/test_pq_store.py @@ -4,6 +4,7 @@ from unittest import mock from scheduler import config, models, storage +from scheduler.storage import stores from tests.factories import OrganisationFactory from tests.utils import functions @@ -23,8 +24,8 @@ def setUp(self): self.mock_ctx.datastores = SimpleNamespace( **{ - storage.PriorityQueueStore.name: storage.PriorityQueueStore(self.dbconn), - storage.TaskStore.name: storage.TaskStore(self.dbconn), + stores.PriorityQueueStore.name: stores.PriorityQueueStore(self.dbconn), + stores.TaskStore.name: stores.TaskStore(self.dbconn), } ) diff --git a/mula/tests/integration/test_report_scheduler.py b/mula/tests/integration/test_report_scheduler.py index d60a25dd502..ee35f7ab25a 100644 --- a/mula/tests/integration/test_report_scheduler.py +++ b/mula/tests/integration/test_report_scheduler.py @@ -3,6 +3,7 @@ from unittest import mock from scheduler import config, models, schedulers, storage +from scheduler.storage import stores from tests.factories import OrganisationFactory @@ -21,9 +22,9 @@ def setUp(self): self.mock_ctx.datastores = SimpleNamespace( **{ - storage.TaskStore.name: storage.TaskStore(self.dbconn), - storage.PriorityQueueStore.name: storage.PriorityQueueStore(self.dbconn), - storage.ScheduleStore.name: storage.ScheduleStore(self.dbconn), + stores.TaskStore.name: stores.TaskStore(self.dbconn), + stores.PriorityQueueStore.name: stores.PriorityQueueStore(self.dbconn), + stores.ScheduleStore.name: stores.ScheduleStore(self.dbconn), } ) diff --git a/mula/tests/integration/test_schedule_store.py b/mula/tests/integration/test_schedule_store.py index 369af4c6101..df957b82171 100644 --- a/mula/tests/integration/test_schedule_store.py +++ b/mula/tests/integration/test_schedule_store.py @@ -4,7 +4,7 @@ from unittest import mock from scheduler import config, models, storage -from scheduler.storage import filters +from scheduler.storage import filters, stores from tests.utils import functions @@ -23,8 +23,8 @@ def setUp(self): self.mock_ctx.datastores = SimpleNamespace( **{ - storage.TaskStore.name: storage.TaskStore(self.dbconn), - storage.ScheduleStore.name: storage.ScheduleStore(self.dbconn), + stores.TaskStore.name: stores.TaskStore(self.dbconn), + stores.ScheduleStore.name: stores.ScheduleStore(self.dbconn), } ) diff --git a/mula/tests/integration/test_scheduler.py b/mula/tests/integration/test_scheduler.py index 095b74d63a1..7483f5b9880 100644 --- a/mula/tests/integration/test_scheduler.py +++ b/mula/tests/integration/test_scheduler.py @@ -4,7 +4,9 @@ from types import SimpleNamespace from unittest import mock -from scheduler import config, models, queues, 
storage +from scheduler import config, models, storage +from scheduler.schedulers.queue import InvalidItemError, NotAllowedError, QueueEmptyError, QueueFullError +from scheduler.storage import stores from structlog.testing import capture_logs from tests.mocks import item as mock_item @@ -27,9 +29,9 @@ def setUp(self): self.mock_ctx.datastores = SimpleNamespace( **{ - storage.TaskStore.name: storage.TaskStore(self.dbconn), - storage.PriorityQueueStore.name: storage.PriorityQueueStore(self.dbconn), - storage.ScheduleStore.name: storage.ScheduleStore(self.dbconn), + stores.TaskStore.name: stores.TaskStore(self.dbconn), + stores.PriorityQueueStore.name: stores.PriorityQueueStore(self.dbconn), + stores.ScheduleStore.name: stores.ScheduleStore(self.dbconn), } ) @@ -138,7 +140,7 @@ def test_push_item_to_queue_full(self): # Assert self.assertEqual(1, self.scheduler.queue.qsize()) - with self.assertRaises(queues.errors.QueueFullError): + with self.assertRaises(QueueFullError): self.scheduler.push_item_to_queue_with_timeout(item=item, max_tries=1) self.assertEqual(1, self.scheduler.queue.qsize()) @@ -149,7 +151,7 @@ def test_push_item_to_queue_invalid(self): item.data = {"invalid": "data"} # Assert - with self.assertRaises(queues.errors.InvalidItemError): + with self.assertRaises(InvalidItemError): self.scheduler.push_item_to_queue(item) def test_pop_item_from_queue(self): @@ -167,7 +169,7 @@ def test_pop_item_from_queue(self): def test_pop_item_from_queue_empty(self): self.assertEqual(0, self.scheduler.queue.qsize()) - with self.assertRaises(queues.errors.QueueEmptyError): + with self.assertRaises(QueueEmptyError): self.scheduler.pop_item_from_queue() def test_post_push(self): @@ -387,7 +389,7 @@ def test_disable_scheduler(self): # Scheduler should be disabled self.assertFalse(self.scheduler.is_enabled()) - with self.assertRaises(queues.errors.NotAllowedError): + with self.assertRaises(NotAllowedError): self.scheduler.push_item_to_queue(item) def test_enable_scheduler(self): diff --git a/mula/tests/integration/test_task_store.py b/mula/tests/integration/test_task_store.py index 7a733aba6a3..c672fc78557 100644 --- a/mula/tests/integration/test_task_store.py +++ b/mula/tests/integration/test_task_store.py @@ -4,6 +4,7 @@ from unittest import mock from scheduler import config, models, storage +from scheduler.storage import stores from tests.factories import OrganisationFactory from tests.utils import functions @@ -23,8 +24,8 @@ def setUp(self): self.mock_ctx.datastores = SimpleNamespace( **{ - storage.PriorityQueueStore.name: storage.PriorityQueueStore(self.dbconn), - storage.TaskStore.name: storage.TaskStore(self.dbconn), + stores.PriorityQueueStore.name: stores.PriorityQueueStore(self.dbconn), + stores.TaskStore.name: stores.TaskStore(self.dbconn), } ) diff --git a/mula/tests/mocks/listener.py b/mula/tests/mocks/listener.py index f69bbb01cce..a69942c665f 100644 --- a/mula/tests/mocks/listener.py +++ b/mula/tests/mocks/listener.py @@ -1,6 +1,6 @@ import time -from scheduler.connectors import listeners +from scheduler.clients.amqp import listeners class MockListener(listeners.Listener): diff --git a/mula/tests/mocks/queue.py b/mula/tests/mocks/queue.py index 426ae5a46b9..a7c50cc3acf 100644 --- a/mula/tests/mocks/queue.py +++ b/mula/tests/mocks/queue.py @@ -1,7 +1,8 @@ -from scheduler import models, queues +from scheduler import models +from scheduler.schedulers.queue import PriorityQueue from scheduler.utils import dict_utils -class MockPriorityQueue(queues.PriorityQueue): +class 
MockPriorityQueue(PriorityQueue): def create_hash(self, p_item: models.Task) -> str: return dict_utils.deep_get(p_item.model_dump(), ["data", "id"]) diff --git a/mula/tests/unit/test_queue.py b/mula/tests/unit/test_queue.py index a4aa478c541..2861d442257 100644 --- a/mula/tests/unit/test_queue.py +++ b/mula/tests/unit/test_queue.py @@ -6,7 +6,9 @@ import uuid from unittest import mock -from scheduler import config, models, queues, storage +from scheduler import config, models, storage +from scheduler.schedulers.queue import InvalidItemError, ItemNotFoundError, NotAllowedError, QueueEmptyError +from scheduler.storage import stores from tests.mocks import queue as mock_queue from tests.utils import functions @@ -22,7 +24,7 @@ def setUp(self) -> None: models.Base.metadata.drop_all(self.dbconn.engine) models.Base.metadata.create_all(self.dbconn.engine) - self.pq_store = storage.PriorityQueueStore(self.dbconn) + self.pq_store = stores.PriorityQueueStore(self.dbconn) # Priority Queue self.pq = mock_queue.MockPriorityQueue( @@ -50,7 +52,7 @@ def test_push(self): self.assertEqual(1, self.pq.qsize()) - @mock.patch("scheduler.storage.PriorityQueueStore.push") + @mock.patch("scheduler.storage.stores.PriorityQueueStore.push") def test_push_item_not_found_in_db(self, mock_push): """When adding an item to the priority queue, but the item is not found in the database, the item shouldn't be added. @@ -59,7 +61,7 @@ def test_push_item_not_found_in_db(self, mock_push): mock_push.return_value = None - with self.assertRaises(queues.errors.ItemNotFoundError): + with self.assertRaises(ItemNotFoundError): self.pq.push(item) self.assertEqual(0, self.pq.qsize()) @@ -73,7 +75,7 @@ def test_push_incorrect_item_type(self): """ item = {"priority": 1, "data": functions.TestModel(id=uuid.uuid4().hex, name=uuid.uuid4().hex)} - with self.assertRaises(queues.errors.InvalidItemError): + with self.assertRaises(InvalidItemError): self.pq.push(item) self.assertEqual(0, self.pq.qsize()) @@ -85,7 +87,7 @@ def test_push_invalid_item(self): item = functions.create_item(scheduler_id=self.pq.pq_id, priority=1) item.data = {"invalid": "data"} - with self.assertRaises(queues.errors.InvalidItemError): + with self.assertRaises(InvalidItemError): self.pq.push(item) self.assertEqual(0, self.pq.qsize()) @@ -104,7 +106,7 @@ def test_push_replace_not_allowed(self): self.assertEqual(1, self.pq.qsize()) # Add the same item again - with self.assertRaises(queues.errors.NotAllowedError): + with self.assertRaises(NotAllowedError): self.pq.push(initial_item) self.assertEqual(1, self.pq.qsize()) @@ -146,7 +148,7 @@ def test_push_updates_not_allowed(self): updated_item.data["name"] = "updated-name" # Add the same item again - with self.assertRaises(queues.errors.NotAllowedError): + with self.assertRaises(NotAllowedError): self.pq.push(updated_item) self.assertEqual(1, self.pq.qsize()) @@ -197,7 +199,7 @@ def test_push_priority_updates_not_allowed(self): updated_item.priority = 100 # Add the same item again - with self.assertRaises(queues.errors.NotAllowedError): + with self.assertRaises(NotAllowedError): self.pq.push(updated_item) self.assertEqual(1, self.pq.qsize()) @@ -474,7 +476,7 @@ def test_pop_queue_empty(self): """When popping an item from an empty queue, it should raise an exception. 
""" - with self.assertRaises(queues.errors.QueueEmptyError): + with self.assertRaises(QueueEmptyError): self.pq.pop() def test_pop_highest_priority(self): diff --git a/octopoes/bits/ask_port_specification/question_schema.json b/octopoes/bits/ask_port_specification/question_schema.json index e3226c96175..13077c14c08 100644 --- a/octopoes/bits/ask_port_specification/question_schema.json +++ b/octopoes/bits/ask_port_specification/question_schema.json @@ -30,6 +30,12 @@ "pattern": "^(\\s*(,*)[0-9]+,?\\s*)*$", "default": "1433,1434,3050,3306,5432" }, + "microsoft_rdp_ports": { + "description": "Comma separated list of (Microsoft) RDP ports", + "type": "string", + "pattern": "^(\\s*(,*)[0-9]+,?\\s*)*$", + "default": "3389" + }, "aggregate_findings": { "description": "Do you want to aggregate findings into one finding of the IP? Answer with true or false.", "type": "string", diff --git a/octopoes/bits/ask_url_params_to_ignore/__init__.py b/octopoes/bits/ask_url_params_to_ignore/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/octopoes/bits/ask_url_params_to_ignore/ask_url_params_to_ignore.py b/octopoes/bits/ask_url_params_to_ignore/ask_url_params_to_ignore.py new file mode 100644 index 00000000000..971f1d68993 --- /dev/null +++ b/octopoes/bits/ask_url_params_to_ignore/ask_url_params_to_ignore.py @@ -0,0 +1,17 @@ +import json +from collections.abc import Iterator +from pathlib import Path +from typing import Any + +from octopoes.models import OOI +from octopoes.models.ooi.network import Network +from octopoes.models.ooi.question import Question + + +def run(input_ooi: Network, additional_oois: list, config: dict[str, Any]) -> Iterator[OOI]: + network = input_ooi + + with (Path(__file__).parent / "question_schema.json").open() as f: + schema = json.load(f) + + yield Question(ooi=network.reference, schema_id=schema["$id"], json_schema=json.dumps(schema)) diff --git a/octopoes/bits/ask_url_params_to_ignore/bit.py b/octopoes/bits/ask_url_params_to_ignore/bit.py new file mode 100644 index 00000000000..f78db5af1ea --- /dev/null +++ b/octopoes/bits/ask_url_params_to_ignore/bit.py @@ -0,0 +1,10 @@ +from bits.definitions import BitDefinition +from octopoes.models.ooi.network import Network + +BIT = BitDefinition( + id="ask_url_params_to_ignore", + consumes=Network, + parameters=[], + min_scan_level=0, + module="bits.ask_url_params_to_ignore.ask_url_params_to_ignore", +) diff --git a/octopoes/bits/ask_url_params_to_ignore/question_schema.json b/octopoes/bits/ask_url_params_to_ignore/question_schema.json new file mode 100644 index 00000000000..a8b368d7474 --- /dev/null +++ b/octopoes/bits/ask_url_params_to_ignore/question_schema.json @@ -0,0 +1,17 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "/bit/oois-in-headers", + "type": "object", + "default": {}, + "required": [ + "ignored_url_parameters" + ], + "properties": { + "ignored_url_parameters": { + "description": "Comma separated list of url parameters that are ignored when following location headers.", + "type": "string", + "pattern": "^(\\s*(,*)[^,]+,?\\s*)*$", + "default": "session_id, phpsessid, jsessionid, cifd, cftoken, asp.net_sessionid" + } + } +} diff --git a/octopoes/bits/oois_in_headers/bit.py b/octopoes/bits/oois_in_headers/bit.py index eac7470aa10..70beea24333 100644 --- a/octopoes/bits/oois_in_headers/bit.py +++ b/octopoes/bits/oois_in_headers/bit.py @@ -2,5 +2,9 @@ from octopoes.models.types import HTTPHeader BIT = BitDefinition( - id="oois-in-headers", consumes=HTTPHeader, 
parameters=[], module="bits.oois_in_headers.oois_in_headers" + id="oois-in-headers", + consumes=HTTPHeader, + parameters=[], + module="bits.oois_in_headers.oois_in_headers", + config_ooi_relation_path="HTTPHeader.resource.website.hostname.network", ) diff --git a/octopoes/bits/oois_in_headers/oois_in_headers.py b/octopoes/bits/oois_in_headers/oois_in_headers.py index a6e67d15b08..cd49dd8dffb 100644 --- a/octopoes/bits/oois_in_headers/oois_in_headers.py +++ b/octopoes/bits/oois_in_headers/oois_in_headers.py @@ -1,7 +1,7 @@ import re from collections.abc import Iterator from typing import Any -from urllib.parse import urljoin, urlparse +from urllib.parse import parse_qs, urlencode, urljoin, urlparse, urlunparse from pydantic import ValidationError @@ -16,12 +16,33 @@ def is_url(input_str): return bool(result.scheme) +def get_ignored_url_params(config: dict, config_key: str, default: list) -> list[str]: + ignored_url_params = config.get(config_key) + if ignored_url_params is None: + return default + return [param.strip() for param in ignored_url_params.split(",")] if ignored_url_params else [] + + +def remove_ignored_params(url: str, ignored_params: list[str]) -> str: + parsed_url = urlparse(url) + query_params = parse_qs(parsed_url.query) + if not query_params: + return url + filtered_params = {k: v for k, v in query_params.items() if k.lower() not in ignored_params} + new_query = urlencode(filtered_params, doseq=True) + new_url = urlunparse( + (parsed_url.scheme, parsed_url.netloc, parsed_url.path, parsed_url.params, new_query, parsed_url.fragment) + ) + return new_url + + def run(input_ooi: HTTPHeader, additional_oois: list, config: dict[str, Any]) -> Iterator[OOI]: network = Network(name="internet") if input_ooi.key.lower() == "location": + ignored_url_params = get_ignored_url_params(config, "ignored_url_parameters", []) if is_url(input_ooi.value): - u = URL(raw=input_ooi.value, network=network.reference) + u = URL(raw=remove_ignored_params(input_ooi.value, ignored_url_params), network=network.reference) else: # url is not a url but a relative path http_url = input_ooi.reference.tokenized.resource.web_url diff --git a/octopoes/bits/port_classification_ip/port_classification_ip.py b/octopoes/bits/port_classification_ip/port_classification_ip.py index 7ac7fbbb4cc..1ff5ed39533 100644 --- a/octopoes/bits/port_classification_ip/port_classification_ip.py +++ b/octopoes/bits/port_classification_ip/port_classification_ip.py @@ -26,7 +26,6 @@ 21, # FTP 22, # SSH 23, # Telnet - 3389, # Remote Desktop 5900, # VNC ] DB_TCP_PORTS = [ @@ -36,6 +35,9 @@ 3306, # MySQL 5432, # PostgreSQL ] +MICROSOFT_RDP_PORTS = [ + 3389 # Microsoft Remote Desktop +] def get_ports_from_config(config, config_key, default): @@ -53,6 +55,7 @@ def run(input_ooi: IPPort, additional_oois: list, config: dict[str, Any]) -> Ite common_udp_ports = get_ports_from_config(config, "common_udp_ports", COMMON_UDP_PORTS) sa_tcp_ports = get_ports_from_config(config, "sa_tcp_ports", SA_TCP_PORTS) db_tcp_ports = get_ports_from_config(config, "db_tcp_ports", DB_TCP_PORTS) + microsoft_rdp_ports = get_ports_from_config(config, "microsoft_rdp_ports", MICROSOFT_RDP_PORTS) for ip_port in additional_oois: port = ip_port.port @@ -66,7 +69,8 @@ def run(input_ooi: IPPort, additional_oois: list, config: dict[str, Any]) -> Ite yield Finding( finding_type=open_sa_port.reference, ooi=ip_port.reference, - description=f"Port {port}/{protocol.value} is a system administrator port and should not be open.", + description=f"Port {port}/{protocol.value} is a 
system administrator port and " + f"should possibly not be open.", ) elif protocol == Protocol.TCP and port in db_tcp_ports: ft = KATFindingType(id="KAT-OPEN-DATABASE-PORT") @@ -79,6 +83,18 @@ def run(input_ooi: IPPort, additional_oois: list, config: dict[str, Any]) -> Ite ooi=ip_port.reference, description=f"Port {port}/{protocol.value} is a database port and should not be open.", ) + elif port in microsoft_rdp_ports: + open_rdp_port = KATFindingType(id="KAT-REMOTE-DESKTOP-PORT") + if aggregate_findings: + open_ports.append(ip_port.port) + else: + yield open_rdp_port + yield Finding( + finding_type=open_rdp_port.reference, + ooi=ip_port.reference, + description=f"Port {port}/{protocol.value} is a Microsoft Remote Desktop port and " + f"should possibly not be open.", + ) elif (protocol == Protocol.TCP and port not in common_tcp_ports) or ( protocol == Protocol.UDP and port not in common_udp_ports ): diff --git a/octopoes/octopoes/config/celery.py b/octopoes/octopoes/config/celery.py index b9894771081..b47a011f6d2 100644 --- a/octopoes/octopoes/config/celery.py +++ b/octopoes/octopoes/config/celery.py @@ -14,3 +14,5 @@ result_accept_content = ["application/json", "application/x-python-serialize"] task_queues = (Queue(QUEUE_NAME_OCTOPOES),) + +worker_concurrency = settings.workers diff --git a/octopoes/octopoes/config/settings.py b/octopoes/octopoes/config/settings.py index 16fce6969b6..5f6e9df343d 100644 --- a/octopoes/octopoes/config/settings.py +++ b/octopoes/octopoes/config/settings.py @@ -70,6 +70,8 @@ class Settings(BaseSettings): outgoing_request_timeout: int = Field(30, description="Timeout for outgoing HTTP requests") + workers: int = Field(4, description="Number of Octopoes Celery workers") + model_config = SettingsConfigDict(env_prefix="OCTOPOES_") @classmethod diff --git a/octopoes/octopoes/models/ooi/dns/records.py b/octopoes/octopoes/models/ooi/dns/records.py index 8d63cf70ed4..972b20e4a07 100644 --- a/octopoes/octopoes/models/ooi/dns/records.py +++ b/octopoes/octopoes/models/ooi/dns/records.py @@ -50,7 +50,7 @@ class DNSMXRecord(DNSRecord): object_type: Literal["DNSMXRecord"] = "DNSMXRecord" dns_record_type: Literal["MX"] = "MX" - mail_hostname: Reference | None = ReferenceField(Hostname, default=None) + mail_hostname: Reference | None = ReferenceField(Hostname, default=None, max_inherit_scan_level=1) preference: int | None = None _reverse_relation_names = {"hostname": "dns_mx_records", "mail_hostname": "mail_server_of"} diff --git a/octopoes/tests/integration/test_ooi_deletion.py b/octopoes/tests/integration/test_ooi_deletion.py index 0ca8d56586a..ba288ed942a 100644 --- a/octopoes/tests/integration/test_ooi_deletion.py +++ b/octopoes/tests/integration/test_ooi_deletion.py @@ -131,9 +131,9 @@ def test_events_created_in_worker_during_handling( xtdb_octopoes_service.process_event(event) xtdb_octopoes_service.commit() - assert len(event_manager.queue) == 7 # Handling OOI delete event triggers Origin delete event + assert len(event_manager.queue) == 8 # Handling OOI delete event triggers Origin delete event - event = event_manager.queue[6] # OOID]elete event + event = event_manager.queue[7] # OOIDelete event assert isinstance(event, OriginDBEvent) assert event.operation_type.value == "delete" @@ -229,7 +229,7 @@ def test_deletion_events_after_nxdomain( event_manager.complete_process_events(xtdb_octopoes_service) assert len(list(filter(lambda x: x.operation_type.value == "delete", event_manager.queue))) == 0 - assert xtdb_octopoes_service.ooi_repository.list_oois({OOI}, 
valid_time).count == 7 + assert xtdb_octopoes_service.ooi_repository.list_oois({OOI}, valid_time).count == 8 nxd = NXDOMAIN(hostname=hostname.reference) xtdb_octopoes_service.ooi_repository.save(nxd, valid_time) @@ -250,7 +250,7 @@ def test_deletion_events_after_nxdomain( event_manager.complete_process_events(xtdb_octopoes_service) assert len(list(filter(lambda x: x.operation_type.value == "delete", event_manager.queue))) >= 3 - assert xtdb_octopoes_service.ooi_repository.list_oois({OOI}, valid_time).count == 5 + assert xtdb_octopoes_service.ooi_repository.list_oois({OOI}, valid_time).count == 6 @pytest.mark.xfail(reason="Wappalyzer works on wrong input objects (to be addressed)") diff --git a/octopoes/tests/test_bit_ports.py b/octopoes/tests/test_bit_ports.py index 09c9a86c483..4efbc3b065d 100644 --- a/octopoes/tests/test_bit_ports.py +++ b/octopoes/tests/test_bit_ports.py @@ -29,7 +29,7 @@ def test_port_classification_tcp_22(): assert len(results) == 2 finding = results[-1] assert isinstance(finding, Finding) - assert finding.description == "Port 22/tcp is a system administrator port and should not be open." + assert finding.description == "Port 22/tcp is a system administrator port and should possibly not be open." def test_port_classification_tcp_5432(): diff --git a/rocky/assets/css/components/system-tag.scss b/rocky/assets/css/components/system-tag.scss new file mode 100644 index 00000000000..3f7caab05eb --- /dev/null +++ b/rocky/assets/css/components/system-tag.scss @@ -0,0 +1,43 @@ +.system-tag, +span.system-tag { + background-color: var(--colors-white); + border: 2px solid; + border-radius: var(--border-radius-xl); + padding: var(--spacing-grid-025) var(--spacing-grid-100); + box-sizing: border-box; + width: auto; + + &::before { + content: none; + } + + &.color-1 { + color: var(--colors-blue-600); + border-color: var(--colors-blue-600); + } + + &.color-2 { + color: var(--colors-green-600); + border-color: var(--colors-green-600); + } + + &.color-3 { + color: var(--colors-ochre-500); + border-color: var(--colors-ochre-500); + } + + &.color-4 { + color: var(--colors-orange-600); + border-color: var(--colors-orange-600); + } + + &.color-5 { + color: var(--colors-red-600); + border-color: var(--colors-red-600); + } + + &.color-6 { + color: var(--colors-purple-600); + border-color: var(--colors-purple-600); + } +} diff --git a/rocky/assets/css/main.scss b/rocky/assets/css/main.scss index 4fb860423c0..1dd5615f43f 100644 --- a/rocky/assets/css/main.scss +++ b/rocky/assets/css/main.scss @@ -67,6 +67,7 @@ @import "components/sticky"; @import "components/sticky-column"; @import "components/state-tags"; +@import "components/system-tag"; @import "components/table"; @import "components/toggle"; @import "components/toolbar"; diff --git a/rocky/assets/css/themes/soft/fundamentals/border-radii.scss b/rocky/assets/css/themes/soft/fundamentals/border-radii.scss index 0ac5c9d7060..547a6f6d8bb 100644 --- a/rocky/assets/css/themes/soft/fundamentals/border-radii.scss +++ b/rocky/assets/css/themes/soft/fundamentals/border-radii.scss @@ -20,6 +20,10 @@ border-radius: var(--border-radius-l); } +.border-radius-xl { + border-radius: var(--border-radius-xl); +} + .border-radius-round { border-radius: var(--border-radius-round); } diff --git a/rocky/assets/js/reportActionForms.js b/rocky/assets/js/reportActionForms.js index 3f2523cc5a4..42a5dbc5a65 100644 --- a/rocky/assets/js/reportActionForms.js +++ b/rocky/assets/js/reportActionForms.js @@ -7,7 +7,7 @@ export function renderRenameSelection(modal,
selection) { references.push(input_element.value); }); - let table_element = document.getElementById("rename-table"); + let table_element = document.getElementById("report-name-table"); let table_body = table_element.querySelector("tbody"); let table_row = table_element.querySelector("tr.rename-table-row"); diff --git a/rocky/components/modal/README.md b/rocky/components/modal/README.md index b6f996e6dc4..ec49e5030a8 100644 --- a/rocky/components/modal/README.md +++ b/rocky/components/modal/README.md @@ -10,7 +10,7 @@ This outlines the basic usages and provides a code block example below, of how t ### Instantiate -First you need to add `{% load component_tags %}` at the top of your template. Next you need to add the following code block at the bottom, to include the corresponding JS (if you haven't already you also need to add `{% load compress %}`). +First you need to add `{% load component_tags %}` at the top of your template. Next, add the following code block at the bottom to include the corresponding JS (if you haven't already, you also need to add `{% load compress %}` and `{% load static %}`). ``` {% block html_at_end_body %} {% compress js %}  {% endcompress %} {% endblock html_at_end_body %} ``` @@ -58,7 +58,7 @@ Including `{% component_css_dependencies %}` is needed to inject the reference t {% fill "content" %}
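The `remove_ignored_params` helper added to `octopoes/bits/oois_in_headers/oois_in_headers.py` earlier in this change does a parse/filter/re-encode round-trip on `Location` header values before a URL OOI is yielded. A minimal standalone sketch of that technique follows; the defaults mirror the new question schema's `ignored_url_parameters`, and the helper name here is illustrative rather than part of the change:

```
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

# Defaults taken from ignored_url_parameters in the new question schema.
IGNORED = ["session_id", "phpsessid", "jsessionid", "cifd", "cftoken", "asp.net_sessionid"]


def strip_ignored_params(url: str, ignored: list[str]) -> str:
    parsed = urlparse(url)
    query_params = parse_qs(parsed.query)
    if not query_params:  # No query string: return the URL untouched
        return url
    kept = {k: v for k, v in query_params.items() if k.lower() not in ignored}
    return urlunparse(parsed._replace(query=urlencode(kept, doseq=True)))


# A redirect target keeps its functional parameter but loses the session id:
print(strip_ignored_params("https://example.com/next?page=2&PHPSESSID=abc123", IGNORED))
# https://example.com/next?page=2
```

Comparing with `k.lower()` matters here because redirect targets often carry upper-cased session parameters such as `PHPSESSID`, while the configured ignore list is lowercase.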
diff --git a/rocky/reports/forms.py b/rocky/reports/forms.py index 265d0d15e27..e9a6e58815e 100644 --- a/rocky/reports/forms.py +++ b/rocky/reports/forms.py @@ -38,6 +38,16 @@ class ReportScheduleStartDateChoiceForm(BaseRockyForm): ) +class ReportRecurrenceChoiceForm(BaseRockyForm): + choose_recurrence = forms.ChoiceField( + label="", + required=False, + widget=forms.RadioSelect(attrs={"class": "submit-on-click"}), + choices=(("once", _("No, just once")), ("repeat", _("Yes, repeat"))), + initial="once", + ) + + class ReportScheduleStartDateForm(BaseRockyForm): start_date = forms.DateField( label=_("Start date"), @@ -47,18 +57,14 @@ class ReportScheduleStartDateForm(BaseRockyForm): input_formats=["%Y-%m-%d"], ) - -class ReportRecurrenceChoiceForm(BaseRockyForm): - choose_recurrence = forms.ChoiceField( - label="", - required=False, - widget=forms.RadioSelect(attrs={"class": "submit-on-click"}), - choices=(("once", _("No, just once")), ("repeat", _("Yes, repeat"))), - initial="once", + start_time = forms.TimeField( + label=_("Start time (UTC)"), + widget=forms.TimeInput(format="%H:%M", attrs={"form": "generate_report"}), + initial=lambda: datetime.now(tz=timezone.utc).time(), + required=True, + input_formats=["%H:%M"], ) - -class ReportScheduleRecurrenceForm(BaseRockyForm): recurrence = forms.ChoiceField( label=_("Recurrence"), required=True, @@ -66,6 +72,17 @@ choices=[("daily", _("Daily")), ("weekly", _("Weekly")), ("monthly", _("Monthly")), ("yearly", _("Yearly"))], ) + def clean(self): + cleaned_data = super().clean() + start_date = cleaned_data.get("start_date") + start_time = cleaned_data.get("start_time") + + if start_date and start_time: + start_datetime = datetime.combine(start_date, start_time) + cleaned_data["start_datetime"] = start_datetime + + return cleaned_data + class CustomReportScheduleForm(BaseRockyForm): start_date = forms.DateField( diff --git a/rocky/reports/report_types/definitions.py b/rocky/reports/report_types/definitions.py index 112d6807c80..ec7c4be9ed6 100644 --- a/rocky/reports/report_types/definitions.py +++ b/rocky/reports/report_types/definitions.py @@ -17,6 +17,11 @@ class ReportPlugins(TypedDict): optional: set[str] +class SubReportPlugins(TypedDict): + required: list[str] + optional: list[str] + + def report_plugins_union(report_types: list[type["BaseReport"]]) -> ReportPlugins: """Take the union of the required and optional plugin sets and remove optional plugins that are required""" diff --git a/rocky/reports/report_types/findings_report/report.html b/rocky/reports/report_types/findings_report/report.html index 35cfa3782f7..6eecba51e50 100644 --- a/rocky/reports/report_types/findings_report/report.html +++ b/rocky/reports/report_types/findings_report/report.html @@ -81,5 +81,5 @@ -{% translate "No findings have been found." %}
+{% translate "No findings have been identified yet." %}
{% endif %} diff --git a/rocky/reports/templates/forms/report_form_fields.html b/rocky/reports/templates/forms/report_form_fields.html index 3e7df3310eb..f8f6d645d3c 100644 --- a/rocky/reports/templates/forms/report_form_fields.html +++ b/rocky/reports/templates/forms/report_form_fields.html @@ -18,7 +18,7 @@ {% endfor %} {% endif %} {% for required_optional_plugin, plugins_ in plugins.items %} - {% for plugin in plugins_ %}{% endfor %} + {% for plugin in plugins_ %}{% endfor %} {% endfor %} {% if request.POST.choose_recurrence %} {% translate "Report schedule" %} {% include "partials/form/fieldset.html" with fields=report_schedule_form_recurrence_choice %} {% if is_scheduled_report %} + <p> + {% blocktranslate trimmed %} + Please choose a start date, time and recurrence for scheduling your report(s). + If you select a date on the 28th-31st of the month, it will always be scheduled on the last day of the month. + {% endblocktranslate %} + </p>
{% blocktranslate trimmed %} The date you select will be the reference date for the data set for your report. @@ -28,14 +34,8 @@
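The new help text above pins down an edge case of the schedule forms: any start date on the 28th-31st recurs on the month's last day, so a monthly schedule never silently skips short months. The clamping itself happens in the scheduler rather than in this template; a rough sketch of the rule as stated, with an illustrative helper name that does not appear in the change:

```
import calendar


def monthly_run_day(selected_day: int, year: int, month: int) -> int:
    """Days 28-31 collapse to the target month's last day; lower days are used as-is."""
    last_day = calendar.monthrange(year, month)[1]
    return last_day if selected_day >= 28 else selected_day


print(monthly_run_day(30, 2024, 2))  # 29 (leap-year February)
print(monthly_run_day(28, 2024, 3))  # 31 (always the last day of the month)
print(monthly_run_day(15, 2024, 4))  # 15 (unaffected)
```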