From d141f8304317b0ad63e7c49a52bfe3200ffe5e9d Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Sat, 2 Dec 2023 04:46:04 +0100
Subject: [PATCH] Add software tests for JobStore

---
 .github/workflows/main.yml | 18 ++++++++++++++++
 main.py                    | 42 ++++++++++++++++++++++++--------------
 requirements-dev.txt       |  1 +
 test_http.py               |  2 +-
 test_job.py                |  3 ++-
 test_main.py               | 31 ++++++++++++++++++++++++----
 6 files changed, 76 insertions(+), 21 deletions(-)

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index d6a2cb0..9f8b703 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -33,6 +33,24 @@ jobs:
        OS: ${{ matrix.os }}
        PYTHON: ${{ matrix.python-version }}
 
+    # https://docs.github.com/en/actions/using-containerized-services/about-service-containers
+    services:
+      cratedb:
+        image: crate/crate:nightly
+        ports:
+          - 4200:4200
+      postgres:
+        image: postgres
+        ports:
+          - 5432:5432
+        env:
+          POSTGRES_PASSWORD: postgres
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+
     name: "
       Python ${{ matrix.python-version }} on OS ${{ matrix.os }}
     "
diff --git a/main.py b/main.py
index 6c6a56c..4a1c9c8 100644
--- a/main.py
+++ b/main.py
@@ -1,5 +1,6 @@
 import logging
 import icecream
+from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.jobstores.memory import MemoryJobStore
 from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
@@ -27,7 +28,9 @@
 
 
 class Supertask:
-    def __init__(self):
+    def __init__(self, job_store_address: str, pre_delete_jobs: bool = False):
+        self.job_store_address = job_store_address
+        self.pre_delete_jobs = pre_delete_jobs
         self.scheduler: BackgroundScheduler = None
         self.configure()
 
@@ -38,16 +41,20 @@ def configure(self):
         logger.info("Configuring scheduler")
 
         # Initialize a job store.
-        # job_store = MemoryJobStore()
-        # job_store = SQLAlchemyJobStore(url="postgresql://postgres@localhost", engine_options={"echo": True})
-        # job_store = CrateDBMongoDBJobStore(dburi="crate://localhost")
-        job_store = CrateDBSQLAlchemyJobStore(url="crate://localhost/", engine_options={"echo": True})
-
-        # TODO: Only in sandbox mode, to have a fresh database canvas.
-        try:
-            job_store.remove_all_jobs()
-        except:
-            pass
+        if self.job_store_address.startswith("memory://"):
+            job_store = MemoryJobStore()
+        elif self.job_store_address.startswith("postgresql://"):
+            job_store = SQLAlchemyJobStore(url=self.job_store_address, engine_options={"echo": True})
+        elif self.job_store_address.startswith("crate://"):
+            job_store = CrateDBSQLAlchemyJobStore(url=self.job_store_address, engine_options={"echo": True})
+        else:
+            raise RuntimeError(f"Initializing job store failed. Unknown address: {self.job_store_address}")
+
+        if self.pre_delete_jobs:
+            try:
+                job_store.remove_all_jobs()
+            except:
+                pass
 
         job_defaults = {
             'coalesce': False,
@@ -140,13 +147,18 @@ def run_server():
         return self
 
 
-def main():
+def run_supertask(job_store_address: str, pre_delete_jobs: bool = False):
     setup_logging()
-    st = Supertask()
+    st = Supertask(job_store_address=job_store_address, pre_delete_jobs=pre_delete_jobs)
     st.seed_jobs()
     st.start()
-    st.wait()
+    return st
 
 
 if __name__ == "__main__":
-    main()
+    # TODO: Use only in sandbox mode, to have a fresh database canvas.
+    pre_delete_jobs = True
+    #main(job_store_address="memory://", pre_delete_jobs=pre_delete_jobs)
+    #main(job_store_address="postgresql://postgres@localhost", pre_delete_jobs=pre_delete_jobs)
+    st = run_supertask(job_store_address="crate://localhost", pre_delete_jobs=pre_delete_jobs)
+    st.wait()
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 3522e0d..fa92f36 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -8,6 +8,7 @@ validate-pyproject<0.16
 
 # Software tests
 httpx<0.26
+psycopg2-binary<3
 pytest<8
 pytest-cov<5
 pytest-mock<4
diff --git a/test_http.py b/test_http.py
index 7c4a3a8..217c099 100644
--- a/test_http.py
+++ b/test_http.py
@@ -46,7 +46,7 @@ def test_read_cronjobs():
         },
         {
             "crontab": "* * * * *",
-            "enabled": False,
+            "enabled": True,
             "id": 1,
             "job": "job3",
             "last_run": None,
diff --git a/test_job.py b/test_job.py
index 00b55b4..7410e1b 100644
--- a/test_job.py
+++ b/test_job.py
@@ -6,4 +6,5 @@ def test_my_job(mocker, capsys):
     mocker.patch("time.sleep")
     my_job("foo")
     out, err = capsys.readouterr()
-    assert "DONE" in err
+    assert "JOB-START" in err
+    assert "JOB-FINISH" in err
diff --git a/test_main.py b/test_main.py
index b2e4199..33c2fb6 100644
--- a/test_main.py
+++ b/test_main.py
@@ -1,12 +1,35 @@
-from main import main
+import pytest
+import sqlalchemy as sa
+from main import run_supertask
 
 
-def test_main(mocker, caplog):
-    mocker.patch("main.Supertask.wait")
+
+def check_store(address: str):
+    engine: sa.engine.Engine = sa.create_engine(url=address)
+    try:
+        with engine.connect() as conn:
+            conn.execute(sa.text("SELECT 1;"))
+    except sa.exc.OperationalError as ex:
+        if "No more Servers available" in str(ex):
+            raise pytest.skip(f"Skipping test case, because job store is not available: {address}") from ex
+
+
+@pytest.mark.parametrize(
+    "job_store_address", ["memory://", "postgresql://postgres:postgres@localhost", "crate://crate@localhost"]
+)
+def test_run_supertask(mocker, caplog, job_store_address):
+    if not job_store_address.startswith("memory://"):
+        check_store(job_store_address)
     mocker.patch("main.Supertask.start_http_service")
-    main()
+    run_supertask(job_store_address, pre_delete_jobs=True)
     assert "Configuring scheduler" in caplog.messages
     assert "Seeding jobs" in caplog.messages
     assert "Adding job tentatively -- it will be properly scheduled when the scheduler starts" in caplog.messages
     assert "Starting scheduler" in caplog.messages
     assert 'Added job "my_job" to job store "default"' in caplog.messages
+
+
+def test_run_supertask_unknown():
+    with pytest.raises(RuntimeError) as ex:
+        run_supertask("foo://")
+    assert ex.match("Initializing job store failed. Unknown address: foo://")