diff --git a/portality/constants.py b/portality/constants.py index 2f2bb566b6..6ad42111a5 100644 --- a/portality/constants.py +++ b/portality/constants.py @@ -99,6 +99,9 @@ BG_STATUS_STABLE = 'stable' BG_STATUS_UNSTABLE = 'unstable' +# Redis Keys Constants +KAFKA_ACTIVE_STATUS = 'KAFKA_ACTIVE_STATUS' + class ConstantList: @classmethod diff --git a/portality/events/kafka_producer.py b/portality/events/kafka_producer.py index 4dfef4aa51..03d0a90c24 100644 --- a/portality/events/kafka_producer.py +++ b/portality/events/kafka_producer.py @@ -1,12 +1,52 @@ import json from kafka import KafkaProducer +from portality.core import app +from portality import app_email +from portality.events.shortcircuit import send_event as shortcircuit_send_event +from portality.events.system_status_check import KafkaStatusCheck from portality.core import app as doajapp bootstrap_server = doajapp.config.get("KAFKA_BOOTSTRAP_SERVER") -producer = KafkaProducer(bootstrap_servers=bootstrap_server, value_serializer=lambda v: json.dumps(v).encode('utf-8')) +def handle_exception(error_msg, exception): + app.logger.exception(error_msg + str(exception)) + app_email.send_mail( + to=[app.config.get('ADMIN_EMAIL', 'sysadmin@cottagelabs.com')], + fro=app.config.get('SYSTEM_EMAIL_FROM', 'helpdesk@doaj.org'), + subject='Alert: DOAJ Kafka send event error', + msg_body=error_msg + ": \n" + str(exception) + ) + +producer = None + +def kafka_producer(): + global producer + try: + if producer is None: + producer = KafkaProducer(bootstrap_servers=bootstrap_server, value_serializer=lambda v: json.dumps(v).encode('utf-8')) + except Exception as exp: + producer = None + handle_exception("Error in setting up kafka connection", exp) + return producer + +try: + kafka_status = KafkaStatusCheck() +except Exception as exp: + kafka_status = None + handle_exception("Error in setting up Redis for kafka events", exp) def send_event(event): - future = producer.send('events', value=event.serialise()) - future.get(timeout=60) \ No newline at end of file + try: + if kafka_status and kafka_status.is_active() and kafka_producer(): + future = producer.send('events', value=event.serialise()) + future.get(timeout=60) + else: + shortcircuit_send_event(event) + except Exception as e: + try: + kafka_status.set_kafka_inactive_redis() + except Exception as exp: + handle_exception("Failed to set kafka inactive status in Redis", exp) + shortcircuit_send_event(event) + handle_exception("Failed to send event to Kafka.", e) diff --git a/portality/events/system_status_check.py b/portality/events/system_status_check.py new file mode 100644 index 0000000000..e0facc3fbb --- /dev/null +++ b/portality/events/system_status_check.py @@ -0,0 +1,41 @@ +import redis +from portality import constants +from portality.core import app +from portality.lib import dates + +class KafkaStatusCheck: + + def __init__(self): + self.is_kafka_active = True + self.time_gap = app.config['TIME_GAP_REDIS_KAFKA'] + self.last_time = dates.now() + redis_host = app.config['HUEY_REDIS_HOST'] + redis_port = app.config['HUEY_REDIS_PORT'] + self.redis_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=0) + + def is_active(self): + if self.can_check_in_redis(): + self.is_kafka_active = self.is_kafka_active_redis() + return self.is_kafka_active + + def can_check_in_redis(self): + time_diff = dates.now() - self.last_time + if time_diff.seconds > self.time_gap: + self.last_time = dates.now() + return True + + return False + + def is_kafka_active_redis(self): + value = self.redis_conn.get(constants.KAFKA_ACTIVE_STATUS) + # set the key value if not set + if value is None: + self.set_default_kafka_status() + return True + return value.decode('utf-8').lower() == "true" + + def set_default_kafka_status(self): + self.redis_conn.set(constants.KAFKA_ACTIVE_STATUS, "true") + + def set_kafka_inactive_redis(self): + self.redis_conn.set(constants.KAFKA_ACTIVE_STATUS, "false") diff --git a/portality/settings.py b/portality/settings.py index 2bffdbab8a..f4c645782c 100644 --- a/portality/settings.py +++ b/portality/settings.py @@ -102,6 +102,8 @@ KAFKA_BROKER = "kafka://localhost:9092" KAFKA_EVENTS_TOPIC = "events" KAFKA_BOOTSTRAP_SERVER = "localhost:9092" +# Time gap interval in seconds to query redis server +TIME_GAP_REDIS_KAFKA = 60 * 60 ########################################### # Read Only Mode