diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 9f8b703..0cd6cd0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -78,13 +78,12 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. - # pip install --use-pep517 --prefer-binary --editable=.[test,develop] - pip install -r requirements.txt -r requirements-dev.txt + pip install --prefer-binary -r requirements.txt -r requirements-dev.txt + pip install --prefer-binary --editable=.[test,develop] - name: Run linter and software tests run: | - # poe check - poe test + poe check - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/README.md b/README.md index 3d58122..2bf08e1 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,8 @@ pinning is strongly recommended, especially when you use it as a library. ## Setup ```shell -pip install -r requirements.txt -r requirements-dev.txt +pip install --upgrade --prefer-binary -r requirements.txt -r requirements-dev.txt +pip install --upgrade --prefer-binary --editable=. ``` @@ -101,7 +102,7 @@ documentation]. [APScheduler]: https://pypi.org/project/APScheduler/ [cron]: https://en.wikipedia.org/wiki/Cron -[development sandbox documentation]: https://github.com/WalBeh/scheduler-playground/blob/main/docs/sandbox.md +[development sandbox documentation]: https://github.com/WalBeh/scheduler-playground/blob/master/docs/sandbox.md [DWIM]: https://en.wikipedia.org/wiki/DWIM [FastAPI]: https://pypi.org/project/fastapi/ [Pydantic]: https://pypi.org/project/pydantic/ diff --git a/cli.py b/cli.py index 79cb994..1e62341 100644 --- a/cli.py +++ b/cli.py @@ -1,12 +1,12 @@ import click -from dotenv import load_dotenv, find_dotenv +from dotenv import find_dotenv, load_dotenv from core import Supertask from job_seeder import JobSeeder from models import JobStoreLocation from util import setup_logging - +# TODO: Gate behind an environment variable? load_dotenv(find_dotenv()) diff --git a/core.py b/core.py index 6a8b0d7..89e4401 100644 --- a/core.py +++ b/core.py @@ -1,23 +1,24 @@ import logging +import os +import threading +import time +import typing as t + import icecream +import pytz +import uvicorn +from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor +from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.jobstores.memory import MemoryJobStore -from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor -from icecream import ic -import typing as t -import time -import uvicorn -import os -import pytz -import threading +from fastapi import FastAPI from halo import Halo +from icecream import ic +from cronjob_routes import router as cronjob_router +from jobstore_sqlalchemy import CrateDBSQLAlchemyJobStore from models import JobStoreLocation from settings import Settings -from jobstore_sqlalchemy import CrateDBSQLAlchemyJobStore -from fastapi import FastAPI -from cronjob_routes import router as cronjob_router logger = logging.getLogger(__name__) @@ -25,11 +26,15 @@ class Supertask: - SQLALCHEMY_ECHO = False - def __init__(self, store: t.Union[JobStoreLocation, str], pre_delete_jobs: bool = False, pre_seed_jobs: str = None, debug: bool = False): - + def __init__( + self, + store: t.Union[JobStoreLocation, str], + pre_delete_jobs: bool = False, + pre_seed_jobs: str = None, + debug: bool = False, + ): # Bundle settings to be able to propagate them to the FastAPI subsystem. if isinstance(store, str): store = JobStoreLocation(address=store) @@ -58,36 +63,35 @@ def configure(self): # TODO: Need to run `CREATE SCHEMA ...` before using it? job_store = SQLAlchemyJobStore(url=address, tablename=table, engine_options={"echo": self.SQLALCHEMY_ECHO}) elif address.startswith("crate://"): - job_store = CrateDBSQLAlchemyJobStore(url=address, tableschema=schema, tablename=table, engine_options={"echo": self.SQLALCHEMY_ECHO}) + job_store = CrateDBSQLAlchemyJobStore( + url=address, tableschema=schema, tablename=table, engine_options={"echo": self.SQLALCHEMY_ECHO} + ) else: raise RuntimeError(f"Initializing job store failed. Unknown address: {address}") if self.settings.pre_delete_jobs: try: job_store.remove_all_jobs() - except: + except Exception: # noqa: S110 pass - job_defaults = { - 'coalesce': False, - 'max_instances': 1 - } - executors = { - 'default': ThreadPoolExecutor(20), - 'processpool': ProcessPoolExecutor(5) - } + job_defaults = {"coalesce": False, "max_instances": 1} + executors = {"default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5)} job_stores = { - 'default': job_store, + "default": job_store, } # Create a timezone object for Vienna - timezone = pytz.timezone('Europe/Vienna') - self.scheduler = BackgroundScheduler(executors=executors, job_defaults=job_defaults, jobstores=job_stores, timezone=timezone) - logger.info(f"Configured scheduler: " - f"executors={self.scheduler._executors}, " - f"jobstores={self.scheduler._jobstores}, " - f"timezone={self.scheduler.timezone}" - ) + timezone = pytz.timezone("Europe/Vienna") + self.scheduler = BackgroundScheduler( + executors=executors, job_defaults=job_defaults, jobstores=job_stores, timezone=timezone + ) + logger.info( + f"Configured scheduler: " + f"executors={self.scheduler._executors}, " + f"jobstores={self.scheduler._jobstores}, " + f"timezone={self.scheduler.timezone}" + ) return self def start(self, listen_http: str = None): @@ -100,7 +104,7 @@ def start_scheduler(self): logger.info("Starting scheduler") self.scheduler.start() start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - ic('//======= START ======', start) + ic("//======= START ======", start) # Get next run time for all jobs jobs = self.scheduler.get_jobs() @@ -109,8 +113,8 @@ def start_scheduler(self): return self def wait(self): - print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) - spinner = Halo(text='Waiting', spinner='dots') + print("Press Ctrl+{0} to exit".format("Break" if os.name == "nt" else "C")) # noqa: T201 + spinner = Halo(text="Waiting", spinner="dots") spinner.start() try: # This is here to simulate application activity (which keeps the main thread alive). @@ -122,7 +126,6 @@ def wait(self): return self def start_http_service(self, listen_http: str): - host, port_str = listen_http.split(":") port = int(port_str) @@ -134,7 +137,6 @@ def start_http_service(self, listen_http: str): app.dependency_overrides[Settings] = lambda: self.settings app.include_router(cronjob_router) - #app.include_router(cronjob_router, dependencies=[Depends(JsonResourceWithAddress(self.pre_seed_jobs))]) def run_server(): uvicorn.run(app, host=host, port=port) diff --git a/cronjob_routes.py b/cronjob_routes.py index 96606c5..3505232 100644 --- a/cronjob_routes.py +++ b/cronjob_routes.py @@ -1,13 +1,17 @@ +# ruff: noqa: B008 +import logging import typing as t -from fastapi import APIRouter, HTTPException, Depends, Form -from fastapi.templating import Jinja2Templates +import fastapi.responses +from fastapi import APIRouter, Depends, Form, HTTPException, Request from fastapi.responses import HTMLResponse -from fastapi import Request +from fastapi.templating import Jinja2Templates -from settings import Settings -from models import CronJob from database import JsonResource +from models import CronJob +from settings import Settings + +logger = logging.getLogger(__name__) router = APIRouter() templates = Jinja2Templates(directory="templates") @@ -18,6 +22,11 @@ def get_json_resource(settings: Settings = Depends()) -> JsonResource: FastAPI Dependency to provide a JsonResource instance to the request handlers. """ from database import JsonResource + + if settings.pre_seed_jobs is None: + msg = "No web UI without pre-seed file" + logger.error(msg) + raise fastapi.exceptions.HTTPException(status_code=400, detail=msg) return JsonResource(filepath=settings.pre_seed_jobs) @@ -26,19 +35,27 @@ async def jobs_page(request: Request, json_resource: JsonResource = Depends(get_ jobs = json_resource.read_index() return templates.TemplateResponse("jobs.html", {"request": request, "jobs": jobs}) + @router.post("/cronjobs/", response_model=CronJob) -def create_cronjob(crontab: str = Form(...), job: str = Form(...), enabled: bool = Form(False), json_resource: JsonResource = Depends(get_json_resource)): +def create_cronjob( + crontab: str = Form(...), + job: str = Form(...), + enabled: bool = Form(False), + json_resource: JsonResource = Depends(get_json_resource), +): db = json_resource.read() - cronjob = CronJob(id=len(db)+1, crontab=crontab, job=job, enabled=enabled, last_run=None, last_status=None) + cronjob = CronJob(id=len(db) + 1, crontab=crontab, job=job, enabled=enabled, last_run=None, last_status=None) db.append(cronjob) json_resource.write(db) return cronjob + @router.get("/cronjobs/", response_model=t.List[CronJob]) def read_cronjobs(json_resource: JsonResource = Depends(get_json_resource)): db = json_resource.read() return db + @router.get("/cronjobs/{cronjob_id}", response_model=CronJob) def read_cronjob(cronjob_id: int, json_resource: JsonResource = Depends(get_json_resource)): db = json_resource.read() @@ -46,6 +63,7 @@ def read_cronjob(cronjob_id: int, json_resource: JsonResource = Depends(get_json raise HTTPException(status_code=404, detail="CronJob not found") return db[cronjob_id] + @router.put("/cronjobs/{cronjob_id}", response_model=CronJob) def update_cronjob(cronjob_id: int, cronjob: CronJob, json_resource: JsonResource = Depends(get_json_resource)): db = json_resource.read() @@ -55,6 +73,7 @@ def update_cronjob(cronjob_id: int, cronjob: CronJob, json_resource: JsonResourc json_resource.write(db) return cronjob + @router.delete("/cronjobs/{cronjob_id}", response_model=CronJob) def delete_cronjob(cronjob_id: int, json_resource: JsonResource = Depends(get_json_resource)): db = json_resource.read() diff --git a/database.py b/database.py index 418f2ba..cf8f91f 100644 --- a/database.py +++ b/database.py @@ -1,41 +1,45 @@ import json +import typing as t from pueblo.io import to_io from models import CronJob # Sample data storex -cronjobs_db = [] -# # Sample data store -# sample_cronjobs_data = [ -# {"cron": "*/5 * * * *", "job": "job1", "enabled": True}, -# {"cron": "0 * * * *", "job": "job2", "enabled": True}, -# {"cron": "*/15 * * * *", "job": "job3", "enabled": False}, -# ] +cronjobs_db: t.List[CronJob] = [] -# cronjobs_db = [CronJob(**cronjob) for cronjob in sample_cronjobs_data] +""" +# Sample data store +sample_cronjobs_data = [ + {"cron": "*/5 * * * *", "job": "job1", "enabled": True}, + {"cron": "0 * * * *", "job": "job2", "enabled": True}, + {"cron": "*/15 * * * *", "job": "job3", "enabled": False}, +] + +cronjobs_db = [CronJob(**cronjob) for cronjob in sample_cronjobs_data] +""" -class JsonResource: +class JsonResource: def __init__(self, filepath: str): self.filepath = filepath def read_index(self): - with open(self.filepath, 'r') as f: + with open(self.filepath, "r") as f: cronjobs_data = json.load(f) return [CronJob(**job) for job in cronjobs_data] def read(self): - with to_io(self.filepath, 'r') as f: + with to_io(self.filepath, "r") as f: cronjobs_data = json.load(f) for cronjob in cronjobs_data: - if 'id' in cronjob: - del cronjob['id'] + if "id" in cronjob: + del cronjob["id"] cronjobs_db = [CronJob(id=i, **cronjob) for i, cronjob in enumerate(cronjobs_data)] return cronjobs_db def write(self, db): cronjobs_data = [cronjob.dict() for cronjob in db] - with open(self.filepath, 'w') as f: + with open(self.filepath, "w") as f: json.dump(cronjobs_data, f) diff --git a/job_seeder.py b/job_seeder.py index 6b40fbc..546d6a2 100644 --- a/job_seeder.py +++ b/job_seeder.py @@ -1,7 +1,7 @@ +import datetime as dt import logging import os import time -import datetime as dt from apscheduler.schedulers.base import BaseScheduler from icecream import ic @@ -15,7 +15,6 @@ class JobSeeder: - def __init__(self, source: str, scheduler: BaseScheduler, start_observer: bool = False): self.source = source self.scheduler = scheduler @@ -29,9 +28,19 @@ def seed_jobs(self): if cronjob.enabled: ic(cronjob) minute, hour, day, month, day_of_week = cronjob.crontab.split() - self.scheduler.add_job(my_job, 'cron', minute=minute, hour=hour, day=day, month=month, - day_of_week=day_of_week, id=str(cronjob.id), jobstore='default', args=[cronjob.job], - max_instances=4) + self.scheduler.add_job( + my_job, + "cron", + minute=minute, + hour=hour, + day=day, + month=month, + day_of_week=day_of_week, + id=str(cronjob.id), + jobstore="default", + args=[cronjob.job], + max_instances=4, + ) return self def start_filesystem_observer(self): @@ -46,52 +55,72 @@ def start_filesystem_observer(self): return self +# ruff: noqa: ERA001 class FileChangeHandler(FileSystemEventHandler): # pragma: nocover - def __init__(self, source: str, scheduler: BaseScheduler): - self.source = source - self.scheduler = scheduler - self.last_modified = time.time() - - def on_modified(self, event): - if time.time() - self.last_modified < 1: - return - - self.last_modified = time.time() - - if not event.is_directory and event.src_path.endswith(self.source): - # Load jobs from cronjobs.json - ic("FILE CHANGED") - cronjobs = JsonResource(self.source).read() - cronjob_ids = [str(cronjob.id) for cronjob in cronjobs] - ic(cronjob_ids) - - # Get all existing jobs - existing_jobs = self.scheduler.get_jobs() - ic(existing_jobs) - - # Remove jobs that are not in cronjobs.json - for job in existing_jobs: - #ic("check-removale", job.id) - if job.id not in cronjob_ids: - ic("REMOVE: ", job.id) - self.scheduler.remove_job(job.id) - - # Add jobs that are not in cronjobs.json - for cronjob in cronjobs: - #ic("check-add", cronjob.id) - existing_job_ids = [job.id for job in existing_jobs] - #ic(existing_job_ids) - if cronjob.enabled and str(cronjob.id) not in existing_job_ids: - #ic("ADD: ", cronjob.id) - minute, hour, day, month, day_of_week = cronjob.crontab.split() - job = self.scheduler.add_job(my_job, 'cron', minute=minute, hour=hour, day=day, month=month, day_of_week=day_of_week, id=str(cronjob.id), jobstore='default', args=[cronjob.job]) - next_run_time = job.trigger.get_next_fire_time(None, dt.datetime.now()) - ic("ADDED: ", cronjob.job, next_run_time) - - # Reschedule existing jobs - for cronjob in cronjobs: - if cronjob.enabled: - #ic(cronjob.id) - minute, hour, day, month, day_of_week = cronjob.crontab.split() - job = self.scheduler.reschedule_job(str(cronjob.id), trigger='cron', minute=minute, hour=hour, day=day, month=month, day_of_week=day_of_week) - ic("RESCHED: ", cronjob.job, job.next_run_time) + def __init__(self, source: str, scheduler: BaseScheduler): + self.source = source + self.scheduler = scheduler + self.last_modified = time.time() + + def on_modified(self, event): + if time.time() - self.last_modified < 1: + return + + self.last_modified = time.time() + + if not event.is_directory and event.src_path.endswith(self.source): + # Load jobs from cronjobs.json + ic("FILE CHANGED") + cronjobs = JsonResource(self.source).read() + cronjob_ids = [str(cronjob.id) for cronjob in cronjobs] + ic(cronjob_ids) + + # Get all existing jobs + existing_jobs = self.scheduler.get_jobs() + ic(existing_jobs) + + # Remove jobs that are not in cronjobs.json + for job in existing_jobs: + # ic("check-removale", job.id) + if job.id not in cronjob_ids: + ic("REMOVE: ", job.id) + self.scheduler.remove_job(job.id) + + # Add jobs that are not in cronjobs.json + for cronjob in cronjobs: + # ic("check-add", cronjob.id) + existing_job_ids = [job.id for job in existing_jobs] + # ic(existing_job_ids) + if cronjob.enabled and str(cronjob.id) not in existing_job_ids: + # ic("ADD: ", cronjob.id) + minute, hour, day, month, day_of_week = cronjob.crontab.split() + job = self.scheduler.add_job( + my_job, + "cron", + minute=minute, + hour=hour, + day=day, + month=month, + day_of_week=day_of_week, + id=str(cronjob.id), + jobstore="default", + args=[cronjob.job], + ) + next_run_time = job.trigger.get_next_fire_time(None, dt.datetime.now()) + ic("ADDED: ", cronjob.job, next_run_time) + + # Reschedule existing jobs + for cronjob in cronjobs: + if cronjob.enabled: + # ic(cronjob.id) + minute, hour, day, month, day_of_week = cronjob.crontab.split() + job = self.scheduler.reschedule_job( + str(cronjob.id), + trigger="cron", + minute=minute, + hour=hour, + day=day, + month=month, + day_of_week=day_of_week, + ) + ic("RESCHED: ", cronjob.job, job.next_run_time) diff --git a/jobs.py b/jobs.py index b76fc7e..3e9dadf 100644 --- a/jobs.py +++ b/jobs.py @@ -1,17 +1,16 @@ -from icecream import ic - import random import time +from icecream import ic -def my_job(job="select 1"): +def my_job(job="select 1"): # Report about job start. start = time.strftime("%H:%M:%S", time.localtime()) ic("JOB-START", job, start) # Emulate a computing workload. - random_number = random.randint(5, 10) + random_number = random.randint(5, 10) # noqa: S311 time.sleep(random_number) # Report about job end. diff --git a/models.py b/models.py index 6d53062..dcd82a1 100644 --- a/models.py +++ b/models.py @@ -1,9 +1,9 @@ import dataclasses - -from pydantic import BaseModel, Field, validator -from typing import Optional, Union -from datetime import datetime import re +from datetime import datetime +from typing import Optional, Union + +from pydantic import BaseModel @dataclasses.dataclass @@ -11,22 +11,23 @@ class JobStoreLocation: """ Manage the triple of database address, schema name, and table name. """ + address: str schema: str = "ext" table: str = "jobs" class CronJob(BaseModel): - id: int + id: int # noqa: A003 crontab: str job: str enabled: bool last_run: Optional[Union[datetime, None]] = None last_status: Optional[Union[str, None]] = None - #@validator('crontab') - it is more complex than this + # @validator('crontab') - it is more complex than this def validate_crontab(cls, v): pattern = r"(((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*) ?){5,7}" if not re.match(pattern, v): - raise ValueError('Invalid crontab syntax') + raise ValueError("Invalid crontab syntax") return v diff --git a/pyproject.toml b/pyproject.toml index 22c1317..f2d75b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,9 +6,8 @@ version = "0.0.0" py-modules = ["cli", "core", "cronjob_routes", "database", "job_seeder", "jobs", "jobstore_sqlalchemy", "models", "settings", "util"] [project.scripts] -supertask = "cli:cli" st = "cli:cli" - +supertask = "cli:cli" [tool.black] line-length = 120 @@ -25,7 +24,7 @@ fail_under = 0 show_missing = true [tool.mypy] -packages = ["supertask"] +packages = ["cli", "core", "cronjob_routes", "database", "job_seeder", "jobs", "jobstore_sqlalchemy", "models", "settings", "util"] exclude = [ ] check_untyped_defs = true @@ -120,14 +119,14 @@ format = [ # Configure Ruff not to auto-fix (remove!): # unused imports (F401), unused variables (F841), `print` statements (T201), and commented-out code (ERA001). { cmd = "ruff --fix --ignore=ERA --ignore=F401 --ignore=F841 --ignore=T20 --ignore=ERA001 ." }, - # { cmd = "pyproject-fmt pyproject.toml" }, + { cmd = "pyproject-fmt pyproject.toml" }, ] lint = [ { cmd = "ruff ." }, { cmd = "black --check ." }, - # { cmd = "validate-pyproject pyproject.toml" }, - # { cmd = "mypy" }, + { cmd = "validate-pyproject pyproject.toml" }, + { cmd = "mypy" }, ] release = [ diff --git a/settings.py b/settings.py index b7cd6ca..d43069f 100644 --- a/settings.py +++ b/settings.py @@ -1,4 +1,5 @@ import dataclasses +import typing as t from models import JobStoreLocation @@ -8,6 +9,7 @@ class Settings: """ Bundle settings for propagating them from the environment to the FastAPI domain. """ + store_location: JobStoreLocation pre_delete_jobs: bool - pre_seed_jobs: str + pre_seed_jobs: t.Optional[str] diff --git a/test_cli.py b/test_cli.py index fb79f20..0a0b72e 100644 --- a/test_cli.py +++ b/test_cli.py @@ -60,4 +60,4 @@ def test_cli_http_service(mocker, st_wait_noop): ) assert result.exit_code == 0 - start_http_service_mock.assert_called_once_with('localhost:3333') + start_http_service_mock.assert_called_once_with("localhost:3333") diff --git a/test_http.py b/test_http.py index 865bcfa..23d7f6d 100644 --- a/test_http.py +++ b/test_http.py @@ -17,7 +17,9 @@ def foo(cronjobs_json_file): # Inject settings as dependency to FastAPI. Thanks, @Mause. # https://github.com/tiangolo/fastapi/issues/2372#issuecomment-732492116 - app.dependency_overrides[Settings] = lambda: Settings(store_location=None, pre_delete_jobs=None, pre_seed_jobs=cronjobs_json_file) + app.dependency_overrides[Settings] = lambda: Settings( + store_location=None, pre_delete_jobs=None, pre_seed_jobs=cronjobs_json_file + ) @pytest.fixture