Skip to content

Commit

Permalink
Use an enum for connection state
Browse files Browse the repository at this point in the history
  • Loading branch information
dpup committed Oct 7, 2024
1 parent 6db7d36 commit 58a1202
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
18 changes: 9 additions & 9 deletions check-mate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import meshtastic.tcp_interface
from meshtastic.protobuf import portnums_pb2

from status import StatusManager
from status import StatusManager, Status
from quality import classifyQuality
from radiocheck import getResponse

Expand Down Expand Up @@ -71,7 +71,7 @@ def start(self):
if lastUpdate > PROBE_TIMEOUT:
self.sendProbe()
if lastUpdate > UNHEALTHY_TIMEOUT:
self.setStatus("unknown")
self.setStatus(Status.UNKNOWN)

except Exception as ex:
self.logger.error(
Expand All @@ -80,21 +80,21 @@ def start(self):
extra={"host": self.host, "error": ex},
)
self.logger.info("Retrying in 5 seconds...")
self.setStatus("restarting")
self.setStatus(Status.RESTARTING)
time.sleep(5)

except KeyboardInterrupt:
self.logger.info("Shutting down...", extra={"host": self.host})
self.setStatus("shutdown")
self.setStatus(Status.SHUTDOWN)
return 0

def sendProbe(self):
self.logger.info("Sending probe...")
# TODO: See if this is enough. Might want to actually send a test packet
# though that could potentially add noise to the network.
self.setStatus("probing")
self.setStatus(Status.PROBING)
self.iface.sendHeartbeat()
self.setStatus("active", True)
self.setStatus(Status.ACTIVE)
# self.iface.sendData("probe", portNum=portnums_pb2.PortNum.PRIVATE_APP)

def setStatus(self, status, ping=False):
Expand All @@ -113,19 +113,19 @@ def onConnect(self, interface, topic=pub.AUTO_TOPIC):
for node in interface.nodes.values():
self.updateUser(node["user"])
self.logger.info("Connected...")
self.setStatus("connected", ping=True)
self.setStatus(Status.CONNECTED, ping=True)

def onDisconnect(self, interface, topic=pub.AUTO_TOPIC):
"""called when we disconnect from the radio"""
self.logger.info("Disconnected... waiting for reconnect...")
self.connected = False
self.setStatus("disconnected")
self.setStatus(Status.DISCONNECTED)

def onReceive(self, packet, interface):
"""called when a packet arrives"""

self.reportHealth()
self.setStatus("active", ping=True)
self.setStatus(Status.ACTIVE, ping=True)

# TODO: Turn this back off or demote to DEBUG level.
extra = packet
Expand Down
2 changes: 1 addition & 1 deletion quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass


class QualityLevel(Enum):
class QualityLevel(str, Enum):
EXCELLENT = "Excellent"
VERY_GOOD = "Very Good"
GOOD = "Good"
Expand Down
15 changes: 13 additions & 2 deletions status.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
from enum import Enum
import platform
from pathlib import Path
from typing import Dict
import json


class Status(str, Enum):
UNKNOWN = "unknown"
CONNECTED = "connected"
ACTIVE = "active"
DISCONNECTED = "disconnected"
PROBING = "probing"
RESTARTING = "restarting"
SHUTDOWN = "shutdown"


class StatusManager:
def __init__(self, baseDir=None):
if baseDir is None or baseDir == "":
Expand All @@ -27,12 +38,12 @@ def writeStatus(self, status: Dict[str, any]):
def readStatus(self) -> Dict[str, any]:
"""Read the current status from the status file."""
if not self.statusFile.exists():
return {"status": "unknown"}
return {"status": Status.UNKNOWN}
with open(self.statusFile, "r") as f:
return json.load(f)

def dump(self):
"""Print the status file."""
status = self.readStatus()
print(json.dumps(status))
return 0 if status["status"] == "active" else 1
return 0 if status["status"] == Status.ACTIVE else 1

0 comments on commit 58a1202

Please sign in to comment.