Skip to content

Commit

Permalink
Paginate merge endpoint to return paginated jobs by state
Browse files Browse the repository at this point in the history
  • Loading branch information
dfitchett committed Jan 2, 2024
1 parent c936f5f commit 6bc1438
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 32 deletions.
24 changes: 14 additions & 10 deletions domain-ee/ee-ep-merge-app/src/python_src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import logging
import sys
from contextlib import asynccontextmanager
from typing import Annotated
from uuid import UUID, uuid4

import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException, status
from fastapi import BackgroundTasks, FastAPI, HTTPException, Query, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from pydantic_models import (
Expand All @@ -14,7 +15,7 @@
MergeJobResponse,
MergeJobsResponse,
)
from schema.merge_job import MergeJob
from schema.merge_job import JobState, MergeJob
from service.ep_merge_machine import EpMergeMachine
from service.hoppy_service import HOPPY
from service.job_store import job_store
Expand Down Expand Up @@ -130,14 +131,17 @@ async def get_merge_request_by_job_id(job_id: UUID):
status.HTTP_200_OK: {"description": "Find all jobs"}
},
response_model_exclude_none=True)
async def get_all_merge_jobs(show_successful: bool = False):
if show_successful:
jobs = list(job_store.get_all_merge_jobs())
else:
jobs = list(job_store.get_merge_jobs_in_progress())
logging.info(f"event=getAllMergeJobs show_successful={sanitize(show_successful)} size={len(jobs)}")

return {"jobs": jobs}
async def get_merge_jobs(state: Annotated[list[JobState], Query()] = JobState.incomplete(),
                         page: Annotated[int, Query(ge=1)] = 1,
                         size: Annotated[int, Query(ge=1)] = 10):
    """Return one page of merge jobs filtered by job state.

    Defaults to every non-terminal ("incomplete") state, page 1, 10 items
    per page.  ``page`` and ``size`` are constrained to >= 1 so the
    downstream store can never be asked for a negative row offset
    (page is 1-based and the store computes ``(page - 1) * size``).

    :param state: job states to include (repeatable query parameter)
    :param page: 1-based page number
    :param size: number of jobs per page
    :return: MergeJobsResponse echoing the query plus the matching jobs
    """
    # job_store.query returns (jobs-on-page, total-match-count).
    jobs, total = job_store.query(state, page, size)
    logging.info(f"event=getMergeJobs "
                 f"total={total} "
                 f"page={sanitize(page)} "
                 f"size={sanitize(size)} "
                 f"states={sanitize(state)}")

    return MergeJobsResponse(states=state, total=total, page=page, size=size, jobs=jobs)


if __name__ == "__main__":
Expand Down
8 changes: 7 additions & 1 deletion domain-ee/ee-ep-merge-app/src/python_src/db/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from config import SQLALCHEMY_DATABASE_URI
from fastapi.encoders import jsonable_encoder
from sqlalchemy import create_engine, inspect
from sqlalchemy import create_engine, desc, inspect
from sqlalchemy.orm import sessionmaker

engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
Expand All @@ -25,6 +25,12 @@ def clear(self, model, db):
def query_all(self, model, filter, db):
    """Return every row of *model* that satisfies *filter*."""
    matching = db.query(model).filter(filter)
    return matching.all()

@with_connection
def query(self, model, filter, order_by, offset, limit, db):
    """Return one page of rows plus the total number of matching rows.

    NOTE: ``offset`` is a 1-based page number, not a raw row offset; the
    row offset is derived as ``(offset - 1) * limit``.

    :param model: mapped model class to query
    :param filter: SQLAlchemy filter expression
    :param order_by: column to sort by, descending (newest first)
    :param offset: 1-based page number
    :param limit: page size
    :return: (rows on the requested page, total count ignoring pagination)
    """
    base = db.query(model).filter(filter)
    total = base.count()
    # Clamp so a page number below 1 can never yield a negative SQL OFFSET.
    first_row = max(offset - 1, 0) * limit
    results = base.order_by(desc(order_by)).offset(first_row).limit(limit).all()
    return results, total

