From 658ea2d40b4050e9bada5fcb694e0c1983306623 Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Thu, 26 Dec 2024 10:20:55 -0800 Subject: [PATCH] Update subpackage name references --- .../planktoscope/imager/__init__.py | 2 +- .../adafruithat/planktoscope/imager/mqtt.py | 63 ++++++++++++++----- .../planktoscope/imager/__init__.py | 2 +- .../planktoscope/imager/mqtt.py | 63 ++++++++++++++----- 4 files changed, 96 insertions(+), 34 deletions(-) diff --git a/control/adafruithat/planktoscope/imager/__init__.py b/control/adafruithat/planktoscope/imager/__init__.py index 89756e2a..77d0bb6e 100644 --- a/control/adafruithat/planktoscope/imager/__init__.py +++ b/control/adafruithat/planktoscope/imager/__init__.py @@ -1 +1 @@ -"""imagernew provides high-level functionality for performing image acquisition.""" +"""imager provides high-level functionality for performing image acquisition.""" diff --git a/control/adafruithat/planktoscope/imager/mqtt.py b/control/adafruithat/planktoscope/imager/mqtt.py index cf4ff9f6..59b110ec 100644 --- a/control/adafruithat/planktoscope/imager/mqtt.py +++ b/control/adafruithat/planktoscope/imager/mqtt.py @@ -12,7 +12,7 @@ from planktoscope import identity, integrity, mqtt from planktoscope.camera import mqtt as camera -from planktoscope.imagernew import stopflow +from planktoscope.imager import stopflow loguru.logger.info("planktoscope.imager is loaded") @@ -62,7 +62,9 @@ def run(self) -> None: couldn't be started (e.g. because the camera is missing), it will clean up and then wait until the `stop_event` event is set before quitting. """ - loguru.logger.info(f"The imager control thread has been started in process {os.getpid()}") + loguru.logger.info( + f"The imager control thread has been started in process {os.getpid()}" + ) self._mqtt = mqtt.MQTT_Client(topic="imager/#", name="imager_client") self._mqtt.client.publish("status/imager", '{"status":"Starting up"}') @@ -75,10 +77,16 @@ def run(self) -> None: self._camera = camera.Worker() self._camera.start() if self._camera.camera is None: - loguru.logger.error("Missing camera - maybe it's disconnected or it never started?") + loguru.logger.error( + "Missing camera - maybe it's disconnected or it never started?" + ) # TODO(ethanjli): officially add this error status to the MQTT API! - self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}') - loguru.logger.success("Preemptively preparing to shut down since there's no camera...") + self._mqtt.client.publish( + "status/imager", '{"status": "Error: missing camera"}' + ) + loguru.logger.success( + "Preemptively preparing to shut down since there's no camera..." + ) self._cleanup() # TODO(ethanjli): currently we just wait and do nothing until we receive the shutdown # signal, because if we return early then the hardware controller will either shut down @@ -95,7 +103,10 @@ def run(self) -> None: self._mqtt.client.publish("status/imager", '{"status":"Ready"}') try: while not self._stop_event_loop.is_set(): - if self._active_routine is not None and not self._active_routine.is_alive(): + if ( + self._active_routine is not None + and not self._active_routine.is_alive() + ): # Garbage-collect any finished image-acquisition routine threads so that we're # ready for the next configuration update command which arrives: self._active_routine.stop() @@ -163,8 +174,12 @@ def _update_metadata(self, latest_message: dict[str, typing.Any]) -> None: return if "config" not in latest_message: - loguru.logger.error(f"Received message is missing field 'config': {latest_message}") - self._mqtt.client.publish("status/imager", '{"status":"Configuration message error"}') + loguru.logger.error( + f"Received message is missing field 'config': {latest_message}" + ) + self._mqtt.client.publish( + "status/imager", '{"status":"Configuration message error"}' + ) return loguru.logger.info("Updating configuration...") @@ -178,16 +193,22 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None: assert self._pump is not None assert self._camera is not None - if (acquisition_settings := _parse_acquisition_settings(latest_message)) is None: + if ( + acquisition_settings := _parse_acquisition_settings(latest_message) + ) is None: self._mqtt.client.publish("status/imager", '{"status":"Error"}') return if self._camera.camera is None: loguru.logger.error("Missing camera - maybe it was closed?") # TODO(ethanjli): officially add this error status to the MQTT API! - self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}') + self._mqtt.client.publish( + "status/imager", '{"status": "Error: missing camera"}' + ) raise RuntimeError("Camera is not available") - assert (capture_size := self._camera.camera.stream_config.capture_size) is not None + assert ( + capture_size := self._camera.camera.stream_config.capture_size + ) is not None camera_settings = self._camera.camera.settings assert (image_gain := camera_settings.image_gain) is not None machine_name = identity.load_machine_name() @@ -216,7 +237,9 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None: return self._active_routine = ImageAcquisitionRoutine( - stopflow.Routine(output_path, acquisition_settings, self._pump, self._camera.camera), + stopflow.Routine( + output_path, acquisition_settings, self._pump, self._camera.camera + ), self._mqtt, ) self._active_routine.start() @@ -250,7 +273,9 @@ def _parse_acquisition_settings( total_images=int(latest_message["nb_frame"]), stabilization_duration=float(latest_message["sleep"]), pump=stopflow.DiscretePumpSettings( - direction=stopflow.PumpDirection(latest_message.get("pump_direction", "FORWARD")), + direction=stopflow.PumpDirection( + latest_message.get("pump_direction", "FORWARD") + ), flowrate=float(latest_message.get("pump_flowrate", 2)), volume=float(latest_message["volume"]), ), @@ -317,7 +342,9 @@ class ImageAcquisitionRoutine(threading.Thread): # TODO(ethanjli): instead of taking an arg of type mqtt.MQTT_CLIENT, just take an arg of # whatever `mqtt_client.client`'s type is supposed to be. Or maybe we should just initialize # our own MQTT client in here? - def __init__(self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client) -> None: + def __init__( + self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client + ) -> None: """Initialize the thread. Args: @@ -335,7 +362,9 @@ def run(self) -> None: if (result := self._routine.run_step()) is None: if self._routine.interrupted: loguru.logger.debug("Image-acquisition routine was interrupted!") - self._mqtt_client.publish("status/imager", '{"status":"Interrupted"}') + self._mqtt_client.publish( + "status/imager", '{"status":"Interrupted"}' + ) break loguru.logger.debug("Image-acquisition routine ran to completion!") self._mqtt_client.publish("status/imager", '{"status":"Done"}') @@ -415,7 +444,9 @@ def _receive_messages(self) -> None: continue if self._mqtt.msg["payload"]["status"] not in {"Done", "Interrupted"}: - loguru.logger.debug(f"Ignoring pump status update: {self._mqtt.msg['payload']}") + loguru.logger.debug( + f"Ignoring pump status update: {self._mqtt.msg['payload']}" + ) self._mqtt.read_message() continue diff --git a/control/planktoscopehat/planktoscope/imager/__init__.py b/control/planktoscopehat/planktoscope/imager/__init__.py index 89756e2a..77d0bb6e 100644 --- a/control/planktoscopehat/planktoscope/imager/__init__.py +++ b/control/planktoscopehat/planktoscope/imager/__init__.py @@ -1 +1 @@ -"""imagernew provides high-level functionality for performing image acquisition.""" +"""imager provides high-level functionality for performing image acquisition.""" diff --git a/control/planktoscopehat/planktoscope/imager/mqtt.py b/control/planktoscopehat/planktoscope/imager/mqtt.py index cf4ff9f6..59b110ec 100644 --- a/control/planktoscopehat/planktoscope/imager/mqtt.py +++ b/control/planktoscopehat/planktoscope/imager/mqtt.py @@ -12,7 +12,7 @@ from planktoscope import identity, integrity, mqtt from planktoscope.camera import mqtt as camera -from planktoscope.imagernew import stopflow +from planktoscope.imager import stopflow loguru.logger.info("planktoscope.imager is loaded") @@ -62,7 +62,9 @@ def run(self) -> None: couldn't be started (e.g. because the camera is missing), it will clean up and then wait until the `stop_event` event is set before quitting. """ - loguru.logger.info(f"The imager control thread has been started in process {os.getpid()}") + loguru.logger.info( + f"The imager control thread has been started in process {os.getpid()}" + ) self._mqtt = mqtt.MQTT_Client(topic="imager/#", name="imager_client") self._mqtt.client.publish("status/imager", '{"status":"Starting up"}') @@ -75,10 +77,16 @@ def run(self) -> None: self._camera = camera.Worker() self._camera.start() if self._camera.camera is None: - loguru.logger.error("Missing camera - maybe it's disconnected or it never started?") + loguru.logger.error( + "Missing camera - maybe it's disconnected or it never started?" + ) # TODO(ethanjli): officially add this error status to the MQTT API! - self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}') - loguru.logger.success("Preemptively preparing to shut down since there's no camera...") + self._mqtt.client.publish( + "status/imager", '{"status": "Error: missing camera"}' + ) + loguru.logger.success( + "Preemptively preparing to shut down since there's no camera..." + ) self._cleanup() # TODO(ethanjli): currently we just wait and do nothing until we receive the shutdown # signal, because if we return early then the hardware controller will either shut down @@ -95,7 +103,10 @@ def run(self) -> None: self._mqtt.client.publish("status/imager", '{"status":"Ready"}') try: while not self._stop_event_loop.is_set(): - if self._active_routine is not None and not self._active_routine.is_alive(): + if ( + self._active_routine is not None + and not self._active_routine.is_alive() + ): # Garbage-collect any finished image-acquisition routine threads so that we're # ready for the next configuration update command which arrives: self._active_routine.stop() @@ -163,8 +174,12 @@ def _update_metadata(self, latest_message: dict[str, typing.Any]) -> None: return if "config" not in latest_message: - loguru.logger.error(f"Received message is missing field 'config': {latest_message}") - self._mqtt.client.publish("status/imager", '{"status":"Configuration message error"}') + loguru.logger.error( + f"Received message is missing field 'config': {latest_message}" + ) + self._mqtt.client.publish( + "status/imager", '{"status":"Configuration message error"}' + ) return loguru.logger.info("Updating configuration...") @@ -178,16 +193,22 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None: assert self._pump is not None assert self._camera is not None - if (acquisition_settings := _parse_acquisition_settings(latest_message)) is None: + if ( + acquisition_settings := _parse_acquisition_settings(latest_message) + ) is None: self._mqtt.client.publish("status/imager", '{"status":"Error"}') return if self._camera.camera is None: loguru.logger.error("Missing camera - maybe it was closed?") # TODO(ethanjli): officially add this error status to the MQTT API! - self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}') + self._mqtt.client.publish( + "status/imager", '{"status": "Error: missing camera"}' + ) raise RuntimeError("Camera is not available") - assert (capture_size := self._camera.camera.stream_config.capture_size) is not None + assert ( + capture_size := self._camera.camera.stream_config.capture_size + ) is not None camera_settings = self._camera.camera.settings assert (image_gain := camera_settings.image_gain) is not None machine_name = identity.load_machine_name() @@ -216,7 +237,9 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None: return self._active_routine = ImageAcquisitionRoutine( - stopflow.Routine(output_path, acquisition_settings, self._pump, self._camera.camera), + stopflow.Routine( + output_path, acquisition_settings, self._pump, self._camera.camera + ), self._mqtt, ) self._active_routine.start() @@ -250,7 +273,9 @@ def _parse_acquisition_settings( total_images=int(latest_message["nb_frame"]), stabilization_duration=float(latest_message["sleep"]), pump=stopflow.DiscretePumpSettings( - direction=stopflow.PumpDirection(latest_message.get("pump_direction", "FORWARD")), + direction=stopflow.PumpDirection( + latest_message.get("pump_direction", "FORWARD") + ), flowrate=float(latest_message.get("pump_flowrate", 2)), volume=float(latest_message["volume"]), ), @@ -317,7 +342,9 @@ class ImageAcquisitionRoutine(threading.Thread): # TODO(ethanjli): instead of taking an arg of type mqtt.MQTT_CLIENT, just take an arg of # whatever `mqtt_client.client`'s type is supposed to be. Or maybe we should just initialize # our own MQTT client in here? - def __init__(self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client) -> None: + def __init__( + self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client + ) -> None: """Initialize the thread. Args: @@ -335,7 +362,9 @@ def run(self) -> None: if (result := self._routine.run_step()) is None: if self._routine.interrupted: loguru.logger.debug("Image-acquisition routine was interrupted!") - self._mqtt_client.publish("status/imager", '{"status":"Interrupted"}') + self._mqtt_client.publish( + "status/imager", '{"status":"Interrupted"}' + ) break loguru.logger.debug("Image-acquisition routine ran to completion!") self._mqtt_client.publish("status/imager", '{"status":"Done"}') @@ -415,7 +444,9 @@ def _receive_messages(self) -> None: continue if self._mqtt.msg["payload"]["status"] not in {"Done", "Interrupted"}: - loguru.logger.debug(f"Ignoring pump status update: {self._mqtt.msg['payload']}") + loguru.logger.debug( + f"Ignoring pump status update: {self._mqtt.msg['payload']}" + ) self._mqtt.read_message() continue