Skip to content

Commit

Permalink
Update subpackage name references
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanjli committed Dec 26, 2024
1 parent 7b4de1f commit 658ea2d
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 34 deletions.
2 changes: 1 addition & 1 deletion control/adafruithat/planktoscope/imager/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"""imagernew provides high-level functionality for performing image acquisition."""
"""imager provides high-level functionality for performing image acquisition."""
63 changes: 47 additions & 16 deletions control/adafruithat/planktoscope/imager/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from planktoscope import identity, integrity, mqtt
from planktoscope.camera import mqtt as camera
from planktoscope.imagernew import stopflow
from planktoscope.imager import stopflow

loguru.logger.info("planktoscope.imager is loaded")

Expand Down Expand Up @@ -62,7 +62,9 @@ def run(self) -> None:
couldn't be started (e.g. because the camera is missing), it will clean up and then wait
until the `stop_event` event is set before quitting.
"""
loguru.logger.info(f"The imager control thread has been started in process {os.getpid()}")
loguru.logger.info(
f"The imager control thread has been started in process {os.getpid()}"
)
self._mqtt = mqtt.MQTT_Client(topic="imager/#", name="imager_client")
self._mqtt.client.publish("status/imager", '{"status":"Starting up"}')

Expand All @@ -75,10 +77,16 @@ def run(self) -> None:
self._camera = camera.Worker()
self._camera.start()
if self._camera.camera is None:
loguru.logger.error("Missing camera - maybe it's disconnected or it never started?")
loguru.logger.error(
"Missing camera - maybe it's disconnected or it never started?"
)
# TODO(ethanjli): officially add this error status to the MQTT API!
self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}')
loguru.logger.success("Preemptively preparing to shut down since there's no camera...")
self._mqtt.client.publish(
"status/imager", '{"status": "Error: missing camera"}'
)
loguru.logger.success(
"Preemptively preparing to shut down since there's no camera..."
)
self._cleanup()
# TODO(ethanjli): currently we just wait and do nothing until we receive the shutdown
# signal, because if we return early then the hardware controller will either shut down
Expand All @@ -95,7 +103,10 @@ def run(self) -> None:
self._mqtt.client.publish("status/imager", '{"status":"Ready"}')
try:
while not self._stop_event_loop.is_set():
if self._active_routine is not None and not self._active_routine.is_alive():
if (
self._active_routine is not None
and not self._active_routine.is_alive()
):
# Garbage-collect any finished image-acquisition routine threads so that we're
# ready for the next configuration update command which arrives:
self._active_routine.stop()
Expand Down Expand Up @@ -163,8 +174,12 @@ def _update_metadata(self, latest_message: dict[str, typing.Any]) -> None:
return

if "config" not in latest_message:
loguru.logger.error(f"Received message is missing field 'config': {latest_message}")
self._mqtt.client.publish("status/imager", '{"status":"Configuration message error"}')
loguru.logger.error(
f"Received message is missing field 'config': {latest_message}"
)
self._mqtt.client.publish(
"status/imager", '{"status":"Configuration message error"}'
)
return

loguru.logger.info("Updating configuration...")
Expand All @@ -178,16 +193,22 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None:
assert self._pump is not None
assert self._camera is not None

if (acquisition_settings := _parse_acquisition_settings(latest_message)) is None:
if (
acquisition_settings := _parse_acquisition_settings(latest_message)
) is None:
self._mqtt.client.publish("status/imager", '{"status":"Error"}')
return
if self._camera.camera is None:
loguru.logger.error("Missing camera - maybe it was closed?")
# TODO(ethanjli): officially add this error status to the MQTT API!
self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}')
self._mqtt.client.publish(
"status/imager", '{"status": "Error: missing camera"}'
)
raise RuntimeError("Camera is not available")

assert (capture_size := self._camera.camera.stream_config.capture_size) is not None
assert (
capture_size := self._camera.camera.stream_config.capture_size
) is not None
camera_settings = self._camera.camera.settings
assert (image_gain := camera_settings.image_gain) is not None
machine_name = identity.load_machine_name()
Expand Down Expand Up @@ -216,7 +237,9 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None:
return

self._active_routine = ImageAcquisitionRoutine(
stopflow.Routine(output_path, acquisition_settings, self._pump, self._camera.camera),
stopflow.Routine(
output_path, acquisition_settings, self._pump, self._camera.camera
),
self._mqtt,
)
self._active_routine.start()
Expand Down Expand Up @@ -250,7 +273,9 @@ def _parse_acquisition_settings(
total_images=int(latest_message["nb_frame"]),
stabilization_duration=float(latest_message["sleep"]),
pump=stopflow.DiscretePumpSettings(
direction=stopflow.PumpDirection(latest_message.get("pump_direction", "FORWARD")),
direction=stopflow.PumpDirection(
latest_message.get("pump_direction", "FORWARD")
),
flowrate=float(latest_message.get("pump_flowrate", 2)),
volume=float(latest_message["volume"]),
),
Expand Down Expand Up @@ -317,7 +342,9 @@ class ImageAcquisitionRoutine(threading.Thread):
# TODO(ethanjli): instead of taking an arg of type mqtt.MQTT_CLIENT, just take an arg of
# whatever `mqtt_client.client`'s type is supposed to be. Or maybe we should just initialize
# our own MQTT client in here?
def __init__(self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client) -> None:
def __init__(
self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client
) -> None:
"""Initialize the thread.
Args:
Expand All @@ -335,7 +362,9 @@ def run(self) -> None:
if (result := self._routine.run_step()) is None:
if self._routine.interrupted:
loguru.logger.debug("Image-acquisition routine was interrupted!")
self._mqtt_client.publish("status/imager", '{"status":"Interrupted"}')
self._mqtt_client.publish(
"status/imager", '{"status":"Interrupted"}'
)
break
loguru.logger.debug("Image-acquisition routine ran to completion!")
self._mqtt_client.publish("status/imager", '{"status":"Done"}')
Expand Down Expand Up @@ -415,7 +444,9 @@ def _receive_messages(self) -> None:
continue