@with_connection
def query_first(self, model, filter, db):
    """Return the first row of *model* matching *filter*, or None when no row matches."""
    matches = db.query(model).filter(filter)
    return matches.first()
Expand Down
6 changes: 5 additions & 1 deletion domain-ee/ee-ep-merge-app/src/python_src/pydantic_models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from uuid import UUID

from pydantic import BaseModel, Field
from schema.merge_job import MergeJob
from schema.merge_job import JobState, MergeJob
from typing_extensions import Annotated


Expand All @@ -15,6 +15,10 @@ class MergeJobResponse(BaseModel):


class MergeJobsResponse(BaseModel):
    # Pagination envelope returned by the GET merge-jobs endpoint.
    states: list[JobState]  # job states the query filtered on
    total: int  # total matching jobs across all pages (not just this page)
    page: int  # 1-based page number that was requested
    size: int  # requested page size; len(jobs) may be smaller on the last page
    jobs: list[MergeJob] = []


Expand Down
7 changes: 7 additions & 0 deletions domain-ee/ee-ep-merge-app/src/python_src/schema/merge_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ class JobState(str, Enum):
RUNNING_CANCEL_CLAIM_FAILED_REVERT_TEMP_STATION_OF_JURISDICTION = 'RUNNING_CANCEL_CLAIM_FAILED_REVERT_TEMP_STATION_OF_JURISDICTION'
COMPLETED_ERROR = 'COMPLETED_ERROR'

@classmethod
def incomplete(cls):
    """Return every state except the two terminal COMPLETED_* states."""
    terminal = (cls.COMPLETED_SUCCESS, cls.COMPLETED_ERROR)
    return [state for state in cls if state not in terminal]

def __str__(self):
    # Render as the raw state string (e.g. 'PENDING') rather than 'JobState.PENDING',
    # so logs and query-parameter echoes match the wire value.
    return self.value


class MergeJob(BaseModel):
_init_time: ClassVar[datetime] = datetime.now()
Expand Down
15 changes: 8 additions & 7 deletions domain-ee/ee-ep-merge-app/src/python_src/service/job_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def init(self) -> list[MergeJob]:
self.update_merge_job(job)
elif job.state == schema.JobState.RUNNING_CANCEL_EP400_CLAIM or job.state == schema.JobState.RUNNING_ADD_CLAIM_NOTE_TO_EP400:
jobs_to_restart.append(job)
elif job.state != schema.JobState.COMPLETED_ERROR:
else:
job.state = schema.JobState.PENDING
self.update_merge_job(job)
jobs_to_restart.append(job)
Expand All @@ -27,14 +27,15 @@ def init(self) -> list[MergeJob]:
def clear(self):
    # Delete every persisted MergeJob row (maintenance/test helper).
    self.db.clear(MergeJob)

def get_merge_jobs(self, filter) -> list[MergeJob]:
return self.db.query_all(MergeJob, filter)

def get_all_merge_jobs(self) -> list[MergeJob]:
return self.get_merge_jobs(True)
def query(self, states: list[schema.JobState], offset, limit) -> tuple[list[MergeJob], int]:
    """Return one page of merge jobs in any of *states*, most recently updated first.

    :param states: job states to match
    :param offset: 1-based page number (forwarded to the db layer, which
        derives the row offset)
    :param limit: page size
    :return: (jobs on the requested page, total number of matching jobs) —
        the previous ``list[MergeJob]`` annotation was wrong; the db layer
        returns a (results, total) tuple.
    """
    return self.db.query(MergeJob,
                         MergeJob.state.in_(states),
                         MergeJob.updated_at,
                         offset,
                         limit)

def get_merge_jobs_in_progress(self) -> list[MergeJob]:
    """Return all jobs whose state is non-terminal (not COMPLETED_SUCCESS/COMPLETED_ERROR)."""
    # Redundant inner parentheses around the incomplete() call removed.
    return self.db.query_all(MergeJob, MergeJob.state.in_(schema.JobState.incomplete()))

