diff --git a/.github/workflows/sync_branch.yml b/.github/workflows/sync_branch.yml
new file mode 100644
index 0000000000..297adcff1a
--- /dev/null
+++ b/.github/workflows/sync_branch.yml
@@ -0,0 +1,24 @@
+name: Sync Feature Branch with Develop
+
+on:
+  pull_request:
+    types:
+      - opened
+      - synchronize
+
+jobs:
+  sync-with-develop:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v2
+
+      - name: Fetch and rebase develop
+        run: |
+          git fetch origin
+          git rebase origin/develop
+
+      - name: Push changes back to feature branch
+        run: |
+          git push origin HEAD
\ No newline at end of file
diff --git a/doajtest/unit/test_kafka_producer.py b/doajtest/unit/test_kafka_producer.py
new file mode 100644
index 0000000000..9fd547d5c5
--- /dev/null
+++ b/doajtest/unit/test_kafka_producer.py
@@ -0,0 +1,98 @@
+from redis import StrictRedis
+from unittest.mock import patch, Mock, MagicMock
+
+from doajtest.helpers import DoajTestCase
+from portality.events.system_status_check import KafkaStatusCheck
+import portality.events.kafka_producer as kafka_events
+
+
+class TestKafkaStatusCheck(DoajTestCase):
+
+    def setUp(self):
+        super(TestKafkaStatusCheck, self).setUp()
+        self.kafka_status = KafkaStatusCheck()
+
+    @patch.object(StrictRedis, "get", return_value=b'True')
+    @patch.object(KafkaStatusCheck, "can_check_redis", return_value=True)
+    def test_active_status(self, mock_can_check_redis, mock_redis_get):
+        status = self.kafka_status.is_active()
+        self.assertTrue(status)
+        mock_can_check_redis.assert_called_once()
+        mock_redis_get.assert_called_once()
+
+    @patch.object(StrictRedis, "get", return_value=b'False')
+    @patch.object(KafkaStatusCheck, "can_check_redis", return_value=True)
+    def test_inactive_status(self, mock_can_check_redis, mock_redis_get):
+        status = self.kafka_status.is_active()
+        self.assertFalse(status)
+        mock_can_check_redis.assert_called_once()
+        mock_redis_get.assert_called_once()
+
+
+class TestKafkaProducer(DoajTestCase):
+
+    # Test for handle_exception
+    @patch('portality.core.app.logger.exception')
+    @patch('portality.app_email.send_mail')
+    def test_handle_exception(self, mock_send_mail, mock_logger_exception):
+        error_msg = "Sample error"
+        exception = Exception("Sample exception")
+        kafka_events.handle_exception(error_msg, exception)
+
+        mock_logger_exception.assert_called_once_with(error_msg + str(exception))
+        mock_send_mail.assert_called_once()
+
+    # Test for kafka_producer when producer is None and no exception is raised
+    @patch('kafka.KafkaProducer')
+    def test_kafka_producer_new(self, mock_kafka_producer):
+        kafka_events.producer = None
+        result = kafka_events.kafka_producer()
+
+        self.assertIsNotNone(result)
+        mock_kafka_producer.assert_called_once()
+
+    # Test for kafka_producer when producer is already set
+    def test_kafka_producer_existing(self):
+        kafka_events.producer = Mock()
+        result = kafka_events.kafka_producer()
+
+        self.assertEqual(result, kafka_events.producer)
+
+    # Test for kafka_producer when an exception is raised
+    @patch('kafka.KafkaProducer', side_effect=Exception("Kafka error"))
+    @patch('portality.events.kafka_producer.handle_exception')
+    def test_kafka_producer_exception(self, mock_handle_exception, _):
+        kafka_events.producer = None
+        result = kafka_events.kafka_producer()
+
+        self.assertIsNone(result)
+        mock_handle_exception.assert_called_once()
+
+    # Test for send_event when kafka_status is None
+    @patch('portality.events.kafka_producer.shortcircuit_send_event')
+    def test_send_event_status_none(self, mock_shortcircuit_send_event):
+        kafka_events.kafka_status = None
+
+        kafka_events.send_event(Mock())
+        mock_shortcircuit_send_event.assert_called_once()
+
+    # Test for send_event when everything is operational
+    @patch.object(KafkaStatusCheck, 'is_active', return_value=True)
+    @patch('portality.events.kafka_producer.kafka_producer', return_value=Mock())
+    @patch('portality.events.kafka_producer.shortcircuit_send_event')
+    def test_send_event_operational(self, mock_shortcircuit, _, __):
+        kafka_events.kafka_status = KafkaStatusCheck()
+        kafka_events.producer = Mock()
+
+        kafka_events.send_event(Mock())
+        mock_shortcircuit.assert_not_called()
+
+    # Test for send_event when an exception occurs during the send
+    @patch('kafka.KafkaProducer', return_value=Mock(send=MagicMock(side_effect=Exception("Send error"))))
+    @patch.object(KafkaStatusCheck, 'set_kafka_inactive_redis')
+    @patch.object(KafkaStatusCheck, 'is_active', return_value=True)
+    @patch('portality.events.kafka_producer.shortcircuit_send_event')
+    @patch('portality.events.kafka_producer.handle_exception')
+    @patch('portality.events.kafka_producer.producer', new=None)
+    def test_send_event_exception(self, mock_handle_exception, mock_shortcircuit, _, __, mock_kafka_producer):
+        kafka_events.kafka_status = KafkaStatusCheck()
+
+        kafka_events.send_event(Mock())
+        mock_handle_exception.assert_called()
+        mock_shortcircuit.assert_called_once()
+        mock_kafka_producer.assert_called_once()
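Note on the tests above: stacked `@patch` decorators are applied bottom-up, so the lowest decorator supplies the first mock argument, and a decorator patched with `new=...` (like `producer` in `test_send_event_exception`) consumes no argument at all. A minimal self-contained sketch of that mapping — the patched names here are illustrative, not part of the PR:

```python
from unittest.mock import patch


@patch('os.getcwd', return_value='top')   # outermost patch -> last mock argument
@patch('os.getpid', return_value=1234)    # innermost patch -> first mock argument
def demo(mock_getpid, mock_getcwd):
    assert mock_getpid() == 1234
    assert mock_getcwd() == 'top'


demo()
```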
diff --git a/portality/constants.py b/portality/constants.py
index 2f2bb566b6..6ad42111a5 100644
--- a/portality/constants.py
+++ b/portality/constants.py
@@ -99,6 +99,9 @@
 BG_STATUS_STABLE = 'stable'
 BG_STATUS_UNSTABLE = 'unstable'
 
+# Redis Keys Constants
+KAFKA_ACTIVE_STATUS = 'KAFKA_ACTIVE_STATUS'
+
 class ConstantList:
 
     @classmethod
diff --git a/portality/events/kafka_consumer.py b/portality/events/kafka_consumer.py
index 0ce1e1120e..5eae733a5a 100644
--- a/portality/events/kafka_consumer.py
+++ b/portality/events/kafka_consumer.py
@@ -3,6 +3,7 @@
 
 from portality.app import app as doajapp
 from portality.bll import DOAJ
+from portality import util
 from portality.models import Event
 
 broker = doajapp.config.get("KAFKA_BROKER")
@@ -12,6 +13,7 @@
 topic = app.topic(topic_name)
 
 event_counter = 0
+event_logger = util.custom_timed_rotating_logger('consumer_log.log')
 
 
 @app.agent(topic)
@@ -20,6 +22,7 @@ async def handle_event(stream):
     with doajapp.test_request_context("/"):
         svc = DOAJ.eventsService()
         async for event in stream:
+            event_logger.info(event)
             event_counter += 1
             doajapp.logger.info(f"Kafka event count {event_counter}")
             # TODO uncomment the following line once the Event model is fixed to Kafka
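The `KAFKA_ACTIVE_STATUS` key introduced above is only ever set to `"false"` automatically (see `set_kafka_inactive_redis` further down); re-enabling Kafka is a manual step. A minimal sketch of that manual step, assuming Redis on `localhost:6379` to match the default `HUEY_REDIS_*` settings:

```python
import redis

from portality import constants

# Flip the flag back to "true" once the Kafka issue is resolved
conn = redis.StrictRedis(host='localhost', port=6379, db=0)
conn.set(constants.KAFKA_ACTIVE_STATUS, "true")
print(conn.get(constants.KAFKA_ACTIVE_STATUS))  # b'true'
```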
diff --git a/portality/events/kafka_producer.py b/portality/events/kafka_producer.py
index 4dfef4aa51..e59946db85 100644
--- a/portality/events/kafka_producer.py
+++ b/portality/events/kafka_producer.py
@@ -1,12 +1,54 @@
 import json
 
-from kafka import KafkaProducer
+import kafka
+
+from portality.core import app
+from portality import app_email, util
+from portality.events.shortcircuit import send_event as shortcircuit_send_event
+from portality.events.system_status_check import KafkaStatusCheck
 
 from portality.core import app as doajapp
 
 bootstrap_server = doajapp.config.get("KAFKA_BOOTSTRAP_SERVER")
 
-producer = KafkaProducer(bootstrap_servers=bootstrap_server, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+
+def handle_exception(error_msg, exception):
+    app.logger.exception(error_msg + str(exception))
+    app_email.send_mail(
+        to=[app.config.get('ADMIN_EMAIL', 'sysadmin@cottagelabs.com')],
+        fro=app.config.get('SYSTEM_EMAIL_FROM', 'helpdesk@doaj.org'),
+        subject='Alert: DOAJ Kafka send event error',
+        msg_body=error_msg + ": \n" + str(exception)
+    )
+
+
+producer = None
+event_logger = util.custom_timed_rotating_logger('producer_log.log')
+
+
+def kafka_producer():
+    global producer
+    try:
+        if producer is None:
+            producer = kafka.KafkaProducer(bootstrap_servers=bootstrap_server, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+    except Exception as exp:
+        producer = None
+        handle_exception("Error in setting up kafka connection", exp)
+    return producer
+
+
+try:
+    kafka_status = KafkaStatusCheck()
+except Exception as exp:
+    kafka_status = None
+    handle_exception("Error in setting up Redis for kafka events", exp)
 
 
 def send_event(event):
-    future = producer.send('events', value=event.serialise())
-    future.get(timeout=60)
\ No newline at end of file
+    try:
+        event_logger.info(event.data)
+        if kafka_status and kafka_status.is_active() and kafka_producer():
+            future = producer.send('events', value=event.serialise())
+            future.get(timeout=60)
+        else:
+            shortcircuit_send_event(event)
+    except Exception as e:
+        try:
+            kafka_status.set_kafka_inactive_redis()
+        except Exception as exp:
+            handle_exception("Failed to set kafka inactive status in Redis", exp)
+        shortcircuit_send_event(event)
+        handle_exception("Failed to send event to Kafka.", e)
diff --git a/portality/events/shortcircuit.py b/portality/events/shortcircuit.py
index 104a9f3756..9077c4f099 100644
--- a/portality/events/shortcircuit.py
+++ b/portality/events/shortcircuit.py
@@ -1,6 +1,9 @@
 from portality.bll import DOAJ
+from portality import util
+event_logger = util.custom_timed_rotating_logger('shortcircuit_log.log')
 
 
 def send_event(event):
+    event_logger.info(event.data)
     svc = DOAJ.eventsService()
     svc.consume(event)
\ No newline at end of file
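For callers, `send_event` only assumes an event object with a `data` attribute (logged) and a `serialise()` method (the Kafka payload). A usage sketch with a purely illustrative stand-in object, not the real `portality.models.Event`, assuming the DOAJ app context is initialised:

```python
import portality.events.kafka_producer as kafka_events


class DummyEvent:
    """Illustrative stand-in; the real Event model is richer."""

    def __init__(self, data):
        self.data = data

    def serialise(self):
        return self.data


# Publishes to the 'events' topic when the Redis flag allows it;
# otherwise (or on any failure) falls back to shortcircuit_send_event.
kafka_events.send_event(DummyEvent({"id": "application:status"}))
```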
+ """ + + def __init__(self): + self.is_kafka_active = True + self.time_gap = app.config['TIME_GAP_REDIS_KAFKA'] + self.last_time = dates.now() + redis_host = app.config['HUEY_REDIS_HOST'] + redis_port = app.config['HUEY_REDIS_PORT'] + self.redis_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=0) + + def is_active(self): + if self.can_check_redis(): + self.is_kafka_active = self.is_kafka_active_redis() + return self.is_kafka_active + + def can_check_redis(self): + time_diff = dates.now() - self.last_time + if time_diff.seconds > self.time_gap: + self.last_time = dates.now() + return True + + return False + + def is_kafka_active_redis(self): + value = self.redis_conn.get(constants.KAFKA_ACTIVE_STATUS) + # set the key value if not set + if value is None: + self.set_default_kafka_status() + return True + return value.decode('utf-8').lower() == "true" + + def set_default_kafka_status(self): + self.redis_conn.set(constants.KAFKA_ACTIVE_STATUS, "true") + + def set_kafka_inactive_redis(self): + self.redis_conn.set(constants.KAFKA_ACTIVE_STATUS, "false") diff --git a/portality/settings.py b/portality/settings.py index 2aba27aead..3a7a96f225 100644 --- a/portality/settings.py +++ b/portality/settings.py @@ -102,6 +102,8 @@ KAFKA_BROKER = "kafka://localhost:9092" KAFKA_EVENTS_TOPIC = "events" KAFKA_BOOTSTRAP_SERVER = "localhost:9092" +# Time gap interval in seconds to query redis server +TIME_GAP_REDIS_KAFKA = 60 * 60 ########################################### # Read Only Mode diff --git a/portality/util.py b/portality/util.py index 2a4e1f36f3..0de0dd81c1 100644 --- a/portality/util.py +++ b/portality/util.py @@ -187,6 +187,35 @@ def get_full_url_safe(endpoint): app.logger.warning(f'endpoint not found -- [{endpoint}]') return None + def no_op(*args, **kwargs): """ noop (no operation) function """ - pass \ No newline at end of file + pass + + +def custom_timed_rotating_logger(file_name): + """Custom Logger to log to specified file name""" + import os + import logging + from logging.handlers import TimedRotatingFileHandler + # Create a logger + logger = logging.getLogger(os.path.splitext(file_name)[0]) + logger.setLevel(logging.DEBUG) # Set the logging level + + # Get the user's home directory + user_home = os.path.expanduser("~") + log_dir = os.path.join(user_home, 'appdata', 'doaj') + if not os.path.exists(log_dir): + os.makedirs(log_dir) + log_filename = os.path.join(log_dir, file_name) + # Rotate every day. Keep 30 days worth of backup. + handler = TimedRotatingFileHandler(log_filename, when="D", interval=1, backupCount=30) + + # Create a formatter and add it to the handler + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + + # Add the handler to the logger + logger.addHandler(handler) + + return logger