Skip to content

Commit

Permalink
Delay report handlers to Celery task
Browse files Browse the repository at this point in the history
  • Loading branch information
medihack committed Feb 27, 2024
1 parent 482cf6d commit f244dc1
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 37 deletions.
45 changes: 23 additions & 22 deletions notebooks/radis_api.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -19,7 +19,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 26,
"metadata": {},
"outputs": [
{
Expand All @@ -32,7 +32,7 @@
{
"data": {
"text/plain": [
"{'id': 101,\n",
"{'id': 108,\n",
" 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'language': 'en',\n",
" 'pacs_aet': 'gepacs',\n",
Expand All @@ -49,8 +49,8 @@
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
" 'sop_instance_uid': '35858-384834-3843'},\n",
" 'created_at': '2024-02-24T00:00:26.628118+01:00',\n",
" 'updated_at': '2024-02-24T00:00:26.628127+01:00',\n",
" 'created_at': '2024-02-27T23:07:02.255160+01:00',\n",
" 'updated_at': '2024-02-27T23:07:02.255167+01:00',\n",
" 'groups': [2]}"
]
},
Expand Down Expand Up @@ -91,13 +91,13 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 27,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': 101,\n",
"{'id': 108,\n",
" 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'language': 'en',\n",
" 'pacs_aet': 'gepacs',\n",
Expand All @@ -114,12 +114,12 @@
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
" 'sop_instance_uid': '35858-384834-3843'},\n",
" 'created_at': '2024-02-24T00:00:26.628118+01:00',\n",
" 'updated_at': '2024-02-24T00:00:32.829693+01:00',\n",
" 'created_at': '2024-02-27T23:07:02.255160+01:00',\n",
" 'updated_at': '2024-02-27T23:07:02.691615+01:00',\n",
" 'groups': [2]}"
]
},
"execution_count": 3,
"execution_count": 27,
"metadata": {},
"output_type": "execute_result"
}
Expand Down Expand Up @@ -159,13 +159,13 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 28,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': 101,\n",
"{'id': 108,\n",
" 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'language': 'en',\n",
" 'pacs_aet': 'gepacs',\n",
Expand All @@ -182,8 +182,8 @@
" 'sop_instance_uid': '35858-384834-3843',\n",
" 'study_instance_uid': '34343-34343-34343',\n",
" 'series_instance_uid': '34343-676556-3343'},\n",
" 'created_at': '2024-02-24T00:00:26.628118+01:00',\n",
" 'updated_at': '2024-02-24T00:00:32.829693+01:00',\n",
" 'created_at': '2024-02-27T23:07:02.255160+01:00',\n",
" 'updated_at': '2024-02-27T23:07:02.691615+01:00',\n",
" 'groups': [2],\n",
" 'documents': {'vespa': {'pathId': '/document/v1/report/report/docid/gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'id': 'id:report:report::gepacs_3dfidii5858-6633i4-ii398841',\n",
Expand All @@ -195,12 +195,13 @@
" 'patient_sex': 'M',\n",
" 'study_description': 'CT of the Thorax',\n",
" 'groups': [2],\n",
" 'patient_age': 24,\n",
" 'links': ['http://gepacs.com/34343-34343-34343'],\n",
" 'pacs_aet': 'gepacs',\n",
" 'study_datetime': 965858400}}}}"
]
},
"execution_count": 4,
"execution_count": 28,
"metadata": {},
"output_type": "execute_result"
}
Expand All @@ -217,7 +218,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 29,
"metadata": {},
"outputs": [
{
Expand All @@ -237,13 +238,13 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": 30,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': 102,\n",
"{'id': 109,\n",
" 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'language': 'en',\n",
" 'pacs_aet': 'gepacs',\n",
Expand All @@ -260,12 +261,12 @@
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
" 'sop_instance_uid': '35858-384834-3843'},\n",
" 'created_at': '2024-02-24T00:00:43.528647+01:00',\n",
" 'updated_at': '2024-02-24T00:00:43.528660+01:00',\n",
" 'created_at': '2024-02-27T23:07:04.039672+01:00',\n",
" 'updated_at': '2024-02-27T23:07:04.039679+01:00',\n",
" 'groups': [2]}"
]
},
"execution_count": 6,
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
Expand Down Expand Up @@ -305,7 +306,7 @@
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": 31,
"metadata": {},
"outputs": [
{
Expand Down
14 changes: 7 additions & 7 deletions radis/reports/api/viewsets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any

from django.db import transaction
from django.http import Http404
from rest_framework import mixins, status, viewsets
from rest_framework.exceptions import MethodNotAllowed
Expand All @@ -8,8 +9,10 @@
from rest_framework.response import Response
from rest_framework.serializers import BaseSerializer

from radis.reports.tasks import report_created, report_deleted, report_updated

from ..models import Report
from ..site import document_fetchers, report_event_handlers
from ..site import document_fetchers
from .serializers import ReportSerializer


Expand Down Expand Up @@ -55,8 +58,7 @@ def perform_create(self, serializer: BaseSerializer) -> None:
super().perform_create(serializer)
assert serializer.instance
report: Report = serializer.instance
for handler in report_event_handlers:
handler("created", report)
transaction.on_commit(lambda: report_created.delay(report.document_id))

def update(self, request: Request, *args: Any, **kwargs: Any) -> Response:
# DRF itself does not support upsert (create the object to update if it does not exist).
Expand Down Expand Up @@ -89,14 +91,12 @@ def perform_update(self, serializer: BaseSerializer) -> None:
super().perform_update(serializer)
assert serializer.instance
report: Report = serializer.instance
for handler in report_event_handlers:
handler("updated", report)
transaction.on_commit(lambda: report_updated.delay(report.document_id))

def partial_update(self, request: Request, *args: Any, **kwargs: Any) -> Response:
assert request.method
raise MethodNotAllowed(request.method)

def perform_destroy(self, instance: Report) -> None:
super().perform_destroy(instance)
for handler in report_event_handlers:
handler("deleted", instance)
transaction.on_commit(lambda: report_deleted.delay(instance.document_id))
2 changes: 1 addition & 1 deletion radis/reports/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .models import Report

ReportEventType = Literal["created", "updated", "deleted"]
ReportEventHandler = Callable[[ReportEventType, Report], None]
ReportEventHandler = Callable[[ReportEventType, str], None]

report_event_handlers: list[ReportEventHandler] = []

Expand Down
21 changes: 21 additions & 0 deletions radis/reports/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from celery import shared_task

from .site import report_event_handlers


@shared_task
def report_created(document_id: str) -> None:
for handler in report_event_handlers:
handler("created", document_id)


@shared_task
def report_updated(document_id: str) -> None:
for handler in report_event_handlers:
handler("updated", document_id)


@shared_task
def report_deleted(document_id: str) -> None:
for handler in report_event_handlers:
handler("deleted", document_id)
17 changes: 10 additions & 7 deletions radis/vespa/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ def register_app():
update_document,
)

def handle_report(event_type: ReportEventType, report: Report):
# Sync reports with Vespa
if event_type == "created":
create_document(report.document_id, report)
elif event_type == "updated":
update_document(report.document_id, report)
def handle_report(event_type: ReportEventType, document_id: str):
if event_type in ("created", "updated"):
report = Report.objects.get(document_id=document_id)
if event_type == "created":
create_document(document_id, report)
elif event_type == "updated":
update_document(document_id, report)
elif event_type == "deleted":
delete_document(report.document_id)
delete_document(document_id)
else:
raise ValueError(f"Invalid report event type: {event_type}")

register_report_handler(handle_report)

Expand Down

0 comments on commit f244dc1

Please sign in to comment.