Skip to content

Commit

Permalink
feat: Reset request cache before handling event
Browse files Browse the repository at this point in the history
This is a just-in-case measure -- we don't know of any specific code that
handles events and relies on the cache. But it's likely to come up at some
point.

Ticket: openedx/openedx-events#235
  • Loading branch information
timmc-edx committed Sep 20, 2023
1 parent 17fd387 commit c4c20e0
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
**********

[5.5.0] - 2023-09-21
********************
Changed
=======
* Reset edx-django-utils RequestCache before handling each event

[5.4.0] - 2023-08-28
********************
Changed
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '5.4.0'
__version__ = '5.5.0'
32 changes: 30 additions & 2 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from django.db import connection
from django.dispatch import receiver
from django.test.signals import setting_changed
from edx_django_utils.cache import RequestCache
from edx_django_utils.monitoring import function_trace, record_exception, set_custom_attribute
from edx_toggles.toggles import SettingToggle
from openedx_events.event_bus import EventBusConsumer
Expand Down Expand Up @@ -106,6 +107,31 @@ def _reconnect_to_db_if_needed():
connection.connect()


def _clear_request_cache():
"""
Clear the RequestCache so that each event consumption starts fresh.
Signal handlers may be written with the assumption that they are called in the context
of a web request, so we clear the request cache just in case.
"""
RequestCache.clear_all_namespaces()


def _prepare_for_new_work_cycle():
"""
Ensure that the application state is appropriate for performing a new unit of work.
This mimics some setup/teardown that is normally performed by Django in its
request/response based architecture and that is needed for ensuring a clean and
usable state in this worker-based application.
"""
# Ensure that the database connection is active and usable.
_reconnect_to_db_if_needed()

# Clear the request cache, in case anything in the signal handlers rely on it.
_clear_request_cache()


class KafkaEventConsumer(EventBusConsumer):
"""
Construct consumer for the given topic and group. The consumer can then
Expand Down Expand Up @@ -278,8 +304,10 @@ def _consume_indefinitely(self):
msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
if msg is not None:
with function_trace('_consume_indefinitely_consume_single_message'):
# Before processing, make sure our db connection is still active
_reconnect_to_db_if_needed()
# Before processing, make sure our application state is similar to
# that of a new Django request. (Mimic setup/teardown.)
_prepare_for_new_work_cycle()

signal = self.determine_signal(msg)
msg.set_value(self._deserialize_message_value(msg, signal))
self.emit_signals_from_message(msg, signal)
Expand Down

0 comments on commit c4c20e0

Please sign in to comment.