Skip to content

Commit

Permalink
Add software tests for JobStore
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Dec 2, 2023
1 parent ec14081 commit dc6a828
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 21 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@ jobs:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}

# https://docs.github.com/en/actions/using-containerized-services/about-service-containers
services:
cratedb:
image: crate/crate:nightly
ports:
- 4200:4200
postgres:
image: postgres
ports:
- 5432:5432
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
name: "
Python ${{ matrix.python-version }} on OS ${{ matrix.os }}
"
Expand Down
42 changes: 27 additions & 15 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import icecream
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
Expand Down Expand Up @@ -27,7 +28,9 @@

class Supertask:

def __init__(self):
def __init__(self, job_store_address: str, pre_delete_jobs: bool = False):
self.job_store_address = job_store_address
self.pre_delete_jobs = pre_delete_jobs
self.scheduler: BackgroundScheduler = None
self.configure()

Expand All @@ -38,16 +41,20 @@ def configure(self):
logger.info("Configuring scheduler")

# Initialize a job store.
# job_store = MemoryJobStore()
# job_store = SQLAlchemyJobStore(url="postgresql://postgres@localhost", engine_options={"echo": True})
# job_store = CrateDBMongoDBJobStore(dburi="crate://localhost")
job_store = CrateDBSQLAlchemyJobStore(url="crate://localhost/", engine_options={"echo": True})

# TODO: Only in sandbox mode, to have a fresh database canvas.
try:
job_store.remove_all_jobs()
except:
pass
if self.job_store_address.startswith("memory://"):
job_store = MemoryJobStore()
elif self.job_store_address.startswith("postgresql://"):
job_store = SQLAlchemyJobStore(url=self.job_store_address, engine_options={"echo": True})
elif self.job_store_address.startswith("crate://"):
job_store = CrateDBSQLAlchemyJobStore(url=self.job_store_address, engine_options={"echo": True})
else:
raise RuntimeError(f"Initializing job store failed. Unknown address: {self.job_store_address}")

if self.pre_delete_jobs:
try:
job_store.remove_all_jobs()
except:
pass

job_defaults = {
'coalesce': False,
Expand Down Expand Up @@ -140,13 +147,18 @@ def run_server():
return self


def main():
def run_supertask(job_store_address: str, pre_delete_jobs: bool = False):
setup_logging()
st = Supertask()
st = Supertask(job_store_address=job_store_address, pre_delete_jobs=pre_delete_jobs)
st.seed_jobs()
st.start()
st.wait()
return st


if __name__ == "__main__":
main()
# TODO: Use only in sandbox mode, to have a fresh database canvas.
pre_delete_jobs = True
#main(job_store_address="memory://", pre_delete_jobs=pre_delete_jobs)
#main(job_store_address="postgresql://postgres@localhost", pre_delete_jobs=pre_delete_jobs)
st = run_supertask(job_store_address="crate://localhost", pre_delete_jobs=pre_delete_jobs)
st.wait()
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ validate-pyproject<0.16

# Software tests
httpx<0.26
psycopg2-binary<3
pytest<8
pytest-cov<5
pytest-mock<4
2 changes: 1 addition & 1 deletion test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_read_cronjobs():
},
{
"crontab": "* * * * *",
"enabled": False,
"enabled": True,
"id": 1,
"job": "job3",
"last_run": None,
Expand Down
3 changes: 2 additions & 1 deletion test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ def test_my_job(mocker, capsys):
mocker.patch("time.sleep")
my_job("foo")
out, err = capsys.readouterr()
assert "DONE" in err
assert "JOB-START" in err
assert "JOB-FINISH" in err
31 changes: 27 additions & 4 deletions test_main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,35 @@
from main import main
import pytest
import sqlalchemy as sa

from main import run_supertask

def test_main(mocker, caplog):
mocker.patch("main.Supertask.wait")

def check_store(address: str):
engine: sa.engine.Engine = sa.create_engine(url=address)
try:
with engine.connect() as conn:
conn.execute(sa.text("SELECT 1;"))
except sa.exc.OperationalError as ex:
if "No more Servers available" in str(ex):
raise pytest.skip(f"Skipping test case, because job store is not available: {address}") from ex


@pytest.mark.parametrize(
"job_store_address", ["memory://", "postgresql://postgres:postgres@localhost", "crate://crate@localhost"]
)
def test_run_supertask(mocker, caplog, job_store_address):
if not job_store_address.startswith("memory://"):
check_store(job_store_address)
mocker.patch("main.Supertask.start_http_service")
main()
run_supertask(job_store_address, pre_delete_jobs=True)
assert "Configuring scheduler" in caplog.messages
assert "Seeding jobs" in caplog.messages
assert "Adding job tentatively -- it will be properly scheduled when the scheduler starts" in caplog.messages
assert "Starting scheduler" in caplog.messages
assert 'Added job "my_job" to job store "default"' in caplog.messages


def test_run_supertask_unknown():
with pytest.raises(RuntimeError) as ex:
run_supertask("foo://")
assert ex.match("Initializing job store failed. Unknown address: foo://")

0 comments on commit dc6a828

Please sign in to comment.