-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
de36be0
commit 24f8b31
Showing
4 changed files
with
118 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from __future__ import annotations | ||
|
||
from celery import shared_task | ||
from sqlalchemy import select | ||
from sqlalchemy.orm import Session, sessionmaker | ||
|
||
from core.celery.job import Job | ||
from core.celery.task import Task | ||
from core.model import Collection | ||
|
||
|
||
class CollectionDeleteJob(Job): | ||
def __init__(self, session_maker: sessionmaker[Session], collection_id: int): | ||
super().__init__(session_maker) | ||
self.collection_id = collection_id | ||
|
||
@staticmethod | ||
def collection(session: Session, collection_id: int) -> Collection | None: | ||
return ( | ||
session.execute(select(Collection).where(Collection.id == collection_id)) | ||
.scalars() | ||
.one_or_none() | ||
) | ||
|
||
@staticmethod | ||
def collection_name(collection: Collection) -> str: | ||
return f"{collection.name}/{collection.protocol} ({collection.id})" | ||
|
||
def run(self) -> None: | ||
with self.transaction() as session: | ||
collection = self.collection(session, self.collection_id) | ||
if collection is None: | ||
self.log.error( | ||
f"Collection with id {self.collection_id} not found. Unable to delete." | ||
) | ||
return | ||
|
||
self.log.info(f"Deleting collection {self.collection_name(collection)}") | ||
collection.delete() | ||
|
||
|
||
@shared_task(key="high", bind=True) | ||
def collection_delete(task: Task, collection_id: int) -> None: | ||
CollectionDeleteJob(task.session_maker, collection_id).run() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
from logging import INFO | ||
|
||
from _pytest.logging import LogCaptureFixture | ||
from sqlalchemy import select | ||
from sqlalchemy.orm import sessionmaker | ||
|
||
from core.celery.tasks.collection_delete import CollectionDeleteJob, collection_delete | ||
from core.model import Collection | ||
from tests.fixtures.celery import CeleryFixture | ||
from tests.fixtures.database import DatabaseTransactionFixture | ||
|
||
|
||
def test_collection_delete_job_collection(db: DatabaseTransactionFixture): | ||
# A non-existent collection should return None | ||
assert CollectionDeleteJob.collection(db.session, 1) is None | ||
|
||
collection = db.collection(name="collection1") | ||
assert collection.id is not None | ||
assert CollectionDeleteJob.collection(db.session, collection.id) == collection | ||
|
||
|
||
def test_collection_delete_job_collection_name(db: DatabaseTransactionFixture): | ||
collection = db.collection(name="collection1") | ||
assert ( | ||
CollectionDeleteJob.collection_name(collection) | ||
== f"{collection.name}/{collection.protocol} ({collection.id})" | ||
) | ||
|
||
|
||
def test_collection_delete_job_run( | ||
db: DatabaseTransactionFixture, | ||
mock_session_maker: sessionmaker, | ||
caplog: LogCaptureFixture, | ||
): | ||
# A non-existent collection should log an error | ||
caplog.set_level(INFO) | ||
CollectionDeleteJob(mock_session_maker, 1).run() | ||
assert "Collection with id 1 not found. Unable to delete." in caplog.text | ||
|
||
collection = db.collection(name="collection1") | ||
collection.marked_for_deletion = True | ||
query = select(Collection).where(Collection.id == collection.id) | ||
|
||
assert db.session.execute(query).scalar_one_or_none() == collection | ||
|
||
assert collection.id is not None | ||
job = CollectionDeleteJob(mock_session_maker, collection.id) | ||
job.run() | ||
assert db.session.execute(query).scalar_one_or_none() is None | ||
assert f"Deleting collection" in caplog.text | ||
|
||
|
||
def test_collection_delete_task( | ||
db: DatabaseTransactionFixture, celery_fixture: CeleryFixture | ||
): | ||
collection = db.collection(name="collection1") | ||
collection.marked_for_deletion = True | ||
query = select(Collection).where(Collection.id == collection.id) | ||
assert db.session.execute(query).scalar_one_or_none() == collection | ||
collection_delete.delay(collection.id).wait() | ||
assert db.session.execute(query).scalar_one_or_none() is None |