Skip to content

Commit

Permalink
fix: Cache producers so that they don't lose data (#21)
Browse files Browse the repository at this point in the history
Also needed to break one test into two so that caching did not interfere.
  • Loading branch information
timmc-edx authored and whuang1202 committed Aug 9, 2022
1 parent 2675e99 commit 5e9747a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 6 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@ Unreleased

*

[0.2.0] - 2022-08-09
~~~~~~~~~~~~~~~~~~~~

Fixed
_____

* Cache producers so that they don't lose data.

[0.1.0] - 2022-06-16
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~

Added
_____
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Kafka implementation for Open edX event bus.
"""

__version__ = '0.1.1'
__version__ = '0.2.0'
5 changes: 5 additions & 0 deletions edx_event_bus_kafka/publishing/event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
return AvroSignalSerializer(signal)


# Note: This caching is required, since otherwise the Producer will
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
# producers to be long-lived.
@lru_cache
def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -> Optional[SerializingProducer]:
"""
Create the producer for a signal and a key field path.
Expand Down
14 changes: 10 additions & 4 deletions edx_event_bus_kafka/publishing/test_event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
class TestEventProducer(TestCase):
"""Test producer."""

def setUp(self):
super().setUp()
ep.get_producer_for_signal.cache_clear()
ep.get_serializer.cache_clear()

def test_extract_event_key(self):
event_data = {
'user': UserData(
Expand Down Expand Up @@ -56,17 +61,18 @@ def test_extract_key_schema(self):
schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username')
assert schema == '{"name": "username", "type": "string"}'

def test_get_producer_for_signal(self):
def test_get_producer_for_signal_unconfigured(self):
"""With missing essential settings, just warn and return None."""
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED

# With missing essential settings, just warn and return None
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter('always')
assert ep.get_producer_for_signal(signal, 'user.id') is None
assert len(caught_warnings) == 1
assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ")

# Creation succeeds when all settings are present
def test_get_producer_for_signal_configured(self):
"""Creation succeeds when all settings are present."""
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
with override_settings(
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345',
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key',
Expand Down

0 comments on commit 5e9747a

Please sign in to comment.