def get_merge_job(self, job_id) -> MergeJob:
    # Look up a single job by id; query_first yields None when no row matches,
    # despite the non-optional annotation.
    return self.db.query_first(MergeJob, MergeJob.job_id == job_id)
Expand Down
2 changes: 2 additions & 0 deletions domain-ee/ee-ep-merge-app/src/python_src/util/sanitizer.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
def sanitize(obj) -> str:
    """Render *obj* as a single-line string safe to embed in a log record.

    Newline characters are stripped so untrusted values cannot forge extra
    log lines (log injection). Lists are sanitized element-wise and rendered
    via their list repr.
    """
    if isinstance(obj, list):
        return str([sanitize(item) for item in obj])
    # Strip '\r' on its own as well as '\n': the original only removed
    # '\r\n' and '\n', so a lone carriage return could still break a line.
    return str(obj).replace('\r', '').replace('\n', '')
46 changes: 33 additions & 13 deletions domain-ee/ee-ep-merge-app/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def mock_job_store(mocker):
return mocker.patch('src.python_src.api.job_store', return_value=Mock())


@pytest.fixture(autouse=True)
def submitted_job():
return MergeJob(
job_id=JOB_ID,
Expand All @@ -42,8 +41,8 @@ def submitted_job():


@pytest.fixture(autouse=True)
def mock_job_submit(mock_job_store):
    # Auto-applied to every test: stub submit_merge_job to return a fresh
    # job from the submitted_job() factory. (Dead diff-residue lines from the
    # rendered diff removed; this is the post-change fixture.)
    mock_job_store.submit_merge_job.return_value = submitted_job()


def test_health(client: TestClient):
Expand Down Expand Up @@ -98,9 +97,9 @@ def test_get_job_by_job_id_job_not_found(client: TestClient, mock_job_store):
assert response.status_code == 404


def test_get_job_by_job_id_job_found(client: TestClient, mock_job_store, submitted_job):
def test_get_job_by_job_id_job_found(client: TestClient, mock_job_store):
job_id = make_merge_request(client)
mock_job_store.get_merge_job.return_value = submitted_job
mock_job_store.get_merge_job.return_value = submitted_job()

response = client.get(MERGE + f'/{job_id}')
assert response.status_code == 200
Expand All @@ -123,20 +122,41 @@ def make_merge_request(client: TestClient):
return job_id


def test_get_all_jobs_in_progress(client: TestClient, mock_job_store, submitted_job):
expected_job_ids = [make_merge_request(client), make_merge_request(client)]
expected_jobs = [submitted_job, submitted_job]
mock_job_store.get_merge_jobs_in_progress = Mock(return_value=expected_jobs)

response = client.get(MERGE)
@pytest.mark.parametrize("page,size",
[
pytest.param(None, None, id="defaults"),
pytest.param(1, None, id="first page, 10 items"),
pytest.param(1, None, id="last page, 1 item"),
pytest.param(1, 2, id="first page, 2 items"),
pytest.param(6, 2, id="last page, 2 items"),
])
def test_get_merge_jobs_pagination(client: TestClient, mock_job_store, page, size):
# Set defaults for missing values:
expected_page = page if page else 1
expected_size = size if size else 10

expected_jobs = [submitted_job() for i in range(expected_size)]
mock_job_store.query = Mock(return_value=(expected_jobs, 11))

params = {}
if page:
params['page'] = page
if size:
params['size'] = size

response = client.get(MERGE, params=params)
assert response.status_code == 200

response_json = response.json()
assert response_json['total'] == 11
assert response_json['states'] == JobState.incomplete()
assert response_json['page'] == expected_page
assert response_json['size'] == expected_size
results = response_json['jobs']
assert len(results) == len(expected_job_ids)
assert len(results) == len(expected_jobs)

for job in results:
assert job['job_id'] in expected_job_ids
assert job['job_id'] == str(JOB_ID)
assert job['pending_claim_id'] == 1
assert job['ep400_claim_id'] == 2
assert job['state'] == JobState.PENDING.value
Expand Down

0 comments on commit 6bc1438

Please sign in to comment.