diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 30fed93..6cf1162 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,12 @@ Change Log Unreleased ********** +[5.5.0] - 2023-09-21 +******************** +Changed +======= +* Reset edx-django-utils RequestCache before handling each event + [5.4.0] - 2023-08-28 ******************** Changed diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index be17940..aa63692 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -9,4 +9,4 @@ from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer -__version__ = '5.4.0' +__version__ = '5.5.0' diff --git a/edx_event_bus_kafka/internal/consumer.py b/edx_event_bus_kafka/internal/consumer.py index 9cb050a..9e037fd 100644 --- a/edx_event_bus_kafka/internal/consumer.py +++ b/edx_event_bus_kafka/internal/consumer.py @@ -10,6 +10,7 @@ from django.db import connection from django.dispatch import receiver from django.test.signals import setting_changed +from edx_django_utils.cache import RequestCache from edx_django_utils.monitoring import function_trace, record_exception, set_custom_attribute from edx_toggles.toggles import SettingToggle from openedx_events.event_bus import EventBusConsumer @@ -106,6 +107,31 @@ def _reconnect_to_db_if_needed(): connection.connect() +def _clear_request_cache(): + """ + Clear the RequestCache so that each event consumption starts fresh. + + Signal handlers may be written with the assumption that they are called in the context + of a web request, so we clear the request cache just in case. + """ + RequestCache.clear_all_namespaces() + + +def _prepare_for_new_work_cycle(): + """ + Ensure that the application state is appropriate for performing a new unit of work. + + This mimics some setup/teardown that is normally performed by Django in its + request/response based architecture and that is needed for ensuring a clean and + usable state in this worker-based application. + """ + # Ensure that the database connection is active and usable. + _reconnect_to_db_if_needed() + + # Clear the request cache, in case anything in the signal handlers rely on it. + _clear_request_cache() + + class KafkaEventConsumer(EventBusConsumer): """ Construct consumer for the given topic and group. The consumer can then @@ -278,8 +304,10 @@ def _consume_indefinitely(self): msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT) if msg is not None: with function_trace('_consume_indefinitely_consume_single_message'): - # Before processing, make sure our db connection is still active - _reconnect_to_db_if_needed() + # Before processing, make sure our application state is similar to + # that of a new Django request. (Mimic setup/teardown.) + _prepare_for_new_work_cycle() + signal = self.determine_signal(msg) msg.set_value(self._deserialize_message_value(msg, signal)) self.emit_signals_from_message(msg, signal)