Skip to content

Commit

Permalink
feat: Implement CelerySignalProcessor (#44)
Browse files Browse the repository at this point in the history
The `CelerySignalProcessor` allows automatic updates on the index as delayed
background tasks using Celery.

---------

Co-authored-by: Quentin Coumes <[email protected]>
Co-authored-by: Rasika Amaratissa <[email protected]>
Co-authored-by: Andrii Rusanov <[email protected]>
Co-authored-by: bidaya0 <[email protected]>
  • Loading branch information
4 people committed Mar 20, 2024
1 parent 5b34468 commit 4d4f437
Show file tree
Hide file tree
Showing 13 changed files with 290 additions and 134 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/tests_and_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ jobs:
run: tox -e py-django${{ matrix.django-version }}-opensearch${{ matrix.opensearch-version }}

- name: Upload coverage to Codecov
if: matrix.python-version == 3.10 && matrix.django-version == 41
if: matrix.python-version == 3.12 && matrix.django-version == 50
uses: codecov/codecov-action@v1
with:
file: ./coverage.xml
Expand All @@ -149,10 +149,10 @@ jobs:
steps:
- uses: actions/checkout@master

- name: Set up Python 3.10
- name: Set up Python 3.12
uses: actions/setup-python@v1
with:
python-version: '3.10'
python-version: '3.12'

- name: Creating Built Distributions
run: python setup.py sdist
Expand Down
40 changes: 34 additions & 6 deletions django_opensearch_dsl/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ def ready(self):
self.module.autodiscover()
connections.configure(**settings.OPENSEARCH_DSL)

# Setup the signal processor.
# Set up the signal processor.
if not self.signal_processor:
signal_processor_path = getattr(
settings, "OPENSEARCH_DSL_SIGNAL_PROCESSOR", "django_opensearch_dsl.signals.RealTimeSignalProcessor"
)
signal_processor_class = import_string(signal_processor_path)
self.signal_processor = signal_processor_class(connections)
self.signal_processor = self.signal_processor_class()(connections)

@classmethod
def autosync_enabled(cls):
Expand All @@ -43,3 +39,35 @@ def auto_refresh_enabled(cls):
def default_queryset_pagination(cls):
"""Return `OPENSEARCH_DSL_QUERYSET_PAGINATION`."""
return getattr(settings, "OPENSEARCH_DSL_QUERYSET_PAGINATION", 4096)

@classmethod
def signal_processor_class(cls):
"""Import and return the target of `OPENSEARCH_SIGNAL_PROCESSOR_CLASS`."""
path = getattr(
settings, "OPENSEARCH_DSL_SIGNAL_PROCESSOR", "django_opensearch_dsl.signals.RealTimeSignalProcessor"
)
return import_string(path)

@classmethod
def signal_processor_serializer_class(cls):
"""Import and return the target of `OPENSEARCH_DSL_SIGNAL_PROCESSOR_SERIALIZER_CLASS`."""
path = getattr(
settings,
"OPENSEARCH_DSL_SIGNAL_PROCESSOR_SERIALIZER_CLASS",
"django.core.serializers.json.DjangoJSONEncoder",
)
return import_string(path)

@classmethod
def signal_processor_deserializer_class(cls):
"""Import and return the target of `OPENSEARCH_DSL_SIGNAL_PROCESSOR_SERIALIZER_CLASS`."""
path = getattr(
settings,
"OPENSEARCH_DSL_SIGNAL_PROCESSOR_DESERIALIZER_CLASS",
getattr(
settings,
"OPENSEARCH_DSL_SIGNAL_PROCESSOR_SERIALIZER_CLASS",
"django.core.serializers.json.DjangoJSONEncoder",
),
)
return import_string(path)
1 change: 0 additions & 1 deletion django_opensearch_dsl/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def update_related(self, instance, action="index", **kwargs):
"""
if not DODConfig.autosync_enabled():
return

for doc in self._get_related_doc(instance):
doc_instance = doc()
try:
Expand Down
128 changes: 70 additions & 58 deletions django_opensearch_dsl/signals.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
"""Attach django-opensearch-dsl to Django's signals and cause things to index."""

import abc

from django.apps import apps
from django.core.serializers import deserialize, serialize
from django.db import models
from django.dispatch import Signal

from .apps import DODConfig
from .registries import registry

# Sent after document indexing is completed
post_index = Signal()

class BaseSignalProcessor(object):

class BaseSignalProcessor(abc.ABC):
"""Base signal processor.
By default, does nothing with signals but provides underlying
Expand All @@ -17,25 +25,47 @@ def __init__(self, connections):
self.connections = connections
self.setup()

def setup(self):
"""Set up.
@abc.abstractmethod
def handle_save(self, sender, instance, **kwargs):
"""Update the instance in model and associated model indices."""

A hook for setting up anything necessary for `handle_save/handle_delete`
to be executed.
@abc.abstractmethod
def handle_pre_delete(self, sender, instance, **kwargs):
"""Delete the instance from model and associated model indices."""

Default behavior is to do nothing (`pass`).
"""
# Do nothing.
@abc.abstractmethod
def handle_m2m_changed(self, sender, instance, action, **kwargs):
"""Handle changes in ManyToMany relations."""

def setup(self):
"""Set up the SignalProcessor."""
models.signals.post_save.connect(self.handle_save)
models.signals.pre_delete.connect(self.handle_pre_delete)
models.signals.m2m_changed.connect(self.handle_m2m_changed)

def teardown(self):
"""Tear-down.
"""Tear down the SignalProcessor."""
models.signals.post_save.disconnect(self.handle_save)
models.signals.pre_delete.disconnect(self.handle_pre_delete)
models.signals.m2m_changed.disconnect(self.handle_m2m_changed)

A hook for tearing down anything necessary for `handle_save/handle_delete`
to no longer be executed.

Default behavior is to do nothing (`pass`).
"""
# Do nothing.
class RealTimeSignalProcessor(BaseSignalProcessor):
"""Real-time signal processor.
Allows for observing when saves/deletes fire and automatically updates the
search engine appropriately.
"""

def handle_save(self, sender, instance, **kwargs):
"""Update the instance in model and associated model indices."""
registry.update(instance)
registry.update_related(instance)

def handle_pre_delete(self, sender, instance, **kwargs):
"""Delete the instance from model and associated model indices."""
registry.delete(instance, raise_on_error=False)
registry.delete_related(instance, raise_on_error=False)

def handle_m2m_changed(self, sender, instance, action, **kwargs):
"""Handle changes in ManyToMany relations."""
Expand All @@ -44,56 +74,38 @@ def handle_m2m_changed(self, sender, instance, action, **kwargs):
elif action in ("pre_remove", "pre_clear"):
self.handle_pre_delete(sender, instance)

def handle_save(self, sender, instance, **kwargs):
"""Handle save.

Given an individual model instance, update the object in the index.
Update the related objects either.
"""
try:
from celery import shared_task
except ImportError:
pass
else:

@shared_task()
def handle_save_task(app_label, model, pk):
"""Handle the update on the registry as a Celery task."""
instance = apps.get_model(app_label, model).objects.get(pk=pk)
registry.update(instance)
registry.update_related(instance)

def handle_pre_delete(self, sender, instance, **kwargs):
"""Handle removing of instance object from related models instance.
We need to do this before the real delete otherwise the relation
doesn't exists anymore and we can't get the related models instance.
"""
registry.delete_related(instance)

def handle_delete(self, sender, instance, **kwargs):
"""Handle delete.
Given an individual model instance, delete the object from index.
"""
@shared_task()
def handle_pre_delete_task(data):
"""Delete the instance from model and associated model indices."""
instance = next(deserialize("json", data, cls=DODConfig.signal_processor_deserializer_class())).object
registry.delete(instance, raise_on_error=False)
registry.delete_related(instance, raise_on_error=False)

class CelerySignalProcessor(RealTimeSignalProcessor):
"""Celery signal processor.
class RealTimeSignalProcessor(BaseSignalProcessor):
"""Real-time signal processor.
Allows for observing when saves/deletes fire and automatically updates the
search engine appropriately.
"""

def setup(self):
"""Set up the SignalProcessor."""
# Listen to all model saves.
models.signals.post_save.connect(self.handle_save)
models.signals.post_delete.connect(self.handle_delete)

# Use to manage related objects update
models.signals.m2m_changed.connect(self.handle_m2m_changed)
models.signals.pre_delete.connect(self.handle_pre_delete)

def teardown(self):
"""Tear down the SignalProcessor."""
# Listen to all model saves.
models.signals.post_save.disconnect(self.handle_save)
models.signals.post_delete.disconnect(self.handle_delete)
models.signals.m2m_changed.disconnect(self.handle_m2m_changed)
models.signals.pre_delete.disconnect(self.handle_pre_delete)
Allows automatic updates on the index as delayed background tasks using
Celery.
"""

def handle_save(self, sender, instance, **kwargs):
"""Update the instance in model and associated model indices."""
handle_save_task(instance._meta.app_label, instance.__class__.__name__, instance.pk)

# Sent after document indexing is completed
post_index = Signal()
def handle_pre_delete(self, sender, instance, **kwargs):
"""Delete the instance from model and associated model indices."""
handle_pre_delete_task(serialize("json", [instance], cls=DODConfig.signal_processor_serializer_class()))
2 changes: 1 addition & 1 deletion docs/document.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The `Django` subclass contains parameters related to Django's side of the docume
* `fields` (*optional*) - List model's field name that should be indexed. Do not add the fields you manually declare
into this list. See [Document Field Reference](fields.md) for how to manually define fields.
* `queryset_pagination` (*optional*) - Size of the chunk when indexing,
override [`OPENSEARCH_DSL_QUERYSET_PAGINATION`](settings.md#opensearch_dsl_queryset_pagination.md).
override [`OPENSEARCH_DSL_QUERYSET_PAGINATION`](settings.md#opensearch_dsl_queryset_pagination).
* `related_models` (*optional*) - List of related Django models. Specifies a relation between models that allows for
index updating based on these defined relationships.

Expand Down
35 changes: 35 additions & 0 deletions docs/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,38 @@ Default: `4096`
Size of the chunk used when indexing data. Can be overriden by setting `queryset_pagination` inside `Document`'
s [`Django` subclass](document.md).

## `OPENSEARCH_DSL_SIGNAL_PROCESSOR`

This (optional) setting controls what SignalProcessor class is used to handle Django’s signals and
keep the indices up-to-date. Default to `django_opensearch_dsl.signals.RealTimeSignalProcessor`.

Valid choices are:

* `django_opensearch_dsl.signals.RealTimeSignalProcessor`

Operation are processed synchronously as soon as the signal is emitted.

* `django_opensearch_dsl.signals.CelerySignalProcessor`

Uses Celery to process the operations asynchronously.

## `OPENSEARCH_DSL_SIGNAL_PROCESSOR_SERIALIZER_CLASS`

Default: `django.core.serializers.json.DjangoJSONEncoder`.

When using asynchronous signal processor such as `CelerySignalProcessor`, the instance will probably be deleted from the
database by the time the operation is processed. Since `django-opensearch-dsl` need a relies on the database to do most
of its operation, the instance will be serialized by the signal and deserialized by the processor to keep a valid
instance.

This serialization process can be customized using this setting.
See [Django's serialization documentation](https://docs.djangoproject.com/en/5.0/topics/serialization/#serialization-formats-json)
for more information.

## `OPENSEARCH_DSL_SIGNAL_PROCESSOR_DESERIALIZER_CLASS`

Default: `OPENSEARCH_DSL_SIGNAL_PROCESSOR_SERIALIZER_CLASS`.

Use by the processor to deserialize the data serialized by the signal.
See [`OPENSEARCH_DSL_SIGNAL_PROCESSOR_SERIALIZER_CLASS`](settings.md#opensearch_dsl_signal_processor_serializer_class)
for more information.
1 change: 1 addition & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
bandit>=1.7.7, <2.0.0
black>=24.1.1, <25.0.0
celery>=4.1.0, <6.0.0
coverage>=7.4.4, <8.0.0
flake8>=7.0.0, <8.0.0
isort>=5.13.0, <6.0.0
Expand Down
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
)
REQUIREMENTS = [
'opensearch-py>=2.2.0',
'dateutils'
'dateutils',
]
EXTRA_REQUIREMENTS = {
'celery': ["celery>=4.1.0"],
}

setup(
name='django-opensearch-dsl',
Expand All @@ -43,6 +46,7 @@
packages=['django_opensearch_dsl'],
include_package_data=True,
install_requires=REQUIREMENTS,
extras_require=EXTRA_REQUIREMENTS,
license="Apache Software License 2.0",
keywords='django elasticsearch elasticsearch-dsl opensearch opensearch-dsl opensearch-py',
classifiers=CLASSIFIERS,
Expand Down
24 changes: 8 additions & 16 deletions tests/django_dummy_app/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ class Index:

class Django:
model = Continent
fields = [
"name",
]
related_models = [Country]
fields = ["name"]

id = fields.LongField()
countries = fields.NestedField(
Expand All @@ -30,6 +29,9 @@ class Django:
}
)

def get_instances_from_related(self, related: Country) -> Continent:
return related.continent


@registry.register_document
class CountryDocument(Document):
Expand All @@ -39,11 +41,7 @@ class Index:

class Django:
model = Country
fields = [
"name",
"area",
"population",
]
fields = ["name", "area", "population"]

id = fields.LongField()
continent = fields.ObjectField(
Expand All @@ -53,7 +51,7 @@ class Django:
}
)
events_id = fields.LongField(multi=True)
event_count_property = fields.LongField(attr="event_count")
event_count_prop = fields.LongField(attr="event_count_prop")
event_count_func = fields.LongField(attr="event_count_func")

def prepare_events_id(self, obj):
Expand All @@ -73,13 +71,7 @@ class Index:
class Django:
model = Event
queryset_pagination = 512
fields = [
"name",
"date",
"source",
"comment",
"null_field",
]
fields = ["name", "date", "source", "comment", "null_field"]

country = fields.ObjectField(doc_class=CountryDocument)
unknown = fields.LongField(required=False)
Expand Down
2 changes: 1 addition & 1 deletion tests/django_dummy_app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __str__(self):
return f"{self.name}"

@property
def event_count(self):
def event_count_prop(self):
return self.events.all().count()

def event_count_func(self):
Expand Down
4 changes: 4 additions & 0 deletions tests/project/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,7 @@
STATIC_URL = "/static/"

DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"

CELERY_BROKER_URL = "memory://localhost/"
CELERY_TASK_ALWAYS_EAGER = True
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
Loading

0 comments on commit 4d4f437

Please sign in to comment.