Skip to content

Commit

Permalink
fix: Add timeouts to _read_pending_msgs
Browse files Browse the repository at this point in the history
These redis methods don't timeout and can cause infinite hangs when
redis goes down while reading the pending messages. We wrap them in our
own timeouts now and make those trigger consumer restarts to force
reconnects and prevent the hangs.
  • Loading branch information
bmtcril authored and navinkarkera committed Oct 12, 2024
1 parent 7c8da18 commit e2542d4
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 5 deletions.
2 changes: 1 addition & 1 deletion edx_event_bus_redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
from edx_event_bus_redis.internal.consumer import RedisEventConsumer
from edx_event_bus_redis.internal.producer import create_producer

__version__ = '0.5.0'
__version__ = '0.5.1'

default_app_config = 'edx_event_bus_redis.apps.EdxEventBusRedisConfig' # pylint: disable=invalid-name
16 changes: 12 additions & 4 deletions edx_event_bus_redis/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from edx_event_bus_redis.internal.message import RedisMessage

from .config import get_full_topic, load_common_settings
from .utils import AUDIT_LOGGING_ENABLED
from .utils import AUDIT_LOGGING_ENABLED, Timeout

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -148,11 +148,19 @@ def _shut_down(self):
def _read_pending_msgs(self) -> Optional[tuple]:
"""
Read pending messages, if no messages found return None.
These redis calls don't have timout args, and we've seen that they
can hang indefinitely when redis goes down. So we wrap them in a
timeout context manager.
"""
logger.debug("Consuming pending msgs first.")

if self.claim_msgs_older_than is not None:
self.consumer.autoclaim(self.consumer_name, min_idle_time=self.claim_msgs_older_than, count=1)
msg_meta = self.consumer.pending(count=1, consumer=self.consumer_name)
with Timeout(5):
self.consumer.autoclaim(self.consumer_name, min_idle_time=self.claim_msgs_older_than, count=1)
with Timeout(5):
msg_meta = self.consumer.pending(count=1, consumer=self.consumer_name)

if msg_meta:
return self.consumer[msg_meta[0]['message_id']]
logger.debug("No more pending messages.")
Expand Down Expand Up @@ -426,7 +434,7 @@ def _is_fatal_redis_error(self, error: Optional[Exception]) -> bool:
Arguments:
error: An exception instance, or None if no error.
"""
if error and isinstance(error, RedisConnectionError):
if error and isinstance(error, (RedisConnectionError, TimeoutError)):
return True

return False
15 changes: 15 additions & 0 deletions edx_event_bus_redis/internal/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Test header conversion utils
"""
import time
from datetime import datetime, timezone
from unittest.mock import Mock, patch
from uuid import uuid1
Expand All @@ -16,6 +17,7 @@
HEADER_ID,
HEADER_SOURCELIB,
HEADER_TIME,
Timeout,
encode,
get_headers_from_metadata,
get_metadata_from_headers,
Expand Down Expand Up @@ -139,3 +141,16 @@ def test_generate_metadata_from_missing_or_bad_headers(self, msg_id, msg_time, s
expected_metadata = EventsMetadata(event_type="abc", id=TEST_UUID)
generated_metadata = get_metadata_from_headers(headers)
self.assertDictEqual(attr.asdict(generated_metadata), attr.asdict(expected_metadata))


class TestTimeout(TestCase):
"""
Test the timeout context manager
"""
def test_timeout(self):
"""
Test that the timeout decorator raises a TimeoutError if the function takes too long
"""
with pytest.raises(TimeoutError):
with Timeout(1):
time.sleep(2)
29 changes: 29 additions & 0 deletions edx_event_bus_redis/internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import logging
import signal
from datetime import datetime
from typing import Tuple
from uuid import UUID
Expand Down Expand Up @@ -115,3 +116,31 @@ def get_headers_from_metadata(event_metadata: oed.EventsMetadata):
values[encode(header.message_header_key)] = encode(header.from_metadata(event_metadata_value))

return values


class Timeout:
"""
Context manager to raise a TimeoutError after a specified number of seconds.
Some redis calls don't have a timeout parameter, so this can be used to enforce a timeout.
"""
def __init__(self, timeout_seconds):
self.timeout_seconds = timeout_seconds

def __enter__(self):
"""
Start the timer, if we don't __exit__ in self.seconds it will raise the TimeoutError.
"""
signal.signal(signal.SIGALRM, Timeout._raise_timeout)
signal.alarm(self.timeout_seconds)
return self

def __exit__(self, exc_type, exc_value, traceback):
"""
Stop the signal timer on context exit.
"""
signal.alarm(0)

@staticmethod
def _raise_timeout(signum, frame):
raise TimeoutError

0 comments on commit e2542d4

Please sign in to comment.