-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fallback for kafka failure initial implementation
- Loading branch information
Showing
4 changed files
with
89 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,52 @@ | ||
import json | ||
from kafka import KafkaProducer | ||
from portality.core import app | ||
from portality import app_email | ||
from portality.events.shortcircuit import send_event as shortcircuit_send_event | ||
from portality.events.system_status_check import KafkaStatusCheck | ||
|
||
from portality.core import app as doajapp | ||
bootstrap_server = doajapp.config.get("KAFKA_BOOTSTRAP_SERVER") | ||
|
||
producer = KafkaProducer(bootstrap_servers=bootstrap_server, value_serializer=lambda v: json.dumps(v).encode('utf-8')) | ||
def handle_exception(error_msg, exception): | ||
app.logger.exception(error_msg + str(exception)) | ||
app_email.send_mail( | ||
to=[app.config.get('ADMIN_EMAIL', '[email protected]')], | ||
fro=app.config.get('SYSTEM_EMAIL_FROM', '[email protected]'), | ||
subject='Alert: DOAJ Kafka send event error', | ||
msg_body=error_msg + ": \n" + str(exception) | ||
) | ||
|
||
producer = None | ||
|
||
def kafka_producer(): | ||
global producer | ||
try: | ||
if producer is None: | ||
producer = KafkaProducer(bootstrap_servers=bootstrap_server, value_serializer=lambda v: json.dumps(v).encode('utf-8')) | ||
except Exception as exp: | ||
producer = None | ||
handle_exception("Error in setting up kafka connection", exp) | ||
return producer | ||
|
||
try: | ||
kafka_status = KafkaStatusCheck() | ||
except Exception as exp: | ||
kafka_status = None | ||
handle_exception("Error in setting up Redis for kafka events", exp) | ||
|
||
|
||
def send_event(event): | ||
future = producer.send('events', value=event.serialise()) | ||
future.get(timeout=60) | ||
try: | ||
if kafka_status and kafka_status.is_active() and kafka_producer(): | ||
future = producer.send('events', value=event.serialise()) | ||
future.get(timeout=60) | ||
else: | ||
shortcircuit_send_event(event) | ||
except Exception as e: | ||
try: | ||
kafka_status.set_kafka_inactive_redis() | ||
except Exception as exp: | ||
handle_exception("Failed to set kafka inactive status in Redis", exp) | ||
shortcircuit_send_event(event) | ||
handle_exception("Failed to send event to Kafka.", e) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import redis | ||
from portality import constants | ||
from portality.core import app | ||
from portality.lib import dates | ||
|
||
class KafkaStatusCheck: | ||
|
||
def __init__(self): | ||
self.is_kafka_active = True | ||
self.time_gap = app.config['TIME_GAP_REDIS_KAFKA'] | ||
self.last_time = dates.now() | ||
redis_host = app.config['HUEY_REDIS_HOST'] | ||
redis_port = app.config['HUEY_REDIS_PORT'] | ||
self.redis_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=0) | ||
|
||
def is_active(self): | ||
if self.can_check_in_redis(): | ||
self.is_kafka_active = self.is_kafka_active_redis() | ||
return self.is_kafka_active | ||
|
||
def can_check_in_redis(self): | ||
time_diff = dates.now() - self.last_time | ||
if time_diff.seconds > self.time_gap: | ||
self.last_time = dates.now() | ||
return True | ||
|
||
return False | ||
|
||
def is_kafka_active_redis(self): | ||
value = self.redis_conn.get(constants.KAFKA_ACTIVE_STATUS) | ||
# set the key value if not set | ||
if value is None: | ||
self.set_default_kafka_status() | ||
return True | ||
return value.decode('utf-8').lower() == "true" | ||
|
||
def set_default_kafka_status(self): | ||
self.redis_conn.set(constants.KAFKA_ACTIVE_STATUS, "true") | ||
|
||
def set_kafka_inactive_redis(self): | ||
self.redis_conn.set(constants.KAFKA_ACTIVE_STATUS, "false") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters