Skip to content

Commit

Permalink
v2.0.4 - Add MQTT health monitoring
Browse files Browse the repository at this point in the history
v2.0.4 - Add MQTT health monitoring
  • Loading branch information
TheHolyRoger authored Jun 2, 2023
2 parents 0c5e2c0 + e553d1d commit 265c7be
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 38 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.3
2.0.4
24 changes: 20 additions & 4 deletions rogerthat/app/rogerthat.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,29 @@ def __init__(self):
self._ev_loop = None
self._serv_task = None

def ensure_future(self, coro, with_timeout=None, *args, **kwargs):
    """Schedule ``coro`` through ``safe_ensure_future`` on this app's loop.

    ``self._ev_loop`` is always passed as ``loop``; ``with_timeout`` and any
    extra positional/keyword arguments are forwarded unchanged. Returns
    whatever ``safe_ensure_future`` returns.
    """
    # Extra positionals are unpacked before the keywords: Python binds them
    # the same way regardless of where ``*args`` is written in the call, but
    # this ordering makes that binding visible to the reader.
    return safe_ensure_future(
        coro,
        *args,
        with_timeout=with_timeout,
        loop=self._ev_loop,
        **kwargs,
    )

def call_soon_threadsafe(self, *args, **kwargs):
    """Forward to ``call_soon_threadsafe`` on this app's event loop.

    All arguments are passed through untouched and the loop's return value
    is handed back to the caller.
    """
    schedule = self._ev_loop.call_soon_threadsafe
    return schedule(*args, **kwargs)

async def async_run_in_executor(self, *args, **kwargs):
    """Await ``run_in_executor`` on this app's event loop.

    Arguments are forwarded as given (executor first, then the callable and
    its arguments); the awaited result is returned.
    """
    pending = self._ev_loop.run_in_executor(*args, **kwargs)
    return await pending

async def Initialise(self):
logger.info("Initialising database.")
logger.debug("Initialising database.")
db_started = await database_init.initialise()
if not db_started:
await asyncio.sleep(0.1)
self.shutdown()
return
logger.info("Finished initialising database.")
logger.debug("Finished initialising database.")
logger.info(splash_msg)

def _signal_handler(self, *_): # noqa: N803
Expand Down Expand Up @@ -73,12 +88,13 @@ async def exit_loop(self):
def shutdown(self):
logger.info("Stopping RogerThat Server.")
self.shutdown_event.set()
safe_ensure_future(self.exit_loop(), loop=self._ev_loop)
self.ensure_future(self.exit_loop())

def start_queues(self):
self._request_queue = request_processing_queue.get_instance()
logger.info("Starting Broadcast Queues.")
logger.debug("Starting Broadcast Queues.")
self._mqtt_queue = mqtt_queue.get_instance()
self._mqtt_queue.start()

def start_server(self):
logger.info(f"RogerThat v{Config.get_inst().version} starting.")
Expand Down
14 changes: 7 additions & 7 deletions rogerthat/db/database_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ class database_init():

@classmethod
async def create_db(cls):
logger.info("Database does not exist yet, creating.")
logger.warning("Database does not exist yet, creating.")
async with db_engine.db().engine_root.connect() as conn:
await conn.execute(text(f"CREATE DATABASE {Config.get_inst().database_name}"))
return True

@classmethod
async def create_tables(cls):
logger.info("Creating new or missing db tables.")
logger.debug("Creating new or missing db tables.")
async with db_engine.db().engine.begin() as conn:
await conn.run_sync(cls._meta.create_all)
logger.info("Done creating tables.")
logger.info("Done creating missing database tables.")
return True

@classmethod
async def initialise(cls):
logger.info("Database init.")
logger.debug("Database init.")
try:
await cls.create_tables()
except Exception:
Expand All @@ -55,17 +55,17 @@ async def initialise(cls):
logger.error("Failed to connect to SQL database. Check host and port.")
return None

logger.info("Checking alembic.")
logger.debug("Checking alembic.")
try:
current_revision = fetch_alembic_revision()
except Exception as e:
logger.error(f"Alembic exception: {e}")
current_revision = None
if not current_revision:
logger.info("First time init, stamping revision.")
logger.warning("First time init, stamping revision.")
alembic_cmd.stamp(cls._alembic_cfg, "head")
else:
logger.info("Running alembic migration.")
logger.warning("Running alembic migration.")
alembic_cmd.upgrade(cls._alembic_cfg, "head")
return True

Expand Down
132 changes: 131 additions & 1 deletion rogerthat/mqtt/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#!/usr/bin/env python

import asyncio
import threading
from typing import TYPE_CHECKING

from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters as MQTTConnectionParameters

from rogerthat.app.delegate import App
from rogerthat.config.config import Config
from rogerthat.logging.configure import AsyncioLogger
from rogerthat.mqtt.messages import TradingviewMessage
Expand All @@ -21,6 +24,7 @@ class MQTTPublisher:
def __init__(self,
topic: str,
mqtt_node: Node):
self._initial_connection_completed = False

self._node = mqtt_node

Expand All @@ -29,18 +33,41 @@ def __init__(self,
self.publisher = self._node.create_publisher(
topic=self._topic, msg_type=TradingviewMessage
)
self.publisher.run()

@property
def is_ready(self):
    """Whether this publisher has been seen connected at least once.

    The flag is flipped by ``is_connected`` the first time it observes a
    live transport.
    """
    return self._initial_connection_completed

@property
def is_connected(self):
    """Report the publisher transport's current connection state.

    Side effect: the first time the transport is seen connected, the
    ``_initial_connection_completed`` flag is latched to ``True`` (it is
    never reset here).
    """
    link_up = self.publisher._transport.is_connected

    if link_up and not self._initial_connection_completed:
        self._initial_connection_completed = True

    return link_up

def broadcast(self, event: "tradingview_event"):
    """Publish ``event`` on this publisher's topic.

    A debug line naming the topic and the event is logged before the
    event is handed to the underlying publisher.
    """
    logger.debug(f"Broadcasting MQTT event on {self._topic}: {event}")
    self.publisher.publish(event)

def __del__(self):
self.publisher.stop()


class MQTTGateway(Node):
NODE_NAME = "$APP.$UID"
HEARTBEAT_URI = "$APP/$UID/hb"

def __init__(self,
*args, **kwargs):
self._initial_connection_completed = False
self._health = False
self._gateway_ready = asyncio.Event()
self._stop_event_async = asyncio.Event()
self._restart_heartbeat_event_async = asyncio.Event()

self.mqtt_publisher = None

self.HEARTBEAT_URI = f"{Config.get_inst().app_name}/{Config.get_inst().mqtt_instance_name}/hb"
Expand All @@ -60,9 +87,17 @@ def __init__(self,
**kwargs
)

@property
def health(self):
    """Result of the most recent connection check stored in ``_health``."""
    return self._health

async def async_set_ready(self, delay: float = 5):
    """Set the ``_gateway_ready`` event after a grace period.

    Args:
        delay: Seconds to sleep before setting the event. Defaults to 5,
            the value that used to be hard-coded here.
    """
    await asyncio.sleep(delay)
    self._gateway_ready.set()

def get_publisher_for(self, topic: str):
if topic not in self._topic_publishers:
logger.info(f"Starting MQTT Publisher for {topic}")
logger.debug(f"Starting MQTT Publisher for {topic}")
self._topic_publishers[topic] = MQTTPublisher(topic=topic, mqtt_node=self)
return self._topic_publishers[topic]

Expand All @@ -74,3 +109,98 @@ def _create_mqtt_params_from_conf(self):
password=Config.get_inst().mqtt_password,
ssl=Config.get_inst().mqtt_ssl
)

def _start_health_monitoring_loop(self):
    """Clear the stop flag and schedule the health monitoring coroutine.

    When invoked from any thread other than the main thread, the call is
    re-submitted through the app's ``call_soon_threadsafe`` and this
    invocation returns without doing anything else.
    """
    on_main_thread = threading.current_thread() == threading.main_thread()
    if not on_main_thread:  # pragma: no cover
        App.get_instance().call_soon_threadsafe(self._start_health_monitoring_loop)
        return

    self._stop_event_async.clear()
    App.get_instance().ensure_future(self._monitor_health_loop())

async def _restart_heartbeat(self):
    """Restart the heartbeat thread if its transport is still disconnected.

    Waits 3 seconds, re-checks the heartbeat publisher's transport and, if
    it is still down, stops the heartbeat thread and re-initialises it.
    The ``_restart_heartbeat_event_async`` flag (set by the caller to mark
    a restart as in progress) is always cleared on exit.
    """
    # NOTE(review): nesting reconstructed from a flattened diff view; the
    # restart attempt is assumed to live inside the "still disconnected"
    # branch - confirm against the repository.
    try:
        await asyncio.sleep(3)

        if not self._hb_thread._heartbeat_pub._transport.is_connected:
            logger.warning("Restarting heartbeat thread.")
            self._hb_thread.stop()
            await asyncio.sleep(3)

            try:
                self._init_heartbeat_thread()
                await asyncio.sleep(5)
                logger.warning("Heartbeat thread restarted.")

            except Exception as e:
                # Keep the best-effort behaviour (no re-raise) but leave a
                # trace of why the restart did not succeed.
                logger.error(f"Failed to restart heartbeat thread: {e}")
                await asyncio.sleep(5)
    finally:
        # Clear the in-progress flag on every exit path; otherwise an error
        # or cancellation above would block all future restart attempts.
        self._restart_heartbeat_event_async.clear()

def _check_connections(self) -> bool:
    """Check the heartbeat and every topic publisher; return overall health.

    Returns:
        ``True`` only when the heartbeat thread exists, is running and is
        connected, and every known publisher reports connected.

    Side effects:
        * After the first successful connection, a failed heartbeat
          schedules ``_restart_heartbeat`` (at most one at a time, guarded
          by ``_restart_heartbeat_event_async``).
        * After the first successful connection, a disconnected publisher
          that had been connected before is dropped from
          ``_topic_publishers`` so it can be created afresh on next use.
    """
    connected = True

    # Check heartbeat
    hb_thread = self._hb_thread
    heartbeat_down = (
        not hb_thread or
        hb_thread.stopped() or
        not hb_thread._heartbeat_pub._transport.is_connected
    )
    if heartbeat_down:
        # With heartbeat_down established, an existing thread is necessarily
        # stopped or disconnected, so only its existence needs re-checking.
        if (
            hb_thread and
            self._initial_connection_completed and
            not self._restart_heartbeat_event_async.is_set()
        ):
            self._restart_heartbeat_event_async.set()
            # The health loop runs this method via run_in_executor, i.e. off
            # the event-loop thread, so the coroutine is handed to the loop
            # through call_soon_threadsafe rather than scheduled directly.
            app = App.get_instance()
            app.call_soon_threadsafe(app.ensure_future, self._restart_heartbeat())

        connected = False

    # Check Publishers (iterate over a snapshot: entries are removed below)
    for topic, publisher in list(self._topic_publishers.items()):
        if not publisher.is_connected:
            if publisher.is_ready and self._initial_connection_completed:
                logger.debug(f"Restarting publisher thread on {topic}.")
                self._topic_publishers.pop(topic, None)

            connected = False

    return connected

async def _monitor_health_loop(self, period: float = 3.0):
    """Poll connection health until the stop event is set.

    Waits for ``_gateway_ready`` first. Each iteration runs
    ``_check_connections`` through the app's executor helper and stores the
    result in ``_health``. A healthy result latches
    ``_initial_connection_completed`` and sleeps ``period`` seconds; an
    unhealthy one logs a warning (only once a connection had been
    established) and sleeps 10 seconds.
    """
    # NOTE(review): branch nesting reconstructed from a flattened diff view;
    # each sleep is assumed to belong to its own branch - confirm.
    await self._gateway_ready.wait()
    logger.debug("Started MQTT health monitoring.")

    while True:
        if self._stop_event_async.is_set():
            break

        self._health = await App.get_instance().async_run_in_executor(
            None, self._check_connections)

        if not self._health:
            if self._initial_connection_completed:
                logger.warning("MQTT Health check failed. Services should be restarting.")

            await asyncio.sleep(10.0)
            continue

        if not self._initial_connection_completed:
            self._initial_connection_completed = True

        await asyncio.sleep(period)

def _stop_health_monitoring_loop(self):
self._stop_event_async.set()

def start(self) -> None:
    """Run the node, then kick off health monitoring."""
    self.run()
    self._start_health_monitoring_loop()

def stop(self):
    """Signal the health monitoring loop to stop, then stop the node."""
    self._stop_health_monitoring_loop()
    super().stop()

def __del__(self):
self.stop()
68 changes: 43 additions & 25 deletions rogerthat/queues/mqtt_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,63 @@ def __init__(self):
self._failure_msg = "Failed to connect to MQTT Broker!"
self._is_ready = False

if Config.get_inst().mqtt_enable:
try:
self._mqtt = MQTTGateway()
self._mqtt.run()
self.start()
except (ConnectionRefusedError, gaierror, OSError) as e:
if self._mqtt is not None:
extra_debug = f" Parameters: {self._mqtt._params}"
else:
extra_debug = ""
logger.error(f"{self._failure_msg} Check host and port! - {e}.{extra_debug}")
except SSLEOFError:
logger.error(f"{self._failure_msg} Using plain HTTP port with SSL enabled!")
except SSLCertVerificationError:
logger.error(f"{self._failure_msg} You need to set up your SSL certificates correctly!")
except Exception as e:
logger.error(e)
raise e
if self._is_ready:
logger.info("MQTT Gateway is ready.")

def _create_queue(self):
self._mqtt_queue = asyncio.Queue()

def start(self):
    """Create and start the MQTT gateway and the queue tasks.

    Does nothing when MQTT is disabled in the configuration. Connection
    and SSL failures are logged and swallowed so the caller can carry on
    without MQTT; any other exception is logged and re-raised.
    """
    if not Config.get_inst().mqtt_enable:
        return

    try:
        self._mqtt = MQTTGateway()
        self._mqtt.start()
        self._start_queue_tasks()
    # The SSL handlers must precede the OSError handler: ssl.SSLError (and
    # therefore SSLEOFError / SSLCertVerificationError, assuming these are
    # the ssl module's classes) derives from OSError, so with the previous
    # ordering these two clauses could never be reached.
    except SSLEOFError:
        logger.error(f"{self._failure_msg} Using plain HTTP port with SSL enabled!")
    except SSLCertVerificationError:
        logger.error(f"{self._failure_msg} You need to set up your SSL certificates correctly!")
    except (ConnectionRefusedError, gaierror, OSError) as e:
        if self._mqtt is not None:
            extra_debug = f" Parameters: {self._mqtt._params}"
        else:
            extra_debug = ""
        logger.error(f"{self._failure_msg} Check host and port! - {e}.{extra_debug}")
    except Exception as e:
        logger.error(e)
        # Bare raise re-raises the active exception without adding this
        # frame to its traceback a second time.
        raise
    if self._is_ready:
        logger.debug("MQTT Queue is ready.")

def _start_queue_tasks(self):
    """Create the queue and launch the broadcast listener task.

    No-op when no gateway is set. Otherwise a new queue is created, the
    listener coroutine is scheduled and kept in ``_mqtt_queue_task``, the
    queue is marked ready, and the gateway's ``async_set_ready`` coroutine
    is scheduled as well.
    """
    if not self._mqtt:
        return

    self._create_queue()
    listener = self._listen_for_broadcasts()
    self._mqtt_queue_task = safe_ensure_future(listener)
    self._is_ready = True
    safe_ensure_future(self._mqtt.async_set_ready())

def stop(self):
    """Cancel the listener task (if any) and stop the gateway (if any)."""
    listener_task = self._mqtt_queue_task
    if listener_task is not None:
        listener_task.cancel()
        self._mqtt_queue_task = None
        logger.debug("MQTT Queue stopped.")

    gateway = self._mqtt
    if gateway:
        gateway.stop()

async def _listen_for_broadcasts(self):
if not self._mqtt:
raise Exception("listen_for_broadcasts called but mqtt is not enabled!")
while True:
if not self._mqtt.health:
if not self._mqtt_queue.empty():
logger.warning("MQTT not ready for broadcast, sleeping 5s before retry.")

await asyncio.sleep(5)
continue

msg = None

try:
msg = await self._mqtt_queue.get()
publisher = self._mqtt.get_publisher_for(msg.topic)
Expand All @@ -79,10 +94,13 @@ async def _listen_for_broadcasts(self):
tb = "".join(traceback.TracebackException.from_exception(e).format())
logger.error(f"Error in mqtt_queue: {e}\n{tb}")

if msg and self._mqtt_queue:
self._mqtt_queue.put_nowait(msg)

def broadcast(self, event):
if not self._mqtt_queue:
logger.error("Cannot broadcast, MQTT not started!")
logger.error("Cannot broadcast, MQTT Queue not started!")
return
if self._mqtt:
logger.debug("Adding event to MQTT queue.")
self._mqtt_queue.put_nowait(event)

logger.debug("Adding event to MQTT queue.")
self._mqtt_queue.put_nowait(event)

0 comments on commit 265c7be

Please sign in to comment.