if self._mqtt.msg["payload"]["status"] not in {"Done", "Interrupted"}:
loguru.logger.debug(f"Ignoring pump status update: {self._mqtt.msg['payload']}")
loguru.logger.debug(
f"Ignoring pump status update: {self._mqtt.msg['payload']}"
)
self._mqtt.read_message()
continue

Expand Down
2 changes: 1 addition & 1 deletion control/planktoscopehat/planktoscope/imager/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"""imagernew provides high-level functionality for performing image acquisition."""
"""imager provides high-level functionality for performing image acquisition."""
63 changes: 47 additions & 16 deletions control/planktoscopehat/planktoscope/imager/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from planktoscope import identity, integrity, mqtt
from planktoscope.camera import mqtt as camera
from planktoscope.imagernew import stopflow
from planktoscope.imager import stopflow

loguru.logger.info("planktoscope.imager is loaded")

Expand Down Expand Up @@ -62,7 +62,9 @@ def run(self) -> None:
couldn't be started (e.g. because the camera is missing), it will clean up and then wait
until the `stop_event` event is set before quitting.
"""
loguru.logger.info(f"The imager control thread has been started in process {os.getpid()}")
loguru.logger.info(
f"The imager control thread has been started in process {os.getpid()}"
)
self._mqtt = mqtt.MQTT_Client(topic="imager/#", name="imager_client")
self._mqtt.client.publish("status/imager", '{"status":"Starting up"}')

Expand All @@ -75,10 +77,16 @@ def run(self) -> None:
self._camera = camera.Worker()
self._camera.start()
if self._camera.camera is None:
loguru.logger.error("Missing camera - maybe it's disconnected or it never started?")
loguru.logger.error(
"Missing camera - maybe it's disconnected or it never started?"
)
# TODO(ethanjli): officially add this error status to the MQTT API!
self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}')
loguru.logger.success("Preemptively preparing to shut down since there's no camera...")
self._mqtt.client.publish(
"status/imager", '{"status": "Error: missing camera"}'
)
loguru.logger.success(
"Preemptively preparing to shut down since there's no camera..."
)
self._cleanup()
# TODO(ethanjli): currently we just wait and do nothing until we receive the shutdown
# signal, because if we return early then the hardware controller will either shut down
Expand All @@ -95,7 +103,10 @@ def run(self) -> None:
self._mqtt.client.publish("status/imager", '{"status":"Ready"}')
try:
while not self._stop_event_loop.is_set():
if self._active_routine is not None and not self._active_routine.is_alive():
if (
self._active_routine is not None
and not self._active_routine.is_alive()
):
# Garbage-collect any finished image-acquisition routine threads so that we're
# ready for the next configuration update command which arrives:
self._active_routine.stop()
Expand Down Expand Up @@ -163,8 +174,12 @@ def _update_metadata(self, latest_message: dict[str, typing.Any]) -> None:
return

if "config" not in latest_message:
loguru.logger.error(f"Received message is missing field 'config': {latest_message}")
self._mqtt.client.publish("status/imager", '{"status":"Configuration message error"}')
loguru.logger.error(
f"Received message is missing field 'config': {latest_message}"
)
self._mqtt.client.publish(
"status/imager", '{"status":"Configuration message error"}'
)
return

loguru.logger.info("Updating configuration...")
Expand All @@ -178,16 +193,22 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None:
assert self._pump is not None
assert self._camera is not None

if (acquisition_settings := _parse_acquisition_settings(latest_message)) is None:
if (
acquisition_settings := _parse_acquisition_settings(latest_message)
) is None:
self._mqtt.client.publish("status/imager", '{"status":"Error"}')
return
if self._camera.camera is None:
loguru.logger.error("Missing camera - maybe it was closed?")
# TODO(ethanjli): officially add this error status to the MQTT API!
self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}')
self._mqtt.client.publish(
"status/imager", '{"status": "Error: missing camera"}'
)
raise RuntimeError("Camera is not available")

assert (capture_size := self._camera.camera.stream_config.capture_size) is not None
assert (
capture_size := self._camera.camera.stream_config.capture_size
) is not None
camera_settings = self._camera.camera.settings
assert (image_gain := camera_settings.image_gain) is not None
machine_name = identity.load_machine_name()
Expand Down Expand Up @@ -216,7 +237,9 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None:
return

self._active_routine = ImageAcquisitionRoutine(
stopflow.Routine(output_path, acquisition_settings, self._pump, self._camera.camera),
stopflow.Routine(
output_path, acquisition_settings, self._pump, self._camera.camera
),
self._mqtt,
)
self._active_routine.start()
Expand Down Expand Up @@ -250,7 +273,9 @@ def _parse_acquisition_settings(
total_images=int(latest_message["nb_frame"]),
stabilization_duration=float(latest_message["sleep"]),
pump=stopflow.DiscretePumpSettings(
direction=stopflow.PumpDirection(latest_message.get("pump_direction", "FORWARD")),
direction=stopflow.PumpDirection(
latest_message.get("pump_direction", "FORWARD")
),
flowrate=float(latest_message.get("pump_flowrate", 2)),
volume=float(latest_message["volume"]),
),
Expand Down Expand Up @@ -317,7 +342,9 @@ class ImageAcquisitionRoutine(threading.Thread):
# TODO(ethanjli): instead of taking an arg of type mqtt.MQTT_CLIENT, just take an arg of
# whatever `mqtt_client.client`'s type is supposed to be. Or maybe we should just initialize
# our own MQTT client in here?
def __init__(self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client) -> None:
def __init__(
self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client
) -> None:
"""Initialize the thread.
Args:
Expand All @@ -335,7 +362,9 @@ def run(self) -> None:
if (result := self._routine.run_step()) is None:
if self._routine.interrupted:
loguru.logger.debug("Image-acquisition routine was interrupted!")
self._mqtt_client.publish("status/imager", '{"status":"Interrupted"}')
self._mqtt_client.publish(
"status/imager", '{"status":"Interrupted"}'
)
break
loguru.logger.debug("Image-acquisition routine ran to completion!")
self._mqtt_client.publish("status/imager", '{"status":"Done"}')
Expand Down Expand Up @@ -415,7 +444,9 @@ def _receive_messages(self) -> None:
continue

if self._mqtt.msg["payload"]["status"] not in {"Done", "Interrupted"}:
loguru.logger.debug(f"Ignoring pump status update: {self._mqtt.msg['payload']}")
loguru.logger.debug(
f"Ignoring pump status update: {self._mqtt.msg['payload']}"
)
self._mqtt.read_message()
continue

Expand Down

0 comments on commit 658ea2d

Please sign in to comment.