From 58a12024107bd084a95e8a4d09b87013ebcc9255 Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Mon, 7 Oct 2024 14:17:45 -0700 Subject: [PATCH] Use an enum for connection state --- check-mate.py | 18 +++++++++--------- quality.py | 2 +- status.py | 15 +++++++++++++-- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/check-mate.py b/check-mate.py index 2cd4418..54351de 100644 --- a/check-mate.py +++ b/check-mate.py @@ -14,7 +14,7 @@ import meshtastic.tcp_interface from meshtastic.protobuf import portnums_pb2 -from status import StatusManager +from status import StatusManager, Status from quality import classifyQuality from radiocheck import getResponse @@ -71,7 +71,7 @@ def start(self): if lastUpdate > PROBE_TIMEOUT: self.sendProbe() if lastUpdate > UNHEALTHY_TIMEOUT: - self.setStatus("unknown") + self.setStatus(Status.UNKNOWN) except Exception as ex: self.logger.error( @@ -80,21 +80,21 @@ def start(self): extra={"host": self.host, "error": ex}, ) self.logger.info("Retrying in 5 seconds...") - self.setStatus("restarting") + self.setStatus(Status.RESTARTING) time.sleep(5) except KeyboardInterrupt: self.logger.info("Shutting down...", extra={"host": self.host}) - self.setStatus("shutdown") + self.setStatus(Status.SHUTDOWN) return 0 def sendProbe(self): self.logger.info("Sending probe...") # TODO: See if this is enough. Might want to actually send a test packet # though that could potentially add noise to the network. - self.setStatus("probing") + self.setStatus(Status.PROBING) self.iface.sendHeartbeat() - self.setStatus("active", True) + self.setStatus(Status.ACTIVE) # self.iface.sendData("probe", portNum=portnums_pb2.PortNum.PRIVATE_APP) def setStatus(self, status, ping=False): @@ -113,19 +113,19 @@ def onConnect(self, interface, topic=pub.AUTO_TOPIC): for node in interface.nodes.values(): self.updateUser(node["user"]) self.logger.info("Connected...") - self.setStatus("connected", ping=True) + self.setStatus(Status.CONNECTED, ping=True) def onDisconnect(self, interface, topic=pub.AUTO_TOPIC): """called when we disconnect from the radio""" self.logger.info("Disconnected... waiting for reconnect...") self.connected = False - self.setStatus("disconnected") + self.setStatus(Status.DISCONNECTED) def onReceive(self, packet, interface): """called when a packet arrives""" self.reportHealth() - self.setStatus("active", ping=True) + self.setStatus(Status.ACTIVE, ping=True) # TODO: Turn this back off or demote to DEBUG level. extra = packet diff --git a/quality.py b/quality.py index 2f9fdac..f61d32f 100644 --- a/quality.py +++ b/quality.py @@ -2,7 +2,7 @@ from dataclasses import dataclass -class QualityLevel(Enum): +class QualityLevel(str, Enum): EXCELLENT = "Excellent" VERY_GOOD = "Very Good" GOOD = "Good" diff --git a/status.py b/status.py index 68f88cf..500a372 100644 --- a/status.py +++ b/status.py @@ -1,9 +1,20 @@ +from enum import Enum import platform from pathlib import Path from typing import Dict import json +class Status(str, Enum): + UNKNOWN = "unknown" + CONNECTED = "connected" + ACTIVE = "active" + DISCONNECTED = "disconnected" + PROBING = "probing" + RESTARTING = "restarting" + SHUTDOWN = "shutdown" + + class StatusManager: def __init__(self, baseDir=None): if baseDir is None or baseDir == "": @@ -27,7 +38,7 @@ def writeStatus(self, status: Dict[str, any]): def readStatus(self) -> Dict[str, any]: """Read the current status from the status file.""" if not self.statusFile.exists(): - return {"status": "unknown"} + return {"status": Status.UNKNOWN} with open(self.statusFile, "r") as f: return json.load(f) @@ -35,4 +46,4 @@ def dump(self): """Print the status file.""" status = self.readStatus() print(json.dumps(status)) - return 0 if status["status"] == "active" else 1 + return 0 if status["status"] == Status.ACTIVE else 1