From ac34f6c44ef0aa2b7b041812963bb8c5593d3452 Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Thu, 26 Dec 2024 09:08:05 -0800 Subject: [PATCH 1/3] Delete the camera controller for the old raspimjpeg-based imager --- .../planktoscope/imager/__init__.py | 761 +----------------- .../{imagernew => imager}/mqtt.py | 0 .../planktoscope/imager/raspimjpeg.py | 540 ------------- .../planktoscope/imager/state_machine.py | 88 -- .../{imagernew => imager}/stopflow.py | 0 .../planktoscope/imager/streamer.py | 77 -- .../planktoscope/imagernew/__init__.py | 1 - control/planktoscopehat/main.py | 25 +- .../planktoscope/imager/__init__.py | 732 +---------------- .../{imagernew => imager}/mqtt.py | 0 .../planktoscope/imager/raspimjpeg.py | 519 ------------ .../planktoscope/imager/state_machine.py | 71 -- .../{imagernew => imager}/stopflow.py | 0 .../planktoscope/imager/streamer.py | 79 -- .../planktoscope/imagernew/__init__.py | 1 - 15 files changed, 17 insertions(+), 2877 deletions(-) rename control/adafruithat/planktoscope/{imagernew => imager}/mqtt.py (100%) delete mode 100644 control/adafruithat/planktoscope/imager/raspimjpeg.py delete mode 100644 control/adafruithat/planktoscope/imager/state_machine.py rename control/adafruithat/planktoscope/{imagernew => imager}/stopflow.py (100%) delete mode 100644 control/adafruithat/planktoscope/imager/streamer.py delete mode 100644 control/adafruithat/planktoscope/imagernew/__init__.py rename control/planktoscopehat/planktoscope/{imagernew => imager}/mqtt.py (100%) delete mode 100644 control/planktoscopehat/planktoscope/imager/raspimjpeg.py delete mode 100644 control/planktoscopehat/planktoscope/imager/state_machine.py rename control/planktoscopehat/planktoscope/{imagernew => imager}/stopflow.py (100%) delete mode 100644 control/planktoscopehat/planktoscope/imager/streamer.py delete mode 100644 control/planktoscopehat/planktoscope/imagernew/__init__.py diff --git a/control/adafruithat/planktoscope/imager/__init__.py b/control/adafruithat/planktoscope/imager/__init__.py index c491c891..89756e2a 100644 --- a/control/adafruithat/planktoscope/imager/__init__.py +++ b/control/adafruithat/planktoscope/imager/__init__.py @@ -1,760 +1 @@ -# Copyright (C) 2021 Romain Bazile -# -# This file is part of the PlanktoScope software. -# -# PlanktoScope is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# PlanktoScope is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with PlanktoScope. If not, see . - -import datetime # needed to get date and time for folder name and filename -import time # needed to able to sleep for a given duration -import json -import os -import shutil -import multiprocessing -import threading # needed for the streaming server -import functools # needed for the streaming server - -from loguru import logger - - -import planktoscope.mqtt -import planktoscope.light -import planktoscope.imager.state_machine -import planktoscope.imager.raspimjpeg -import planktoscope.imager.streamer -import planktoscope.integrity -import planktoscope.identity - - -logger.info("planktoscope.imager is loaded") - -################################################################################ -# Main Imager class -################################################################################ -class ImagerProcess(multiprocessing.Process): - """This class contains the main definitions for the imager of the PlanktoScope""" - - def __init__(self, stop_event, iso=100, shutter_speed=1): - """Initialize the Imager class - - Args: - stop_event (multiprocessing.Event): shutdown event - iso (int, optional): ISO sensitivity. Defaults to 100. - shutter_speed (int, optional): Shutter speed of the camera. Defaults to 500. - """ - super(ImagerProcess, self).__init__(name="imager") - - logger.info("planktoscope.imager is initialising") - - if os.path.exists("/home/pi/PlanktoScope/hardware.json"): - # load hardware.json - with open("/home/pi/PlanktoScope/hardware.json", "r") as config_file: - configuration = json.load(config_file) - logger.debug(f"Hardware configuration loaded is {configuration}") - else: - logger.info( - "The hardware configuration file doesn't exists, using defaults" - ) - configuration = {} - - self.__camera_type = "v2.1" - - # parse the config data. If the key is absent, we are using the default value - self.__camera_type = configuration.get("camera_type", self.__camera_type) - - self.stop_event = stop_event - self.__imager = planktoscope.imager.state_machine.Imager() - self.__img_goal = 0 - self.__img_done = 0 - self.__sleep_before = None - self.__pump_volume = None - self.__pump_direction = "FORWARD" - self.__img_goal = None - self.imager_client = None - self.__error = 0 - - # Initialise the camera and the process - # Also starts the streaming to the temporary file - self.__camera = planktoscope.imager.raspimjpeg.raspimjpeg() - - try: - self.__camera.start() - except Exception as e: - logger.exception( - f"An exception has occured when starting up raspimjpeg: {e}" - ) - try: - self.__camera.start(True) - except Exception as e: - logger.exception( - f"A second exception has occured when starting up raspimjpeg: {e}" - ) - logger.error("This error can't be recovered from, terminating now") - raise e - - if self.__camera.sensor_name == "IMX219": # Camera v2.1 - self.__resolution = (3280, 2464) - elif self.__camera.sensor_name == "IMX477": # Camera HQ - self.__resolution = (4056, 3040) - else: - self.__resolution = (1280, 1024) - logger.error( - f"The connected camera {self.__camera.sensor_name} is not recognized, please check your camera" - ) - - self.__iso = iso - self.__shutter_speed = shutter_speed - self.__exposure_mode = "auto" - self.__white_balance = "off" - self.__white_balance_gain = ( - int(configuration.get("red_gain", 2.00) * 100), - int(configuration.get("blue_gain", 1.40) * 100), - ) - self.__image_gain = ( - int(configuration.get("analog_gain", 1.00) * 100), - int(configuration.get("digital_gain", 1.00) * 100), - ) - - self.__base_path = "/home/pi/data/img" - # Let's make sure the base path exists - if not os.path.exists(self.__base_path): - os.makedirs(self.__base_path) - - self.__export_path = "" - self.__global_metadata = None - - logger.info("Initialising the camera with the default settings") - try: - self.__camera.resolution = self.__resolution - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the resolution, trying again" - ) - self.__camera.resolution = self.__resolution - time.sleep(0.1) - - try: - self.__camera.iso = self.__iso - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the ISO number, trying again" - ) - self.__camera.iso = self.__iso - time.sleep(0.1) - - try: - self.__camera.shutter_speed = self.__shutter_speed - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the shutter speed, trying again" - ) - self.__camera.shutter_speed = self.__shutter_speed - time.sleep(0.1) - - try: - self.__camera.exposure_mode = self.__exposure_mode - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the exposure mode, trying again" - ) - self.__camera.exposure_mode = self.__exposure_mode - time.sleep(0.1) - - try: - self.__camera.white_balance = self.__white_balance - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance mode, trying again" - ) - self.__camera.white_balance = self.__white_balance - time.sleep(0.1) - - try: - self.__camera.white_balance_gain = self.__white_balance_gain - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance gain, trying again" - ) - self.__camera.white_balance_gain = self.__white_balance_gain - time.sleep(0.1) - - try: - self.__camera.image_gain = self.__image_gain - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance gain, trying again" - ) - self.__camera.image_gain = self.__image_gain - - logger.success("planktoscope.imager is initialised and ready to go!") - - def __message_image(self, last_message): - """Actions for when we receive a message""" - if ( - "sleep" not in last_message - or "volume" not in last_message - or "nb_frame" not in last_message - or "pump_direction" not in last_message - ): - logger.error(f"The received message has the wrong argument {last_message}") - self.imager_client.client.publish("status/imager", '{"status":"Error"}') - return - self.__imager.change(planktoscope.imager.state_machine.Imaging) - - # Get duration to wait before an image from the different received arguments - self.__sleep_before = float(last_message["sleep"]) - - # Get volume in between two images from the different received arguments - self.__pump_volume = float(last_message["volume"]) - - # Get the pump direction message - self.__pump_direction = last_message["pump_direction"] - - # Get the number of frames to image from the different received arguments - self.__img_goal = int(last_message["nb_frame"]) - - # Reset the counter to 0 - self.__img_done = 0 - - self.imager_client.client.publish("status/imager", '{"status":"Started"}') - - def __message_stop(self): - self.imager_client.client.unsubscribe("status/pump") - - # Stops the pump - self.imager_client.client.publish("actuator/pump", '{"action": "stop"}') - - logger.info("The imaging has been interrupted.") - - # Publish the status "Interrupted" to via MQTT to Node-RED - self.imager_client.client.publish("status/imager", '{"status":"Interrupted"}') - - planktoscope.light.interrupted() - - self.__imager.change(planktoscope.imager.state_machine.Stop) - - def __message_update(self, last_message): - if self.__imager.state.name == "stop": - if "config" not in last_message: - logger.error( - f"The received message has the wrong argument {last_message}" - ) - self.imager_client.client.publish( - "status/imager", '{"status":"Configuration message error"}' - ) - return - - logger.info("Updating the configuration now with the received data") - # Updating the configuration with the passed parameter in payload["config"] - self.__global_metadata = last_message["config"] - - # Publish the status "Config updated" to via MQTT to Node-RED - self.imager_client.client.publish( - "status/imager", '{"status":"Config updated"}' - ) - logger.info("Configuration has been updated") - else: - logger.error("We can't update the configuration while we are imaging.") - # Publish the status "Interrupted" to via MQTT to Node-RED - self.imager_client.client.publish("status/imager", '{"status":"Busy"}') - - def __message_settings(self, last_message): - # TODO simplify this method, move timeout error check inside self.__camera.resolution/iso/etc. - if self.__imager.state.name == "stop": - if "settings" not in last_message: - logger.error( - f"The received message has the wrong argument {last_message}" - ) - self.imager_client.client.publish( - "status/imager", '{"status":"Camera settings error"}' - ) - return - logger.info("Updating the camera settings now with the received data") - # Updating the configuration with the passed parameter in payload["config"] - settings = last_message["settings"] - if "resolution" in settings: - self.__resolution = settings.get("resolution", self.__resolution) - logger.debug(f"Updating the camera resolution to {self.__resolution}") - try: - self.__camera.resolution = self.__resolution - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the resolution, trying again" - ) - self.__camera.resolution = self.__resolution - except ValueError as e: - logger.error("The requested resolution is not valid!") - self.imager_client.client.publish( - "status/imager", '{"status":"Error: Resolution not valid"}' - ) - return - - if "iso" in settings: - self.__iso = settings.get("iso", self.__iso) - logger.debug(f"Updating the camera iso to {self.__iso}") - try: - self.__camera.iso = self.__iso - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the ISO number, trying again" - ) - self.__camera.iso = self.__iso - except ValueError as e: - logger.error("The requested ISO number is not valid!") - self.imager_client.client.publish( - "status/imager", '{"status":"Error: Iso number not valid"}' - ) - return - - if "shutter_speed" in settings: - self.__shutter_speed = settings.get( - "shutter_speed", self.__shutter_speed - ) - logger.debug( - f"Updating the camera shutter speed to {self.__shutter_speed}" - ) - try: - self.__camera.shutter_speed = self.__shutter_speed - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the shutter speed, trying again" - ) - self.__camera.shutter_speed = self.__shutter_speed - except ValueError as e: - logger.error("The requested shutter speed is not valid!") - self.imager_client.client.publish( - "status/imager", '{"status":"Error: Shutter speed not valid"}' - ) - return - - if "white_balance_gain" in settings: - if "red" in settings["white_balance_gain"]: - logger.debug( - f"Updating the camera white balance red gain to {settings['white_balance_gain']}" - ) - self.__white_balance_gain = ( - settings["white_balance_gain"].get( - "red", self.__white_balance_gain[0] - ), - self.__white_balance_gain[1], - ) - if "blue" in settings["white_balance_gain"]: - logger.debug( - f"Updating the camera white balance blue gain to {settings['white_balance_gain']}" - ) - self.__white_balance_gain = ( - self.__white_balance_gain[0], - settings["white_balance_gain"].get( - "blue", self.__white_balance_gain[1] - ), - ) - logger.debug( - f"Updating the camera white balance gain to {self.__white_balance_gain}" - ) - try: - self.__camera.white_balance_gain = self.__white_balance_gain - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance gain, trying again" - ) - self.__camera.white_balance_gain = self.__white_balance_gain - except ValueError as e: - logger.error("The requested white balance gain is not valid!") - self.imager_client.client.publish( - "status/imager", - '{"status":"Error: White balance gain not valid"}', - ) - return - - if "white_balance" in settings: - logger.debug( - f"Updating the camera white balance mode to {settings['white_balance']}" - ) - self.__white_balance = settings.get( - "white_balance", self.__white_balance - ) - logger.debug( - f"Updating the camera white balance mode to {self.__white_balance}" - ) - try: - self.__camera.white_balance = self.__white_balance - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance, trying again" - ) - self.__camera.white_balance = self.__white_balance - except ValueError as e: - logger.error("The requested white balance is not valid!") - self.imager_client.client.publish( - "status/imager", - f'{"status":"Error: White balance mode {self.__white_balance} is not valid"}', - ) - return - - if "image_gain" in settings: - if "analog" in settings["image_gain"]: - logger.debug( - f"Updating the camera image analog gain to {settings['image_gain']}" - ) - self.__image_gain = ( - settings["image_gain"].get("analog", self.__image_gain[0]), - self.__image_gain[1], - ) - if "digital" in settings["image_gain"]: - logger.debug( - f"Updating the camera image digital gain to {settings['image_gain']}" - ) - self.__image_gain = ( - self.__image_gain[0], - settings["image_gain"].get("digital", self.__image_gain[1]), - ) - logger.debug(f"Updating the camera image gain to {self.__image_gain}") - try: - self.__camera.image_gain = self.__image_gain - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance gain, trying again" - ) - self.__camera.image_gain = self.__image_gain - except ValueError as e: - logger.error("The requested image gain is not valid!") - self.imager_client.client.publish( - "status/imager", - '{"status":"Error: Image gain not valid"}', - ) - return - # Publish the status "Config updated" to via MQTT to Node-RED - self.imager_client.client.publish( - "status/imager", '{"status":"Camera settings updated"}' - ) - logger.info("Camera settings have been updated") - else: - logger.error("We can't update the camera settings while we are imaging.") - # Publish the status "Interrupted" to via MQTT to Node-RED - self.imager_client.client.publish("status/imager", '{"status":"Busy"}') - - @logger.catch - def treat_message(self): - action = "" - logger.info("We received a new message") - if self.imager_client.msg["topic"].startswith("imager/"): - last_message = self.imager_client.msg["payload"] - logger.debug(last_message) - action = self.imager_client.msg["payload"]["action"] - logger.debug(action) - elif self.imager_client.msg["topic"] == "status/pump": - logger.debug( - f"Status message payload is {self.imager_client.msg['payload']}" - ) - if self.__imager.state.name == "waiting": - if self.imager_client.msg["payload"]["status"] == "Done": - self.__imager.change(planktoscope.imager.state_machine.Capture) - self.imager_client.client.unsubscribe("status/pump") - else: - logger.info( - f"The pump is not done yet {self.imager_client.msg['payload']}" - ) - else: - logger.error( - "There is an error, we received an unexpected pump message" - ) - else: - logger.error( - f"The received message was not for us! Topic was {self.imager_client.msg['topic']}" - ) - self.imager_client.read_message() - - # If the command is "image" - if action == "image": - # {"action":"image","sleep":5,"volume":1,"nb_frame":200} - self.__message_image(last_message) - - elif action == "stop": - self.__message_stop() - - elif action == "update_config": - self.__message_update(last_message) - - elif action == "settings": - self.__message_settings(last_message) - - elif action not in ["image", "stop", "update_config", "settings", ""]: - logger.warning( - f"We did not understand the received request {action} - {last_message}" - ) - - def __pump_message(self): - """Sends a message to the pump process""" - - planktoscope.light.pumping() - - # Pump during a given volume - self.imager_client.client.publish( - "actuator/pump", - json.dumps( - { - "action": "move", - "direction": self.__pump_direction, - "volume": self.__pump_volume, - "flowrate": 2, - } - ), - ) - - def __state_imaging(self): - # subscribe to status/pump - self.imager_client.client.subscribe("status/pump") - - # Definition of the few important metadata - local_metadata = { - "acq_local_datetime": datetime.datetime.now().isoformat().split(".")[0], - "acq_camera_resolution": f"{self.__resolution[0]}x{self.__resolution[1]}", - "acq_camera_iso": self.__iso, - "acq_camera_shutter_speed": self.__shutter_speed, - "acq_uuid": planktoscope.identity.load_machine_name(), - "sample_uuid": planktoscope.identity.load_machine_name(), - } - - # Concat the local metadata and the metadata from Node-RED - self.__global_metadata = {**self.__global_metadata, **local_metadata} - - if "object_date" not in self.__global_metadata: - # If this path exists, then ids are reused when they should not - logger.error("The metadata did not contain object_date!") - self.imager_client.client.publish( - "status/imager", - '{"status":"Configuration update error: object_date is missing!"}', - ) - # Reset the counter to 0 - self.__img_done = 0 - # Change state towards stop - self.__imager.change(planktoscope.imager.state_machine.Stop) - planktoscope.light.error() - return - - logger.info("Setting up the directory structure for storing the pictures") - self.__export_path = os.path.join( - self.__base_path, - self.__global_metadata["object_date"], - str(self.__global_metadata["sample_id"]).replace(" ", "_").strip("'"), - str(self.__global_metadata["acq_id"]).replace(" ", "_").strip("'"), - ) - - if os.path.exists(self.__export_path): - # If this path exists, then ids are reused when they should not - logger.error(f"The export path at {self.__export_path} already exists") - self.imager_client.client.publish( - "status/imager", - '{"status":"Configuration update error: Chosen id are already in use!"}', - ) - # Reset the counter to 0 - self.__img_done = 0 - self.__imager.change(planktoscope.imager.state_machine.Stop) - planktoscope.light.error() - return - else: - # create the path! - os.makedirs(self.__export_path) - - # Export the metadata to a json file - logger.info("Exporting the metadata to a metadata.json") - metadata_filepath = os.path.join(self.__export_path, "metadata.json") - with open(metadata_filepath, "w") as metadata_file: - json.dump(self.__global_metadata, metadata_file, indent=4) - logger.debug( - f"Metadata dumped in {metadata_file} are {self.__global_metadata}" - ) - - # Create the integrity file in this export path - try: - planktoscope.integrity.create_integrity_file(self.__export_path) - except FileExistsError as e: - logger.info( - f"The integrity file already exists in this export path {self.__export_path}" - ) - # Add the metadata.json file to the integrity file - try: - planktoscope.integrity.append_to_integrity_file(metadata_filepath) - except FileNotFoundError as e: - logger.error( - f"{metadata_filepath} was not found, the metadata.json may not have been created properly!" - ) - - self.__pump_message() - - self.__imager.change(planktoscope.imager.state_machine.Waiting) - - def __state_capture(self): - planktoscope.light.imaging() - - filename = f"{datetime.datetime.now().strftime('%H_%M_%S_%f')}.jpg" - - # Define the filename of the image - filename_path = os.path.join(self.__export_path, filename) - - logger.info( - f"Capturing image {self.__img_done + 1}/{self.__img_goal} to {filename_path}" - ) - - # Sleep a duration before to start acquisition - time.sleep(self.__sleep_before) - - # Capture an image to the temporary file - try: - self.__camera.capture("", timeout=5) - except TimeoutError as e: - self.__capture_error("timeout during capture") - return - - logger.debug(f"Copying the image from the temp file to {filename_path}") - shutil.copy("/dev/shm/mjpeg/image.jpg", filename_path) # nosec - # TODO Try to stop the camera streaming and display instead each captured image - # os.rename("/dev/shm/mjpeg/image.jpg", "/dev/shm/mjpeg/cam.jpg") - logger.debug("Syncing the disk") - os.sync() - - # Add the checksum of the captured image to the integrity file - try: - planktoscope.integrity.append_to_integrity_file(filename_path) - except FileNotFoundError as e: - self.__capture_error(f"{filename_path} was not found") - return - - self.imager_client.client.publish( - "status/imager", - f'{{"status":"Image {self.__img_done + 1}/{self.__img_goal} has been imaged to {filename}"}}', - ) - - # Increment the counter - self.__img_done += 1 - self.__error = 0 - - # If counter reach the number of frame, break - if self.__img_done >= self.__img_goal: - self.__img_done = 0 - - self.imager_client.client.publish("status/imager", '{"status":"Done"}') - - self.__imager.change(planktoscope.imager.state_machine.Stop) - planktoscope.light.ready() - else: - # We have not reached the final stage, let's keep imaging - self.imager_client.client.subscribe("status/pump") - - self.__pump_message() - - self.__imager.change(planktoscope.imager.state_machine.Waiting) - - def __capture_error(self, message=""): - logger.error(f"An error occurred during the capture: {message}") - planktoscope.light.error() - if self.__error: - logger.error("This is a repeating problem, stopping the capture now") - self.imager_client.client.publish( - "status/imager", - f'{{"status":"Image {self.__img_done + 1}/{self.__img_goal} WAS NOT CAPTURED! STOPPING THE PROCESS!"}}', - ) - self.__img_done = 0 - self.__img_goal = 0 - self.__error = 0 - self.__imager.change(planktoscope.imager.state_machine.Stop) - else: - self.__error += 1 - self.imager_client.client.publish( - "status/imager", - f'{{"status":"Image {self.__img_done + 1}/{self.__img_goal} was not captured due to this error:{message}! Retrying once!"}}', - ) - time.sleep(1) - - @logger.catch - def state_machine(self): - if self.__imager.state.name == "imaging": - self.__state_imaging() - return - - elif self.__imager.state.name == "capture": - self.__state_capture() - return - - elif self.__imager.state.name == ["waiting", "stop"]: - return - - ################################################################################ - # While loop for capturing commands from Node-RED - ################################################################################ - @logger.catch - def run(self): - """This is the function that needs to be started to create a thread""" - logger.info( - f"The imager control thread has been started in process {os.getpid()}" - ) - # MQTT Service connection - self.imager_client = planktoscope.mqtt.MQTT_Client( - topic="imager/#", name="imager_client" - ) - - self.imager_client.client.publish("status/imager", '{"status":"Starting up"}') - - if self.__camera.sensor_name == "IMX219": # Camera v2.1 - self.imager_client.client.publish( - "status/imager", '{"camera_name":"Camera v2.1"}' - ) - elif self.__camera.sensor_name == "IMX477": # Camera HQ - self.imager_client.client.publish( - "status/imager", '{"camera_name":"HQ Camera"}' - ) - else: - self.imager_client.client.publish( - "status/imager", '{"camera_name":"Not recognized"}' - ) - - logger.info("Starting the streaming server thread") - address = ("", 8000) - fps = 15 - refresh_delay = 1 / fps - handler = functools.partial( - planktoscope.imager.streamer.StreamingHandler, refresh_delay - ) - server = planktoscope.imager.streamer.StreamingServer(address, handler) - self.streaming_thread = threading.Thread( - target=server.serve_forever, daemon=True - ) - self.streaming_thread.start() - - # Publish the status "Ready" to via MQTT to Node-RED - self.imager_client.client.publish("status/imager", '{"status":"Ready"}') - - logger.success("Camera is READY!") - - # This is the main loop - while not self.stop_event.is_set(): - if self.imager_client.new_message_received(): - self.treat_message() - self.state_machine() - time.sleep(0.001) - - logger.info("Shutting down the imager process") - self.imager_client.client.publish("status/imager", '{"status":"Dead"}') - logger.debug("Stopping the raspimjpeg process") - self.__camera.close() - logger.debug("Stopping the streaming thread") - server.shutdown() - logger.debug("Stopping MQTT") - self.imager_client.shutdown() - # self.streaming_thread.kill() - logger.success("Imager process shut down! See you!") - - -# TODO This should be a test suite for this library -if __name__ == "__main__": - pass +"""imagernew provides high-level functionality for performing image acquisition.""" diff --git a/control/adafruithat/planktoscope/imagernew/mqtt.py b/control/adafruithat/planktoscope/imager/mqtt.py similarity index 100% rename from control/adafruithat/planktoscope/imagernew/mqtt.py rename to control/adafruithat/planktoscope/imager/mqtt.py diff --git a/control/adafruithat/planktoscope/imager/raspimjpeg.py b/control/adafruithat/planktoscope/imager/raspimjpeg.py deleted file mode 100644 index fe240eee..00000000 --- a/control/adafruithat/planktoscope/imager/raspimjpeg.py +++ /dev/null @@ -1,540 +0,0 @@ -# Copyright (C) 2021 Romain Bazile -# -# This file is part of the PlanktoScope software. -# -# PlanktoScope is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# PlanktoScope is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with PlanktoScope. If not, see . - -################################################################################ -# Practical Libraries -################################################################################ - -# Logger library compatible with multiprocessing -from loguru import logger - -# Library for path and filesystem manipulations -import os - -# Library to get date and time for folder name and filename -import datetime -import time - -# Library to control the RaspiMJPEG process -import subprocess # nosec - - -################################################################################ -# Class for the communication with RaspiMJPEG -################################################################################ -class raspimjpeg(object): - def __init__(self, *args, **kwargs): - self.__configfile = "/home/pi/PlanktoScope/scripts/raspimjpeg/raspimjpeg.conf" - # FIXME: we shouldn't be distributing the raspimjpeg binary as part of our Git repo sources; - # instead, the binary should be built as part of the SD card image build process, or else it - # should be built elsewhere and then the binary can be downloaded to /home/pi/.local/bin by - # the SD card image build scripts. - self.__binary = "/home/pi/PlanktoScope/scripts/raspimjpeg/bin/raspimjpeg" - self.__statusfile = "/dev/shm/mjpeg/status_mjpeg.txt" # nosec - self.__pipe = "/dev/shm/mjpeg/FIFO" # nosec - self.__sensor_name = "" - - # make sure the status file exists and is empty - if not os.path.exists(self.__statusfile): - logger.debug("The status file does not exists, creating now") - # create the path! - os.makedirs(os.path.dirname(self.__statusfile), exist_ok=True) - - # If the file does not exists, creates it - # otherwise make sure it's empty - with open(self.__statusfile, "w") as file: - file.write("") - - # make sure the pipe exists - if not os.path.exists(self.__pipe): - logger.debug("The pipe does not exists, creating now") - os.makedirs(os.path.dirname(self.__pipe), exist_ok=True) - os.mkfifo(self.__pipe) - - # make sure the config file exists - if not os.path.exists(self.__configfile): - logger.error("The config file does not exists!") - - def start(self, force=False): - logger.debug("Starting up raspimjpeg") - if force: - # let's kill all rogue Raspimjpeg first - try: - self.killall() - except Exception as e: - logger.exception(f"Killing Raspimjpeg failed because of {e}") - # The input to this call are perfectly controlled - # hence the nosec comment to deactivate bandit error - self.__process = subprocess.Popen( # nosec - [self.__binary, "-c", self.__configfile], - stdout=subprocess.PIPE, - bufsize=1, # means line buffered - text=True, - ) - # self.__process.stdout can be read as a file - - # This will set the reads on stdout to be non-blocking - os.set_blocking(self.__process.stdout.fileno(), False) - - try: - name_string = self.__parse_output_for("Camera Name") - self.__sensor_name = name_string.rsplit(" ", 1)[1].upper().rstrip() - except TimeoutError as e: - logger.exception( - f"A timeout happened while waiting for RaspiMJPEG to start: {e}" - ) - raise e - - try: - width_string = self.__parse_output_for("Camera Max Width:") - self.__width = width_string.rsplit(" ", 1)[1] - except TimeoutError as e: - logger.exception( - f"A timeout happened while waiting for RaspiMJPEG to start: {e}" - ) - raise e - - try: - height_string = self.__parse_output_for("Camera Max Height") - self.__height = height_string.rsplit(" ", 1)[1] - except TimeoutError as e: - logger.exception( - f"A timeout happened while waiting for RaspiMJPEG to start: {e}" - ) - raise e - - try: - self.__wait_for_output("Starting command loop") - except TimeoutError as e: - logger.exception( - f"A timeout happened while waiting for RaspiMJPEG to start: {e}" - ) - raise e - - def status(self): - return self.__get_status() - - def __parse_output_for(self, text, timeout=5): - """Blocking, waiting for specific output from process - - Continously poll the process stdout file object. - - Args: - text (string): String to wait for - timeout (int, optional): Timeout duration in seconds. Defaults to 5. - - Raises: - TimeoutError: A timeout happened before the required output showed up - """ - logger.debug(f"Parsing the output for {text} for {timeout}s") - wait_until = datetime.datetime.now() + datetime.timedelta(seconds=timeout) - - break_loop = False - while True: - for nextline in self.__process.stdout: - logger.trace(f"last read line is {nextline}") - if nextline.startswith(text): - return nextline - - if wait_until < datetime.datetime.now(): - # The timeout has been reached! - logger.error("A timeout has occured waiting for a RaspiMJPEG answer") - raise TimeoutError - - time.sleep(0.1) - - def __wait_for_output(self, output, timeout=5): - """Blocking, waiting for specific output from process - - Continously poll the process stdout file object. - - Args: - output (string): String to wait for - timeout (int, optional): Timeout duration in seconds. Defaults to 5. - - Raises: - TimeoutError: A timeout happened before the required output showed up - """ - logger.debug(f"Waiting for {output} for {timeout}s") - wait_until = datetime.datetime.now() + datetime.timedelta(seconds=timeout) - - break_loop = False - while True: - for nextline in self.__process.stdout: - logger.trace(f"last read line is {nextline}") - if nextline.startswith("Error:"): - logger.error(f"RaspiMJPEG error: {nextline}") - elif nextline.startswith(output): - return - - if wait_until < datetime.datetime.now(): - # The timeout has been reached! - logger.error("A timeout has occured waiting for a RaspiMJPEG answer") - raise TimeoutError - - time.sleep(0.1) - - def __get_status(self): - """Open and return the status file content - - Returns: - string: status of the process - """ - logger.trace("Getting the status file") - try: - with open(self.__statusfile, "r") as status: - status = status.read() - logger.trace(f"Read {status} from {self.__statusfile}") - return status - except FileNotFoundError as e: - logger.error( - "The status file was not found, make sure the filesystem has not been corrupted" - ) - return "" - - def __wait_for_status(self, status, timeout=5): - """Wait for a specific status. Blocking, obviously. - - Args: - status (string): The status to wait for - """ - logger.debug(f"Waiting for {status} for {timeout}s") - wait_until = datetime.datetime.now() + datetime.timedelta(seconds=timeout) - - message = self.__get_status() - - while True: - if message.startswith(status): - return - - if wait_until < datetime.datetime.now(): - # The timeout has been reached! - logger.error("A timeout has occured waiting for a RaspiMJPEG answer") - raise TimeoutError - - time.sleep(0.1) - logger.debug(f"not {status} yet") - message = self.__get_status() - - def __send_command(self, command): - """Sends a command to the RaspiMJPEG process - - Args: - command (string): the command string to send - """ - # TODO add check to make sure the pipe is open on the other side, otherwise this is blocking. - # Maybe just check that self.__process is still alive? :-) - logger.debug(f"Sending the command [{command}] to raspimjpeg") - with open(self.__pipe, "w") as pipe: - pipe.write(f"{command}\n") - - @property - def sensor_name(self): - """Sensor name of the connected camera - - Returns: - string: Sensor name. One of OV5647 (cam v1), IMX219 (cam v2.1), IMX477(ca HQ) - """ - return self.__sensor_name - - @property - def width(self): - return self.__width - - @property - def height(self): - return self.__height - - @property - def resolution(self): - return self.__resolution - - @resolution.setter - def resolution(self, resolution): - """Change the camera image resolution - - For a full FOV, allowable resolutions are: - - (3280,2464), (1640,1232), (1640,922) for Camera V2.1 - - (2028,1520), (4056,3040) for HQ Camera - - - Args: - resolution (tuple of int): resolution to set the camera to - """ - logger.debug(f"Setting the resolution to {resolution}") - if resolution in [ - (3280, 2464), - (1640, 1232), - (1640, 922), - (2028, 1520), - (4056, 3040), - ]: - self.__resolution = resolution - self.__send_command( - f"px 1640 1232 15 15 {self.__resolution[0]} {self.__resolution[1]} 01" - ) - else: - logger.error(f"The resolution specified ({resolution}) is not valid") - raise ValueError - - @property - def iso(self): - return self.__iso - - @iso.setter - def iso(self, iso): - """Change the camera iso number - - Iso number will be rounded to the closest one of - 0, 100, 200, 320, 400, 500, 640, 800. - If 0, Iso number will be chosen automatically by the camera - - Args: - iso (int): Iso number - """ - logger.debug(f"Setting the iso number to {iso}") - - if 0 <= iso <= 800: - self.__iso = iso - self.__send_command(f"is {self.__iso}") - self.__wait_for_output("Change: iso") - else: - logger.error(f"The ISO number specified ({iso}) is not valid") - raise ValueError - - @property - def shutter_speed(self): - return self.__shutter_speed - - @shutter_speed.setter - def shutter_speed(self, shutter_speed): - """Change the camera shutter speed - - Args: - shutter_speed (int): shutter speed in µs - """ - logger.debug(f"Setting the shutter speed to {shutter_speed}") - if 0 < shutter_speed < 5000: - self.__shutter_speed = shutter_speed - self.__send_command(f"ss {self.__shutter_speed}") - self.__wait_for_output("Change: shutter_speed") - else: - logger.error(f"The shutter speed specified ({shutter_speed}) is not valid") - raise ValueError - - @property - def exposure_mode(self): - return self.__exposure_mode - - @exposure_mode.setter - def exposure_mode(self, mode): - """Change the camera exposure mode - - Is one of off, auto, night, nightpreview, backlight, spotlight, - sports, snow, beach, verylong, fixedfps, antishake, fireworks - - Args: - mode (string): exposure mode to use - """ - logger.debug(f"Setting the exposure mode to {mode}") - if mode in [ - "off", - "auto", - "night", - "nightpreview", - "backlight", - "spotlight", - "sports", - "snow", - "beach", - "verylong", - "fixedfps", - "antishake", - "fireworks", - ]: - self.__exposure_mode = mode - self.__send_command(f"em {self.__exposure_mode}") - else: - logger.error(f"The exposure mode specified ({mode}) is not valid") - raise ValueError - - @property - def white_balance(self): - return self.__white_balance - - @white_balance.setter - def white_balance(self, mode): - """Change the camera white balance mode - - Is one of off, auto, sun, cloudy, shade, tungsten, - fluorescent, incandescent, flash, horizon - - Args: - mode (string): white balance mode to use - """ - logger.debug(f"Setting the white balance mode to {mode}") - if mode in [ - "off", - "auto", - "sun", - "cloudy", - "shade", - "tungsten", - "fluorescent", - "incandescent", - "flash", - "horizon", - ]: - self.__white_balance = mode - self.__send_command(f"wb {self.__white_balance}") - else: - logger.error( - f"The camera white balance mode specified ({mode}) is not valid" - ) - raise ValueError - - @property - def white_balance_gain(self): - return self.__white_balance_gain - - @white_balance_gain.setter - def white_balance_gain(self, gain): - """Change the camera white balance gain - - The gain value should be a int between 0 and 300. By default the camera - is set to use 150 both for the red and the blue gain. - - Args: - gain (tuple of int): Red gain and blue gain to use - """ - logger.debug(f"Setting the white balance mode to {gain}") - if (0 < gain[0] < 800) and (0 < gain[1] < 800): - self.__white_balance_gain = gain - self.__send_command( - f"ag {self.__white_balance_gain[0]} {self.__white_balance_gain[1]}" - ) - else: - logger.error( - f"The camera white balance gain specified ({gain}) is not valid" - ) - raise ValueError - - @property - def image_gain(self): - return self.__image_gain - - @image_gain.setter - def image_gain(self, gain): - """Change the camera image gain - - The analog gain value should be an int between 100 and 1200 for the analog gain and - between 100 and 6400 for the digital gain. - By default the camera is set to use 1.0 both for the analog and the digital gain. - - Args: - gain (tuple of int): Image gain to use - """ - logger.debug(f"Setting the analog gain to {gain}") - if (100 <= gain[0] <= 1200) and (100 <= gain[1] < 6400): - self.__image_gain = gain - self.__send_command(f"ig {self.__image_gain[0]} {self.__image_gain[1]}") - else: - logger.error(f"The camera image gain specified ({gain}) is not valid") - raise ValueError - - @property - def image_quality(self): - return self.__image_quality - - @image_quality.setter - def image_quality(self, image_quality): - """Change the output image quality - - Args: - image_quality (int): image quality [0,100] - """ - logger.debug(f"Setting image quality to {image_quality}") - if 0 <= image_quality <= 100: - self.__image_quality = image_quality - self.__send_command(f"ss {self.__image_quality}") - else: - logger.error( - f"The output image quality specified ({image_quality}) is not valid" - ) - raise ValueError - - @property - def preview_quality(self): - return self.__preview_quality - - @preview_quality.setter - def preview_quality(self, preview_quality): - """Change the preview image quality - - Args: - preview_quality (int): image quality [0,100] - """ - logger.debug(f"Setting preview quality to {preview_quality}") - if 0 <= preview_quality <= 100: - self.__preview_quality = preview_quality - self.__send_command(f"pv {self.__preview_quality} 512 01") - else: - logger.error( - f"The preview image quality specified ({preview_quality}) is not valid" - ) - raise ValueError - - def capture(self, path="", timeout=5): - """Capture an image. Blocks for timeout seconds(5 by default) until the image is captured. - - Args: - path (str, optional): Path to image file. Defaults to "". - timeout (int, optional): Timeout duration in seconds. Defaults to 5. - - Raises: - TimeoutError: A timeout happened before the required output showed up - """ - logger.debug(f"Capturing an image to {path}") - if path == "": - self.__send_command("im") - else: - self.__send_command(f"im {path}") - time.sleep(0.1) - - self.__wait_for_output("Capturing image", timeout / 2) - self.__wait_for_output("Ready", timeout / 2) - - def stop(self): - """Halt and release the camera.""" - logger.debug("Releasing the camera now") - self.__send_command("ru 0") - - def close(self): - """Kill the process.""" - logger.debug("Killing raspimjpeg in a nice way") - self.__process.terminate() - self.__process.wait() - - def kill(self): - """Kill the process.""" - logger.debug("Killing raspimjpeg in a very dirty way") - self.__process.terminate() - - def killall(self): - """Literally erases the raspimjpeg process(es)""" - logger.debug("Killing raspimjpeg in a very ugly dirty way") - subprocess.run("sudo killall -q -9 raspimjpeg".split(), timeout=1) # nosec diff --git a/control/adafruithat/planktoscope/imager/state_machine.py b/control/adafruithat/planktoscope/imager/state_machine.py deleted file mode 100644 index e958bde4..00000000 --- a/control/adafruithat/planktoscope/imager/state_machine.py +++ /dev/null @@ -1,88 +0,0 @@ -# Copyright (C) 2021 Romain Bazile -# -# This file is part of the PlanktoScope software. -# -# PlanktoScope is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# PlanktoScope is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with PlanktoScope. If not, see . - -# Logger library compatible with multiprocessing -from loguru import logger - -# TODO rewrite this in PlantUML -# This works with https://www.diagram.codes/d/state-machine -# "wait for pump" as pump -# "start imager" as imager -# "capture image" as capture -# -# START->stop["init"] -# imager->pump["start pumping"] -# pump->stop["stop"] -# stop->imager["start"] -# pump->capture["pumping is done"] -# capture->pump["start pump"] -# capture->stop["stop or done"] - - -# State machine class -class ImagerState(object): - name = "state" - allowed = [] - - def switch(self, state): - """Switch to new state""" - if state.name in self.allowed: - logger.info(f"Current:{self} => switched to new state {state.name}") - self.__class__ = state - else: - logger.error(f"Current:{self} => switching to {state.name} not possible.") - - def __str__(self): - return self.name - - -class Stop(ImagerState): - name = "stop" - allowed = ["imaging"] - - -class Imaging(ImagerState): - """State of getting ready to start""" - - name = "imaging" - allowed = ["waiting", "stop"] - - -class Waiting(ImagerState): - """State of waiting for the pump to finish""" - - name = "waiting" - allowed = ["stop", "capture"] - - -class Capture(ImagerState): - """State of capturing image""" - - name = "capture" - allowed = ["stop", "waiting"] - - -class Imager(object): - """A class representing the imager""" - - def __init__(self): - # State of the imager - default is stop. - self.state = Stop() - - def change(self, state): - """Change state""" - self.state.switch(state) diff --git a/control/adafruithat/planktoscope/imagernew/stopflow.py b/control/adafruithat/planktoscope/imager/stopflow.py similarity index 100% rename from control/adafruithat/planktoscope/imagernew/stopflow.py rename to control/adafruithat/planktoscope/imager/stopflow.py diff --git a/control/adafruithat/planktoscope/imager/streamer.py b/control/adafruithat/planktoscope/imager/streamer.py deleted file mode 100644 index 9361cfcb..00000000 --- a/control/adafruithat/planktoscope/imager/streamer.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright (C) 2021 Romain Bazile -# -# This file is part of the PlanktoScope software. -# -# PlanktoScope is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# PlanktoScope is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with PlanktoScope. If not, see . - -from loguru import logger - -import time - -import socketserver -import http.server - - -################################################################################ -# Classes for the PiCamera Streaming -################################################################################ -class StreamingHandler(http.server.BaseHTTPRequestHandler): - def __init__(self, delay, *args, **kwargs): - self.delay = delay - super(StreamingHandler, self).__init__(*args, **kwargs) - - @logger.catch - def do_GET(self): - if self.path == "/": - self.send_response(301) - self.send_header("Location", "/stream.mjpg") - self.end_headers() - elif self.path == "/stream.mjpg": - self.send_response(200) - self.send_header("Age", 0) - self.send_header("Cache-Control", "no-cache, private") - self.send_header("Pragma", "no-cache") - self.send_header( - "Content-Type", "multipart/x-mixed-replace; boundary=FRAME" - ) - self.end_headers() - try: - while True: - try: - with open("/dev/shm/mjpeg/cam.jpg", "rb") as jpeg: # nosec - frame = jpeg.read() - except FileNotFoundError as e: - logger.error("Camera has not been started yet") - time.sleep(5) - except Exception as e: - logger.exception(f"An exception occured {e}") - else: - self.wfile.write(b"--FRAME\r\n") - self.send_header("Content-Type", "image/jpeg") - self.send_header("Content-Length", len(frame)) - self.end_headers() - self.wfile.write(frame) - self.wfile.write(b"\r\n") - time.sleep(self.delay) - - except BrokenPipeError as e: - logger.info(f"Removed streaming client {self.client_address}") - else: - self.send_error(404) - self.end_headers() - - -class StreamingServer(socketserver.ThreadingMixIn, http.server.HTTPServer): - allow_reuse_address = True - daemon_threads = True diff --git a/control/adafruithat/planktoscope/imagernew/__init__.py b/control/adafruithat/planktoscope/imagernew/__init__.py deleted file mode 100644 index 89756e2a..00000000 --- a/control/adafruithat/planktoscope/imagernew/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""imagernew provides high-level functionality for performing image acquisition.""" diff --git a/control/planktoscopehat/main.py b/control/planktoscopehat/main.py index 45a91fbe..bca92580 100644 --- a/control/planktoscopehat/main.py +++ b/control/planktoscopehat/main.py @@ -8,13 +8,13 @@ import planktoscope.mqtt import planktoscope.stepper -import planktoscope.light # Fan HAT LEDs +import planktoscope.light # Fan HAT LEDs import planktoscope.identity -import planktoscope.uuidName # Note: this is deprecated. -import planktoscope.display # Fan HAT OLED screen -from planktoscope.imagernew import mqtt as imagernew +import planktoscope.uuidName # Note: this is deprecated. +import planktoscope.display # Fan HAT OLED screen +from planktoscope.imager import mqtt as imager -# enqueue=True is necessary so we can log accross modules +# enqueue=True is necessary so we can log across modules # rotation happens everyday at 01:00 if not restarted logger.add( # sys.stdout, @@ -40,6 +40,7 @@ run = True # global variable to enable clean shutdown from stop signals + def handler_stop_signals(signum, frame): """This handler simply stop the forever running loop in __main__""" global run @@ -49,7 +50,9 @@ def handler_stop_signals(signum, frame): if __name__ == "__main__": logger.info("Welcome!") - logger.info("Initialising signals handling and sanitizing the directories (step 1/5)") + logger.info( + "Initialising signals handling and sanitizing the directories (step 1/5)" + ) signal.signal(signal.SIGINT, handler_stop_signals) signal.signal(signal.SIGTERM, handler_stop_signals) @@ -75,7 +78,9 @@ def handler_stop_signals(signum, frame): # create the path! os.makedirs(img_path) - logger.info(f"This PlanktoScope's Raspberry Pi's serial number is {planktoscope.uuidName.getSerial()}") + logger.info( + f"This PlanktoScope's Raspberry Pi's serial number is {planktoscope.uuidName.getSerial()}" + ) logger.info( f"This PlanktoScope's machine name is {planktoscope.identity.load_machine_name()}" ) @@ -83,7 +88,7 @@ def handler_stop_signals(signum, frame): f"This PlanktoScope's deprecated name is {planktoscope.uuidName.machineName(machine=planktoscope.uuidName.getSerial())}" ) - # Prepare the event for a gracefull shutdown + # Prepare the event for a graceful shutdown shutdown_event = multiprocessing.Event() shutdown_event.clear() @@ -96,7 +101,7 @@ def handler_stop_signals(signum, frame): # Starts the imager control process logger.info("Starting the imager control process (step 3/5)") try: - imager_thread = imagernew.Worker(shutdown_event) + imager_thread = imager.Worker(shutdown_event) except Exception as e: logger.error(f"The imager control process could not be started: {e}") imager_thread = None @@ -107,7 +112,7 @@ def handler_stop_signals(signum, frame): logger.info("Starting the light control process (step 4/5)") try: light_thread = planktoscope.light.LightProcess(shutdown_event) - except Exception as e: + except Exception: logger.error("The light control process could not be started") light_thread = None else: diff --git a/control/planktoscopehat/planktoscope/imager/__init__.py b/control/planktoscopehat/planktoscope/imager/__init__.py index 7674d131..89756e2a 100644 --- a/control/planktoscopehat/planktoscope/imager/__init__.py +++ b/control/planktoscopehat/planktoscope/imager/__init__.py @@ -1,731 +1 @@ -import datetime # needed to get date and time for folder name and filename -import time # needed to able to sleep for a given duration -import json -import os -import shutil -import multiprocessing -import threading # needed for the streaming server -import functools # needed for the streaming server - -from loguru import logger - -import planktoscope.mqtt -import planktoscope.imager.state_machine -import planktoscope.imager.raspimjpeg -import planktoscope.imager.streamer -import planktoscope.integrity -import planktoscope.identity - - -logger.info("planktoscope.imager is loaded") - -################################################################################ -# Main Imager class -################################################################################ -class ImagerProcess(multiprocessing.Process): - """This class contains the main definitions for the imager of the PlanktoScope""" - - def __init__(self, stop_event, iso=100, shutter_speed=1): - """Initialize the Imager class - - Args: - stop_event (multiprocessing.Event): shutdown event - iso (int, optional): ISO sensitivity. Defaults to 100. - shutter_speed (int, optional): Shutter speed of the camera. Defaults to 500. - """ - super(ImagerProcess, self).__init__(name="imager") - - logger.info("planktoscope.imager is initialising") - - if os.path.exists("/home/pi/PlanktoScope/hardware.json"): - # load hardware.json - with open("/home/pi/PlanktoScope/hardware.json", "r") as config_file: - configuration = json.load(config_file) - logger.debug(f"Hardware configuration loaded is {configuration}") - else: - logger.info( - "The hardware configuration file doesn't exists, using defaults" - ) - configuration = {} - - self.__camera_type = "v2.1" - - # parse the config data. If the key is absent, we are using the default value - self.__camera_type = configuration.get("camera_type", self.__camera_type) - - self.stop_event = stop_event - self.__imager = planktoscope.imager.state_machine.Imager() - self.__img_goal = 0 - self.__img_done = 0 - self.__sleep_before = None - self.__pump_volume = None - self.__pump_direction = "FORWARD" - self.__img_goal = None - self.imager_client = None - self.__error = 0 - - # Initialise the camera and the process - # Also starts the streaming to the temporary file - self.__camera = planktoscope.imager.raspimjpeg.raspimjpeg() - - try: - self.__camera.start() - except Exception as e: - logger.exception( - f"An exception has occured when starting up raspimjpeg: {e}" - ) - try: - self.__camera.start(True) - except Exception as e: - logger.exception( - f"A second exception has occured when starting up raspimjpeg: {e}" - ) - logger.error("This error can't be recovered from, terminating now") - raise e - - if self.__camera.sensor_name == "IMX219": # Camera v2.1 - self.__resolution = (3280, 2464) - elif self.__camera.sensor_name == "IMX477": # Camera HQ - self.__resolution = (4056, 3040) - else: - self.__resolution = (1280, 1024) - logger.error( - f"The connected camera {self.__camera.sensor_name} is not recognized, please check your camera" - ) - - self.__iso = iso - self.__shutter_speed = shutter_speed - self.__exposure_mode = "auto" - self.__white_balance = "off" - self.__white_balance_gain = ( - int(configuration.get("red_gain", 2.00) * 100), - int(configuration.get("blue_gain", 1.40) * 100), - ) - self.__image_gain = ( - int(configuration.get("analog_gain", 1.00) * 100), - int(configuration.get("digital_gain", 1.00) * 100), - ) - - self.__base_path = "/home/pi/data/img" - # Let's make sure the base path exists - if not os.path.exists(self.__base_path): - os.makedirs(self.__base_path) - - self.__export_path = "" - self.__global_metadata = None - - logger.info("Initialising the camera with the default settings") - try: - self.__camera.resolution = self.__resolution - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the resolution, trying again" - ) - self.__camera.resolution = self.__resolution - time.sleep(0.1) - - try: - self.__camera.iso = self.__iso - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the ISO number, trying again" - ) - self.__camera.iso = self.__iso - time.sleep(0.1) - - try: - self.__camera.shutter_speed = self.__shutter_speed - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the shutter speed, trying again" - ) - self.__camera.shutter_speed = self.__shutter_speed - time.sleep(0.1) - - try: - self.__camera.exposure_mode = self.__exposure_mode - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the exposure mode, trying again" - ) - self.__camera.exposure_mode = self.__exposure_mode - time.sleep(0.1) - - try: - self.__camera.white_balance = self.__white_balance - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance mode, trying again" - ) - self.__camera.white_balance = self.__white_balance - time.sleep(0.1) - - try: - self.__camera.white_balance_gain = self.__white_balance_gain - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance gain, trying again" - ) - self.__camera.white_balance_gain = self.__white_balance_gain - time.sleep(0.1) - - try: - self.__camera.image_gain = self.__image_gain - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance gain, trying again" - ) - self.__camera.image_gain = self.__image_gain - - logger.success("planktoscope.imager is initialised and ready to go!") - - def __message_image(self, last_message): - """Actions for when we receive a message""" - if ( - "sleep" not in last_message - or "volume" not in last_message - or "nb_frame" not in last_message - or "pump_direction" not in last_message - ): - logger.error(f"The received message has the wrong argument {last_message}") - self.imager_client.client.publish("status/imager", '{"status":"Error"}') - return - self.__imager.change(planktoscope.imager.state_machine.Imaging) - - # Get duration to wait before an image from the different received arguments - self.__sleep_before = float(last_message["sleep"]) - - # Get volume in between two images from the different received arguments - self.__pump_volume = float(last_message["volume"]) - - # Get the pump direction message - self.__pump_direction = last_message["pump_direction"] - - # Get the number of frames to image from the different received arguments - self.__img_goal = int(last_message["nb_frame"]) - - # Reset the counter to 0 - self.__img_done = 0 - - self.imager_client.client.publish("status/imager", '{"status":"Started"}') - - def __message_stop(self): - self.imager_client.client.unsubscribe("status/pump") - - # Stops the pump - self.imager_client.client.publish("actuator/pump", '{"action": "stop"}') - - logger.info("The imaging has been interrupted.") - - # Publish the status "Interrupted" to via MQTT to Node-RED - self.imager_client.client.publish("status/imager", '{"status":"Interrupted"}') - - self.__imager.change(planktoscope.imager.state_machine.Stop) - - def __message_update(self, last_message): - if self.__imager.state.name == "stop": - if "config" not in last_message: - logger.error( - f"The received message has the wrong argument {last_message}" - ) - self.imager_client.client.publish( - "status/imager", '{"status":"Configuration message error"}' - ) - return - - logger.info("Updating the configuration now with the received data") - # Updating the configuration with the passed parameter in payload["config"] - self.__global_metadata = last_message["config"] - - # Publish the status "Config updated" to via MQTT to Node-RED - self.imager_client.client.publish( - "status/imager", '{"status":"Config updated"}' - ) - logger.info("Configuration has been updated") - else: - logger.error("We can't update the configuration while we are imaging.") - # Publish the status "Interrupted" to via MQTT to Node-RED - self.imager_client.client.publish("status/imager", '{"status":"Busy"}') - - def __message_settings(self, last_message): - # TODO simplify this method, move timeout error check inside self.__camera.resolution/iso/etc. - if self.__imager.state.name == "stop": - if "settings" not in last_message: - logger.error( - f"The received message has the wrong argument {last_message}" - ) - self.imager_client.client.publish( - "status/imager", '{"status":"Camera settings error"}' - ) - return - logger.info("Updating the camera settings now with the received data") - # Updating the configuration with the passed parameter in payload["config"] - settings = last_message["settings"] - if "resolution" in settings: - self.__resolution = settings.get("resolution", self.__resolution) - logger.debug(f"Updating the camera resolution to {self.__resolution}") - try: - self.__camera.resolution = self.__resolution - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the resolution, trying again" - ) - self.__camera.resolution = self.__resolution - except ValueError as e: - logger.error("The requested resolution is not valid!") - self.imager_client.client.publish( - "status/imager", '{"status":"Error: Resolution not valid"}' - ) - return - - if "iso" in settings: - self.__iso = settings.get("iso", self.__iso) - logger.debug(f"Updating the camera iso to {self.__iso}") - try: - self.__camera.iso = self.__iso - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the ISO number, trying again" - ) - self.__camera.iso = self.__iso - except ValueError as e: - logger.error("The requested ISO number is not valid!") - self.imager_client.client.publish( - "status/imager", '{"status":"Error: Iso number not valid"}' - ) - return - - if "shutter_speed" in settings: - self.__shutter_speed = settings.get( - "shutter_speed", self.__shutter_speed - ) - logger.debug( - f"Updating the camera shutter speed to {self.__shutter_speed}" - ) - try: - self.__camera.shutter_speed = self.__shutter_speed - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the shutter speed, trying again" - ) - self.__camera.shutter_speed = self.__shutter_speed - except ValueError as e: - logger.error("The requested shutter speed is not valid!") - self.imager_client.client.publish( - "status/imager", '{"status":"Error: Shutter speed not valid"}' - ) - return - - if "white_balance_gain" in settings: - if "red" in settings["white_balance_gain"]: - logger.debug( - f"Updating the camera white balance red gain to {settings['white_balance_gain']}" - ) - self.__white_balance_gain = ( - settings["white_balance_gain"].get( - "red", self.__white_balance_gain[0] - ), - self.__white_balance_gain[1], - ) - if "blue" in settings["white_balance_gain"]: - logger.debug( - f"Updating the camera white balance blue gain to {settings['white_balance_gain']}" - ) - self.__white_balance_gain = ( - self.__white_balance_gain[0], - settings["white_balance_gain"].get( - "blue", self.__white_balance_gain[1] - ), - ) - logger.debug( - f"Updating the camera white balance gain to {self.__white_balance_gain}" - ) - try: - self.__camera.white_balance_gain = self.__white_balance_gain - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance gain, trying again" - ) - self.__camera.white_balance_gain = self.__white_balance_gain - except ValueError as e: - logger.error("The requested white balance gain is not valid!") - self.imager_client.client.publish( - "status/imager", - '{"status":"Error: White balance gain not valid"}', - ) - return - - if "white_balance" in settings: - logger.debug( - f"Updating the camera white balance mode to {settings['white_balance']}" - ) - self.__white_balance = settings.get( - "white_balance", self.__white_balance - ) - logger.debug( - f"Updating the camera white balance mode to {self.__white_balance}" - ) - try: - self.__camera.white_balance = self.__white_balance - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance, trying again" - ) - self.__camera.white_balance = self.__white_balance - except ValueError as e: - logger.error("The requested white balance is not valid!") - self.imager_client.client.publish( - "status/imager", - f'{"status":"Error: White balance mode {self.__white_balance} is not valid"}', - ) - return - - if "image_gain" in settings: - if "analog" in settings["image_gain"]: - logger.debug( - f"Updating the camera image analog gain to {settings['image_gain']}" - ) - self.__image_gain = ( - settings["image_gain"].get("analog", self.__image_gain[0]), - self.__image_gain[1], - ) - if "digital" in settings["image_gain"]: - logger.debug( - f"Updating the camera image digital gain to {settings['image_gain']}" - ) - self.__image_gain = ( - self.__image_gain[0], - settings["image_gain"].get("digital", self.__image_gain[1]), - ) - logger.debug(f"Updating the camera image gain to {self.__image_gain}") - try: - self.__camera.image_gain = self.__image_gain - except TimeoutError as e: - logger.error( - "A timeout has occured when setting the white balance gain, trying again" - ) - self.__camera.image_gain = self.__image_gain - except ValueError as e: - logger.error("The requested image gain is not valid!") - self.imager_client.client.publish( - "status/imager", - '{"status":"Error: Image gain not valid"}', - ) - return - # Publish the status "Config updated" to via MQTT to Node-RED - self.imager_client.client.publish( - "status/imager", '{"status":"Camera settings updated"}' - ) - logger.info("Camera settings have been updated") - else: - logger.error("We can't update the camera settings while we are imaging.") - # Publish the status "Interrupted" to via MQTT to Node-RED - self.imager_client.client.publish("status/imager", '{"status":"Busy"}') - - @logger.catch - def treat_message(self): - action = "" - logger.info("We received a new message") - if self.imager_client.msg["topic"].startswith("imager/"): - last_message = self.imager_client.msg["payload"] - logger.debug(last_message) - action = self.imager_client.msg["payload"]["action"] - logger.debug(action) - elif self.imager_client.msg["topic"] == "status/pump": - logger.debug( - f"Status message payload is {self.imager_client.msg['payload']}" - ) - if self.__imager.state.name == "waiting": - if self.imager_client.msg["payload"]["status"] == "Done": - self.__imager.change(planktoscope.imager.state_machine.Capture) - self.imager_client.client.unsubscribe("status/pump") - else: - logger.info( - f"The pump is not done yet {self.imager_client.msg['payload']}" - ) - else: - logger.error( - "There is an error, we received an unexpected pump message" - ) - else: - logger.error( - f"The received message was not for us! Topic was {self.imager_client.msg['topic']}" - ) - self.imager_client.read_message() - - # If the command is "image" - if action == "image": - # {"action":"image","sleep":5,"volume":1,"nb_frame":200} - self.__message_image(last_message) - - elif action == "stop": - self.__message_stop() - - elif action == "update_config": - self.__message_update(last_message) - - elif action == "settings": - self.__message_settings(last_message) - - elif action not in ["image", "stop", "update_config", "settings", ""]: - logger.warning( - f"We did not understand the received request {action} - {last_message}" - ) - - def __pump_message(self): - """Sends a message to the pump process""" - - # Pump during a given volume - self.imager_client.client.publish( - "actuator/pump", - json.dumps( - { - "action": "move", - "direction": self.__pump_direction, - "volume": self.__pump_volume, - "flowrate": 2, - } - ), - ) - - def __state_imaging(self): - # subscribe to status/pump - self.imager_client.client.subscribe("status/pump") - - # Definition of the few important metadata - local_metadata = { - "acq_local_datetime": datetime.datetime.now().isoformat().split(".")[0], - "acq_camera_resolution": f"{self.__resolution[0]}x{self.__resolution[1]}", - "acq_camera_iso": self.__iso, - "acq_camera_shutter_speed": self.__shutter_speed, - "acq_uuid": planktoscope.identity.load_machine_name(), - "sample_uuid": planktoscope.identity.load_machine_name(), - } - - # Concat the local metadata and the metadata from Node-RED - self.__global_metadata = {**self.__global_metadata, **local_metadata} - - if "object_date" not in self.__global_metadata: - # If this path exists, then ids are reused when they should not - logger.error("The metadata did not contain object_date!") - self.imager_client.client.publish( - "status/imager", - '{"status":"Configuration update error: object_date is missing!"}', - ) - # Reset the counter to 0 - self.__img_done = 0 - # Change state towards stop - self.__imager.change(planktoscope.imager.state_machine.Stop) - return - - logger.info("Setting up the directory structure for storing the pictures") - self.__export_path = os.path.join( - self.__base_path, - self.__global_metadata["object_date"], - str(self.__global_metadata["sample_id"]).replace(" ", "_").strip("'"), - str(self.__global_metadata["acq_id"]).replace(" ", "_").strip("'"), - ) - - if os.path.exists(self.__export_path): - # If this path exists, then ids are reused when they should not - logger.error(f"The export path at {self.__export_path} already exists") - self.imager_client.client.publish( - "status/imager", - '{"status":"Configuration update error: Chosen id are already in use!"}', - ) - # Reset the counter to 0 - self.__img_done = 0 - self.__imager.change(planktoscope.imager.state_machine.Stop) - return - else: - # create the path! - os.makedirs(self.__export_path) - - # Export the metadata to a json file - logger.info("Exporting the metadata to a metadata.json") - metadata_filepath = os.path.join(self.__export_path, "metadata.json") - with open(metadata_filepath, "w") as metadata_file: - json.dump(self.__global_metadata, metadata_file, indent=4) - logger.debug( - f"Metadata dumped in {metadata_file} are {self.__global_metadata}" - ) - - # Create the integrity file in this export path - try: - planktoscope.integrity.create_integrity_file(self.__export_path) - except FileExistsError as e: - logger.info( - f"The integrity file already exists in this export path {self.__export_path}" - ) - # Add the metadata.json file to the integrity file - try: - planktoscope.integrity.append_to_integrity_file(metadata_filepath) - except FileNotFoundError as e: - logger.error( - f"{metadata_filepath} was not found, the metadata.json may not have been created properly!" - ) - - self.__pump_message() - - self.__imager.change(planktoscope.imager.state_machine.Waiting) - - def __state_capture(self): - filename = f"{datetime.datetime.now().strftime('%H_%M_%S_%f')}.jpg" - - # Define the filename of the image - filename_path = os.path.join(self.__export_path, filename) - - logger.info( - f"Capturing image {self.__img_done + 1}/{self.__img_goal} to {filename_path}" - ) - - # Sleep a duration before to start acquisition - time.sleep(self.__sleep_before) - - # Capture an image to the temporary file - try: - self.__camera.capture("", timeout=5) - except TimeoutError as e: - self.__capture_error("timeout during capture") - return - - logger.debug(f"Copying the image from the temp file to {filename_path}") - shutil.copy("/dev/shm/mjpeg/image.jpg", filename_path) # nosec - # TODO Try to stop the camera streaming and display instead each captured image - # os.rename("/dev/shm/mjpeg/image.jpg", "/dev/shm/mjpeg/cam.jpg") - logger.debug("Syncing the disk") - os.sync() - - # Add the checksum of the captured image to the integrity file - try: - planktoscope.integrity.append_to_integrity_file(filename_path) - except FileNotFoundError as e: - self.__capture_error(f"{filename_path} was not found") - return - - self.imager_client.client.publish( - "status/imager", - f'{{"status":"Image {self.__img_done + 1}/{self.__img_goal} has been imaged to {filename}"}}', - ) - - # Increment the counter - self.__img_done += 1 - self.__error = 0 - - # If counter reach the number of frame, break - if self.__img_done >= self.__img_goal: - self.__img_done = 0 - - self.imager_client.client.publish("status/imager", '{"status":"Done"}') - - self.__imager.change(planktoscope.imager.state_machine.Stop) - else: - # We have not reached the final stage, let's keep imaging - self.imager_client.client.subscribe("status/pump") - - self.__pump_message() - - self.__imager.change(planktoscope.imager.state_machine.Waiting) - - def __capture_error(self, message=""): - logger.error(f"An error occurred during the capture: {message}") - if self.__error: - logger.error("This is a repeating problem, stopping the capture now") - self.imager_client.client.publish( - "status/imager", - f'{{"status":"Image {self.__img_done + 1}/{self.__img_goal} WAS NOT CAPTURED! STOPPING THE PROCESS!"}}', - ) - self.__img_done = 0 - self.__img_goal = 0 - self.__error = 0 - self.__imager.change(planktoscope.imager.state_machine.Stop) - else: - self.__error += 1 - self.imager_client.client.publish( - "status/imager", - f'{{"status":"Image {self.__img_done + 1}/{self.__img_goal} was not captured due to this error:{message}! Retrying once!"}}', - ) - time.sleep(1) - - @logger.catch - def state_machine(self): - if self.__imager.state.name == "imaging": - self.__state_imaging() - return - - elif self.__imager.state.name == "capture": - self.__state_capture() - return - - elif self.__imager.state.name == ["waiting", "stop"]: - return - - ################################################################################ - # While loop for capturing commands from Node-RED - ################################################################################ - @logger.catch - def run(self): - """This is the function that needs to be started to create a thread""" - logger.info( - f"The imager control thread has been started in process {os.getpid()}" - ) - # MQTT Service connection - self.imager_client = planktoscope.mqtt.MQTT_Client( - topic="imager/#", name="imager_client" - ) - - self.imager_client.client.publish("status/imager", '{"status":"Starting up"}') - - if self.__camera.sensor_name == "IMX219": # Camera v2.1 - self.imager_client.client.publish( - "status/imager", '{"camera_name":"Camera v2.1"}' - ) - elif self.__camera.sensor_name == "IMX477": # Camera HQ - self.imager_client.client.publish( - "status/imager", '{"camera_name":"HQ Camera"}' - ) - else: - self.imager_client.client.publish( - "status/imager", '{"camera_name":"Not recognized"}' - ) - - logger.info("Starting the streaming server thread") - address = ("", 8000) - fps = 15 - refresh_delay = 1 / fps - handler = functools.partial( - planktoscope.imager.streamer.StreamingHandler, refresh_delay - ) - server = planktoscope.imager.streamer.StreamingServer(address, handler) - self.streaming_thread = threading.Thread( - target=server.serve_forever, daemon=True - ) - self.streaming_thread.start() - - # Publish the status "Ready" to via MQTT to Node-RED - self.imager_client.client.publish("status/imager", '{"status":"Ready"}') - - logger.success("Camera is READY!") - - # This is the main loop - while not self.stop_event.is_set(): - if self.imager_client.new_message_received(): - self.treat_message() - self.state_machine() - time.sleep(0.001) - - logger.info("Shutting down the imager process") - self.imager_client.client.publish("status/imager", '{"status":"Dead"}') - logger.debug("Stopping the raspimjpeg process") - self.__camera.close() - logger.debug("Stopping the streaming thread") - server.shutdown() - logger.debug("Stopping MQTT") - self.imager_client.shutdown() - # self.streaming_thread.kill() - logger.success("Imager process shut down! See you!") - - -# TODO This should be a test suite for this library -if __name__ == "__main__": - pass +"""imagernew provides high-level functionality for performing image acquisition.""" diff --git a/control/planktoscopehat/planktoscope/imagernew/mqtt.py b/control/planktoscopehat/planktoscope/imager/mqtt.py similarity index 100% rename from control/planktoscopehat/planktoscope/imagernew/mqtt.py rename to control/planktoscopehat/planktoscope/imager/mqtt.py diff --git a/control/planktoscopehat/planktoscope/imager/raspimjpeg.py b/control/planktoscopehat/planktoscope/imager/raspimjpeg.py deleted file mode 100644 index 5acbf4f8..00000000 --- a/control/planktoscopehat/planktoscope/imager/raspimjpeg.py +++ /dev/null @@ -1,519 +0,0 @@ -################################################################################ -# Practical Libraries -################################################################################ - -# Logger library compatible with multiprocessing -from loguru import logger - -# Library for path and filesystem manipulations -import os - -# Library to get date and time for folder name and filename -import datetime -import time - -# Library to control the RaspiMJPEG process -import subprocess # nosec - - -################################################################################ -# Class for the communication with RaspiMJPEG -################################################################################ -class raspimjpeg(object): - def __init__(self, *args, **kwargs): - self.__configfile = "/home/pi/PlanktoScope/scripts/raspimjpeg/raspimjpeg.conf" - self.__binary = "/home/pi/PlanktoScope/scripts/raspimjpeg/bin/raspimjpeg" - self.__statusfile = "/dev/shm/mjpeg/status_mjpeg.txt" # nosec - self.__pipe = "/dev/shm/mjpeg/FIFO" # nosec - self.__sensor_name = "" - - # make sure the status file exists and is empty - if not os.path.exists(self.__statusfile): - logger.debug("The status file does not exists, creating now") - # create the path! - os.makedirs(os.path.dirname(self.__statusfile), exist_ok=True) - - # If the file does not exists, creates it - # otherwise make sure it's empty - with open(self.__statusfile, "w") as file: - file.write("") - - # make sure the pipe exists - if not os.path.exists(self.__pipe): - logger.debug("The pipe does not exists, creating now") - os.makedirs(os.path.dirname(self.__pipe), exist_ok=True) - os.mkfifo(self.__pipe) - - # make sure the config file exists - if not os.path.exists(self.__configfile): - logger.error("The config file does not exists!") - - def start(self, force=False): - logger.debug("Starting up raspimjpeg") - if force: - # let's kill all rogue Raspimjpeg first - try: - self.killall() - except Exception as e: - logger.exception(f"Killing Raspimjpeg failed because of {e}") - # The input to this call are perfectly controlled - # hence the nosec comment to deactivate bandit error - self.__process = subprocess.Popen( # nosec - [self.__binary, "-c", self.__configfile], - stdout=subprocess.PIPE, - bufsize=1, # means line buffered - text=True, - ) - # self.__process.stdout can be read as a file - - # This will set the reads on stdout to be non-blocking - os.set_blocking(self.__process.stdout.fileno(), False) - - try: - name_string = self.__parse_output_for("Camera Name") - self.__sensor_name = name_string.rsplit(" ", 1)[1].upper().rstrip() - except TimeoutError as e: - logger.exception( - f"A timeout happened while waiting for RaspiMJPEG to start: {e}" - ) - raise e - - try: - width_string = self.__parse_output_for("Camera Max Width:") - self.__width = width_string.rsplit(" ", 1)[1] - except TimeoutError as e: - logger.exception( - f"A timeout happened while waiting for RaspiMJPEG to start: {e}" - ) - raise e - - try: - height_string = self.__parse_output_for("Camera Max Height") - self.__height = height_string.rsplit(" ", 1)[1] - except TimeoutError as e: - logger.exception( - f"A timeout happened while waiting for RaspiMJPEG to start: {e}" - ) - raise e - - try: - self.__wait_for_output("Starting command loop") - except TimeoutError as e: - logger.exception( - f"A timeout happened while waiting for RaspiMJPEG to start: {e}" - ) - raise e - - def status(self): - return self.__get_status() - - def __parse_output_for(self, text, timeout=5): - """Blocking, waiting for specific output from process - - Continously poll the process stdout file object. - - Args: - text (string): String to wait for - timeout (int, optional): Timeout duration in seconds. Defaults to 5. - - Raises: - TimeoutError: A timeout happened before the required output showed up - """ - logger.debug(f"Parsing the output for {text} for {timeout}s") - wait_until = datetime.datetime.now() + datetime.timedelta(seconds=timeout) - - break_loop = False - while True: - for nextline in self.__process.stdout: - logger.trace(f"last read line is {nextline}") - if nextline.startswith(text): - return nextline - - if wait_until < datetime.datetime.now(): - # The timeout has been reached! - logger.error("A timeout has occured waiting for a RaspiMJPEG answer") - raise TimeoutError - - time.sleep(0.1) - - def __wait_for_output(self, output, timeout=5): - """Blocking, waiting for specific output from process - - Continously poll the process stdout file object. - - Args: - output (string): String to wait for - timeout (int, optional): Timeout duration in seconds. Defaults to 5. - - Raises: - TimeoutError: A timeout happened before the required output showed up - """ - logger.debug(f"Waiting for {output} for {timeout}s") - wait_until = datetime.datetime.now() + datetime.timedelta(seconds=timeout) - - break_loop = False - while True: - for nextline in self.__process.stdout: - logger.trace(f"last read line is {nextline}") - if nextline.startswith("Error:"): - logger.error(f"RaspiMJPEG error: {nextline}") - elif nextline.startswith(output): - return - - if wait_until < datetime.datetime.now(): - # The timeout has been reached! - logger.error("A timeout has occured waiting for a RaspiMJPEG answer") - raise TimeoutError - - time.sleep(0.1) - - def __get_status(self): - """Open and return the status file content - - Returns: - string: status of the process - """ - logger.trace("Getting the status file") - try: - with open(self.__statusfile, "r") as status: - status = status.read() - logger.trace(f"Read {status} from {self.__statusfile}") - return status - except FileNotFoundError as e: - logger.error( - "The status file was not found, make sure the filesystem has not been corrupted" - ) - return "" - - def __wait_for_status(self, status, timeout=5): - """Wait for a specific status. Blocking, obviously. - - Args: - status (string): The status to wait for - """ - logger.debug(f"Waiting for {status} for {timeout}s") - wait_until = datetime.datetime.now() + datetime.timedelta(seconds=timeout) - - message = self.__get_status() - - while True: - if message.startswith(status): - return - - if wait_until < datetime.datetime.now(): - # The timeout has been reached! - logger.error("A timeout has occured waiting for a RaspiMJPEG answer") - raise TimeoutError - - time.sleep(0.1) - logger.debug(f"not {status} yet") - message = self.__get_status() - - def __send_command(self, command): - """Sends a command to the RaspiMJPEG process - - Args: - command (string): the command string to send - """ - # TODO add check to make sure the pipe is open on the other side, otherwise this is blocking. - # Maybe just check that self.__process is still alive? :-) - logger.debug(f"Sending the command [{command}] to raspimjpeg") - with open(self.__pipe, "w") as pipe: - pipe.write(f"{command}\n") - - @property - def sensor_name(self): - """Sensor name of the connected camera - - Returns: - string: Sensor name. One of OV5647 (cam v1), IMX219 (cam v2.1), IMX477(ca HQ) - """ - return self.__sensor_name - - @property - def width(self): - return self.__width - - @property - def height(self): - return self.__height - - @property - def resolution(self): - return self.__resolution - - @resolution.setter - def resolution(self, resolution): - """Change the camera image resolution - - For a full FOV, allowable resolutions are: - - (3280,2464), (1640,1232), (1640,922) for Camera V2.1 - - (2028,1520), (4056,3040) for HQ Camera - - - Args: - resolution (tuple of int): resolution to set the camera to - """ - logger.debug(f"Setting the resolution to {resolution}") - if resolution in [ - (3280, 2464), - (1640, 1232), - (1640, 922), - (2028, 1520), - (4056, 3040), - ]: - self.__resolution = resolution - self.__send_command( - f"px 1640 1232 15 15 {self.__resolution[0]} {self.__resolution[1]} 01" - ) - else: - logger.error(f"The resolution specified ({resolution}) is not valid") - raise ValueError - - @property - def iso(self): - return self.__iso - - @iso.setter - def iso(self, iso): - """Change the camera iso number - - Iso number will be rounded to the closest one of - 0, 100, 200, 320, 400, 500, 640, 800. - If 0, Iso number will be chosen automatically by the camera - - Args: - iso (int): Iso number - """ - logger.debug(f"Setting the iso number to {iso}") - - if 0 <= iso <= 800: - self.__iso = iso - self.__send_command(f"is {self.__iso}") - self.__wait_for_output("Change: iso") - else: - logger.error(f"The ISO number specified ({iso}) is not valid") - raise ValueError - - @property - def shutter_speed(self): - return self.__shutter_speed - - @shutter_speed.setter - def shutter_speed(self, shutter_speed): - """Change the camera shutter speed - - Args: - shutter_speed (int): shutter speed in µs - """ - logger.debug(f"Setting the shutter speed to {shutter_speed}") - if 0 < shutter_speed < 5000: - self.__shutter_speed = shutter_speed - self.__send_command(f"ss {self.__shutter_speed}") - self.__wait_for_output("Change: shutter_speed") - else: - logger.error(f"The shutter speed specified ({shutter_speed}) is not valid") - raise ValueError - - @property - def exposure_mode(self): - return self.__exposure_mode - - @exposure_mode.setter - def exposure_mode(self, mode): - """Change the camera exposure mode - - Is one of off, auto, night, nightpreview, backlight, spotlight, - sports, snow, beach, verylong, fixedfps, antishake, fireworks - - Args: - mode (string): exposure mode to use - """ - logger.debug(f"Setting the exposure mode to {mode}") - if mode in [ - "off", - "auto", - "night", - "nightpreview", - "backlight", - "spotlight", - "sports", - "snow", - "beach", - "verylong", - "fixedfps", - "antishake", - "fireworks", - ]: - self.__exposure_mode = mode - self.__send_command(f"em {self.__exposure_mode}") - else: - logger.error(f"The exposure mode specified ({mode}) is not valid") - raise ValueError - - @property - def white_balance(self): - return self.__white_balance - - @white_balance.setter - def white_balance(self, mode): - """Change the camera white balance mode - - Is one of off, auto, sun, cloudy, shade, tungsten, - fluorescent, incandescent, flash, horizon - - Args: - mode (string): white balance mode to use - """ - logger.debug(f"Setting the white balance mode to {mode}") - if mode in [ - "off", - "auto", - "sun", - "cloudy", - "shade", - "tungsten", - "fluorescent", - "incandescent", - "flash", - "horizon", - ]: - self.__white_balance = mode - self.__send_command(f"wb {self.__white_balance}") - else: - logger.error( - f"The camera white balance mode specified ({mode}) is not valid" - ) - raise ValueError - - @property - def white_balance_gain(self): - return self.__white_balance_gain - - @white_balance_gain.setter - def white_balance_gain(self, gain): - """Change the camera white balance gain - - The gain value should be a int between 0 and 300. By default the camera - is set to use 150 both for the red and the blue gain. - - Args: - gain (tuple of int): Red gain and blue gain to use - """ - logger.debug(f"Setting the white balance mode to {gain}") - if (0 < gain[0] < 800) and (0 < gain[1] < 800): - self.__white_balance_gain = gain - self.__send_command( - f"ag {self.__white_balance_gain[0]} {self.__white_balance_gain[1]}" - ) - else: - logger.error( - f"The camera white balance gain specified ({gain}) is not valid" - ) - raise ValueError - - @property - def image_gain(self): - return self.__image_gain - - @image_gain.setter - def image_gain(self, gain): - """Change the camera image gain - - The analog gain value should be an int between 100 and 1200 for the analog gain and - between 100 and 6400 for the digital gain. - By default the camera is set to use 1.0 both for the analog and the digital gain. - - Args: - gain (tuple of int): Image gain to use - """ - logger.debug(f"Setting the analog gain to {gain}") - if (100 <= gain[0] <= 1200) and (100 <= gain[1] < 6400): - self.__image_gain = gain - self.__send_command(f"ig {self.__image_gain[0]} {self.__image_gain[1]}") - else: - logger.error(f"The camera image gain specified ({gain}) is not valid") - raise ValueError - - @property - def image_quality(self): - return self.__image_quality - - @image_quality.setter - def image_quality(self, image_quality): - """Change the output image quality - - Args: - image_quality (int): image quality [0,100] - """ - logger.debug(f"Setting image quality to {image_quality}") - if 0 <= image_quality <= 100: - self.__image_quality = image_quality - self.__send_command(f"ss {self.__image_quality}") - else: - logger.error( - f"The output image quality specified ({image_quality}) is not valid" - ) - raise ValueError - - @property - def preview_quality(self): - return self.__preview_quality - - @preview_quality.setter - def preview_quality(self, preview_quality): - """Change the preview image quality - - Args: - preview_quality (int): image quality [0,100] - """ - logger.debug(f"Setting preview quality to {preview_quality}") - if 0 <= preview_quality <= 100: - self.__preview_quality = preview_quality - self.__send_command(f"pv {self.__preview_quality} 512 01") - else: - logger.error( - f"The preview image quality specified ({preview_quality}) is not valid" - ) - raise ValueError - - def capture(self, path="", timeout=5): - """Capture an image. Blocks for timeout seconds(5 by default) until the image is captured. - - Args: - path (str, optional): Path to image file. Defaults to "". - timeout (int, optional): Timeout duration in seconds. Defaults to 5. - - Raises: - TimeoutError: A timeout happened before the required output showed up - """ - logger.debug(f"Capturing an image to {path}") - if path == "": - self.__send_command("im") - else: - self.__send_command(f"im {path}") - time.sleep(0.1) - - self.__wait_for_output("Capturing image", timeout / 2) - self.__wait_for_output("Ready", timeout / 2) - - def stop(self): - """Halt and release the camera.""" - logger.debug("Releasing the camera now") - self.__send_command("ru 0") - - def close(self): - """Kill the process.""" - logger.debug("Killing raspimjpeg in a nice way") - self.__process.terminate() - self.__process.wait() - - def kill(self): - """Kill the process.""" - logger.debug("Killing raspimjpeg in a very dirty way") - self.__process.terminate() - - def killall(self): - """Literally erases the raspimjpeg process(es)""" - logger.debug("Killing raspimjpeg in a very ugly dirty way") - subprocess.run("sudo killall -q -9 raspimjpeg".split(), timeout=1) # nosec diff --git a/control/planktoscopehat/planktoscope/imager/state_machine.py b/control/planktoscopehat/planktoscope/imager/state_machine.py deleted file mode 100644 index 483208e1..00000000 --- a/control/planktoscopehat/planktoscope/imager/state_machine.py +++ /dev/null @@ -1,71 +0,0 @@ -# Logger library compatible with multiprocessing -from loguru import logger - -# TODO rewrite this in PlantUML -# This works with https://www.diagram.codes/d/state-machine -# "wait for pump" as pump -# "start imager" as imager -# "capture image" as capture -# -# START->stop["init"] -# imager->pump["start pumping"] -# pump->stop["stop"] -# stop->imager["start"] -# pump->capture["pumping is done"] -# capture->pump["start pump"] -# capture->stop["stop or done"] - - -# State machine class -class ImagerState(object): - name = "state" - allowed = [] - - def switch(self, state): - """Switch to new state""" - if state.name in self.allowed: - logger.info(f"Current:{self} => switched to new state {state.name}") - self.__class__ = state - else: - logger.error(f"Current:{self} => switching to {state.name} not possible.") - - def __str__(self): - return self.name - - -class Stop(ImagerState): - name = "stop" - allowed = ["imaging"] - - -class Imaging(ImagerState): - """State of getting ready to start""" - - name = "imaging" - allowed = ["waiting", "stop"] - - -class Waiting(ImagerState): - """State of waiting for the pump to finish""" - - name = "waiting" - allowed = ["stop", "capture"] - - -class Capture(ImagerState): - """State of capturing image""" - - name = "capture" - allowed = ["stop", "waiting"] - - -class Imager(object): - """A class representing the imager""" - - def __init__(self): - # State of the imager - default is stop. - self.state = Stop() - - def change(self, state): - """Change state""" - self.state.switch(state) diff --git a/control/planktoscopehat/planktoscope/imagernew/stopflow.py b/control/planktoscopehat/planktoscope/imager/stopflow.py similarity index 100% rename from control/planktoscopehat/planktoscope/imagernew/stopflow.py rename to control/planktoscopehat/planktoscope/imager/stopflow.py diff --git a/control/planktoscopehat/planktoscope/imager/streamer.py b/control/planktoscopehat/planktoscope/imager/streamer.py deleted file mode 100644 index de00b464..00000000 --- a/control/planktoscopehat/planktoscope/imager/streamer.py +++ /dev/null @@ -1,79 +0,0 @@ -from loguru import logger - -import time - -import socketserver -import http.server - -PAGE = """\ - - - raspimjpeg streaming demo - - -

Raspimjpeg Streaming Demo

- - - -""" - -################################################################################ -# Classes for the PiCamera Streaming -################################################################################ -class StreamingHandler(http.server.BaseHTTPRequestHandler): - def __init__(self, delay, *args, **kwargs): - self.delay = delay - super(StreamingHandler, self).__init__(*args, **kwargs) - - @logger.catch - def do_GET(self): - if self.path == "/": - self.send_response(301) - self.send_header("Location", "/index.html") #stream.mjpg - self.end_headers() - elif self.path == '/index.html': - content = PAGE.encode('utf-8') - self.send_response(200) - self.send_header('Content-Type', 'text/html') - self.send_header('Content-Length', len(content)) - self.end_headers() - self.wfile.write(content) - elif self.path == "/stream.mjpg": - self.send_response(200) - self.send_header("Age", 0) - self.send_header("Cache-Control", "no-cache, private") - self.send_header("Pragma", "no-cache") - self.send_header( - "Content-Type", "multipart/x-mixed-replace; boundary=FRAME" - ) - self.send_header('Access-Control-Allow-Origin', '*') - self.end_headers() - try: - while True: - try: - with open("/dev/shm/mjpeg/cam.jpg", "rb") as jpeg: # nosec - frame = jpeg.read() - except FileNotFoundError as e: - logger.error("Camera has not been started yet") - time.sleep(5) - except Exception as e: - logger.exception(f"An exception occured {e}") - else: - self.wfile.write(b"--FRAME\r\n") - self.send_header("Content-Type", "image/jpeg") - self.send_header("Content-Length", len(frame)) - self.end_headers() - self.wfile.write(frame) - self.wfile.write(b"\r\n") - time.sleep(self.delay) - - except BrokenPipeError as e: - logger.info(f"Removed streaming client {self.client_address}") - else: - self.send_error(404) - self.end_headers() - - -class StreamingServer(socketserver.ThreadingMixIn, http.server.HTTPServer): - allow_reuse_address = True - daemon_threads = True diff --git a/control/planktoscopehat/planktoscope/imagernew/__init__.py b/control/planktoscopehat/planktoscope/imagernew/__init__.py deleted file mode 100644 index 89756e2a..00000000 --- a/control/planktoscopehat/planktoscope/imagernew/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""imagernew provides high-level functionality for performing image acquisition.""" From 7b4de1f97ab788766d8a713011ede39be0011f85 Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Thu, 26 Dec 2024 09:58:26 -0800 Subject: [PATCH 2/3] Update `main.py` for adafruithat (forgotten in previous commit) --- control/adafruithat/main.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/control/adafruithat/main.py b/control/adafruithat/main.py index f8540c01..fe02bf42 100644 --- a/control/adafruithat/main.py +++ b/control/adafruithat/main.py @@ -1,17 +1,17 @@ # Copyright Romain Bazile and other PlanktoScope project contributors -# +# # This file is part of the PlanktoScope software. -# +# # PlanktoScope is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. -# +# # PlanktoScope is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with PlanktoScope. If not, see . @@ -25,13 +25,13 @@ import planktoscope.mqtt import planktoscope.stepper -import planktoscope.light # Fan HAT LEDs +import planktoscope.light # Fan HAT LEDs import planktoscope.identity -import planktoscope.uuidName # Note: this is deprecated. -import planktoscope.display # Fan HAT OLED screen -from planktoscope.imagernew import mqtt as imagernew +import planktoscope.uuidName # Note: this is deprecated. +import planktoscope.display # Fan HAT OLED screen +from planktoscope.imager import mqtt as imager -# enqueue=True is necessary so we can log accross modules +# enqueue=True is necessary so we can log across modules # rotation happens everyday at 01:00 if not restarted # TODO: ensure the log directory exists logger.add( @@ -58,6 +58,7 @@ run = True # global variable to enable clean shutdown from stop signals + def handler_stop_signals(signum, _): """This handler simply stop the forever running loop in __main__""" global run @@ -67,7 +68,9 @@ def handler_stop_signals(signum, _): if __name__ == "__main__": logger.info("Welcome!") - logger.info( "Initialising signals handling and sanitizing the directories (step 1/4)") + logger.info( + "Initialising signals handling and sanitizing the directories (step 1/4)" + ) signal.signal(signal.SIGINT, handler_stop_signals) signal.signal(signal.SIGTERM, handler_stop_signals) @@ -93,7 +96,9 @@ def handler_stop_signals(signum, _): # create the path! os.makedirs(img_path) - logger.info(f"This PlanktoScope's Raspberry Pi's serial number is {planktoscope.uuidName.getSerial()}") + logger.info( + f"This PlanktoScope's Raspberry Pi's serial number is {planktoscope.uuidName.getSerial()}" + ) logger.info( f"This PlanktoScope's machine name is {planktoscope.identity.load_machine_name()}" ) @@ -113,7 +118,7 @@ def handler_stop_signals(signum, _): # Starts the imager control process logger.info("Starting the imager control process (step 3/4)") try: - imager_thread = imagernew.Worker(shutdown_event) + imager_thread = imager.Worker(shutdown_event) except Exception as e: logger.error(f"The imager control process could not be started: {e}") imager_thread = None From 658ea2d40b4050e9bada5fcb694e0c1983306623 Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Thu, 26 Dec 2024 10:20:55 -0800 Subject: [PATCH 3/3] Update subpackage name references --- .../planktoscope/imager/__init__.py | 2 +- .../adafruithat/planktoscope/imager/mqtt.py | 63 ++++++++++++++----- .../planktoscope/imager/__init__.py | 2 +- .../planktoscope/imager/mqtt.py | 63 ++++++++++++++----- 4 files changed, 96 insertions(+), 34 deletions(-) diff --git a/control/adafruithat/planktoscope/imager/__init__.py b/control/adafruithat/planktoscope/imager/__init__.py index 89756e2a..77d0bb6e 100644 --- a/control/adafruithat/planktoscope/imager/__init__.py +++ b/control/adafruithat/planktoscope/imager/__init__.py @@ -1 +1 @@ -"""imagernew provides high-level functionality for performing image acquisition.""" +"""imager provides high-level functionality for performing image acquisition.""" diff --git a/control/adafruithat/planktoscope/imager/mqtt.py b/control/adafruithat/planktoscope/imager/mqtt.py index cf4ff9f6..59b110ec 100644 --- a/control/adafruithat/planktoscope/imager/mqtt.py +++ b/control/adafruithat/planktoscope/imager/mqtt.py @@ -12,7 +12,7 @@ from planktoscope import identity, integrity, mqtt from planktoscope.camera import mqtt as camera -from planktoscope.imagernew import stopflow +from planktoscope.imager import stopflow loguru.logger.info("planktoscope.imager is loaded") @@ -62,7 +62,9 @@ def run(self) -> None: couldn't be started (e.g. because the camera is missing), it will clean up and then wait until the `stop_event` event is set before quitting. """ - loguru.logger.info(f"The imager control thread has been started in process {os.getpid()}") + loguru.logger.info( + f"The imager control thread has been started in process {os.getpid()}" + ) self._mqtt = mqtt.MQTT_Client(topic="imager/#", name="imager_client") self._mqtt.client.publish("status/imager", '{"status":"Starting up"}') @@ -75,10 +77,16 @@ def run(self) -> None: self._camera = camera.Worker() self._camera.start() if self._camera.camera is None: - loguru.logger.error("Missing camera - maybe it's disconnected or it never started?") + loguru.logger.error( + "Missing camera - maybe it's disconnected or it never started?" + ) # TODO(ethanjli): officially add this error status to the MQTT API! - self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}') - loguru.logger.success("Preemptively preparing to shut down since there's no camera...") + self._mqtt.client.publish( + "status/imager", '{"status": "Error: missing camera"}' + ) + loguru.logger.success( + "Preemptively preparing to shut down since there's no camera..." + ) self._cleanup() # TODO(ethanjli): currently we just wait and do nothing until we receive the shutdown # signal, because if we return early then the hardware controller will either shut down @@ -95,7 +103,10 @@ def run(self) -> None: self._mqtt.client.publish("status/imager", '{"status":"Ready"}') try: while not self._stop_event_loop.is_set(): - if self._active_routine is not None and not self._active_routine.is_alive(): + if ( + self._active_routine is not None + and not self._active_routine.is_alive() + ): # Garbage-collect any finished image-acquisition routine threads so that we're # ready for the next configuration update command which arrives: self._active_routine.stop() @@ -163,8 +174,12 @@ def _update_metadata(self, latest_message: dict[str, typing.Any]) -> None: return if "config" not in latest_message: - loguru.logger.error(f"Received message is missing field 'config': {latest_message}") - self._mqtt.client.publish("status/imager", '{"status":"Configuration message error"}') + loguru.logger.error( + f"Received message is missing field 'config': {latest_message}" + ) + self._mqtt.client.publish( + "status/imager", '{"status":"Configuration message error"}' + ) return loguru.logger.info("Updating configuration...") @@ -178,16 +193,22 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None: assert self._pump is not None assert self._camera is not None - if (acquisition_settings := _parse_acquisition_settings(latest_message)) is None: + if ( + acquisition_settings := _parse_acquisition_settings(latest_message) + ) is None: self._mqtt.client.publish("status/imager", '{"status":"Error"}') return if self._camera.camera is None: loguru.logger.error("Missing camera - maybe it was closed?") # TODO(ethanjli): officially add this error status to the MQTT API! - self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}') + self._mqtt.client.publish( + "status/imager", '{"status": "Error: missing camera"}' + ) raise RuntimeError("Camera is not available") - assert (capture_size := self._camera.camera.stream_config.capture_size) is not None + assert ( + capture_size := self._camera.camera.stream_config.capture_size + ) is not None camera_settings = self._camera.camera.settings assert (image_gain := camera_settings.image_gain) is not None machine_name = identity.load_machine_name() @@ -216,7 +237,9 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None: return self._active_routine = ImageAcquisitionRoutine( - stopflow.Routine(output_path, acquisition_settings, self._pump, self._camera.camera), + stopflow.Routine( + output_path, acquisition_settings, self._pump, self._camera.camera + ), self._mqtt, ) self._active_routine.start() @@ -250,7 +273,9 @@ def _parse_acquisition_settings( total_images=int(latest_message["nb_frame"]), stabilization_duration=float(latest_message["sleep"]), pump=stopflow.DiscretePumpSettings( - direction=stopflow.PumpDirection(latest_message.get("pump_direction", "FORWARD")), + direction=stopflow.PumpDirection( + latest_message.get("pump_direction", "FORWARD") + ), flowrate=float(latest_message.get("pump_flowrate", 2)), volume=float(latest_message["volume"]), ), @@ -317,7 +342,9 @@ class ImageAcquisitionRoutine(threading.Thread): # TODO(ethanjli): instead of taking an arg of type mqtt.MQTT_CLIENT, just take an arg of # whatever `mqtt_client.client`'s type is supposed to be. Or maybe we should just initialize # our own MQTT client in here? - def __init__(self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client) -> None: + def __init__( + self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client + ) -> None: """Initialize the thread. Args: @@ -335,7 +362,9 @@ def run(self) -> None: if (result := self._routine.run_step()) is None: if self._routine.interrupted: loguru.logger.debug("Image-acquisition routine was interrupted!") - self._mqtt_client.publish("status/imager", '{"status":"Interrupted"}') + self._mqtt_client.publish( + "status/imager", '{"status":"Interrupted"}' + ) break loguru.logger.debug("Image-acquisition routine ran to completion!") self._mqtt_client.publish("status/imager", '{"status":"Done"}') @@ -415,7 +444,9 @@ def _receive_messages(self) -> None: continue if self._mqtt.msg["payload"]["status"] not in {"Done", "Interrupted"}: - loguru.logger.debug(f"Ignoring pump status update: {self._mqtt.msg['payload']}") + loguru.logger.debug( + f"Ignoring pump status update: {self._mqtt.msg['payload']}" + ) self._mqtt.read_message() continue diff --git a/control/planktoscopehat/planktoscope/imager/__init__.py b/control/planktoscopehat/planktoscope/imager/__init__.py index 89756e2a..77d0bb6e 100644 --- a/control/planktoscopehat/planktoscope/imager/__init__.py +++ b/control/planktoscopehat/planktoscope/imager/__init__.py @@ -1 +1 @@ -"""imagernew provides high-level functionality for performing image acquisition.""" +"""imager provides high-level functionality for performing image acquisition.""" diff --git a/control/planktoscopehat/planktoscope/imager/mqtt.py b/control/planktoscopehat/planktoscope/imager/mqtt.py index cf4ff9f6..59b110ec 100644 --- a/control/planktoscopehat/planktoscope/imager/mqtt.py +++ b/control/planktoscopehat/planktoscope/imager/mqtt.py @@ -12,7 +12,7 @@ from planktoscope import identity, integrity, mqtt from planktoscope.camera import mqtt as camera -from planktoscope.imagernew import stopflow +from planktoscope.imager import stopflow loguru.logger.info("planktoscope.imager is loaded") @@ -62,7 +62,9 @@ def run(self) -> None: couldn't be started (e.g. because the camera is missing), it will clean up and then wait until the `stop_event` event is set before quitting. """ - loguru.logger.info(f"The imager control thread has been started in process {os.getpid()}") + loguru.logger.info( + f"The imager control thread has been started in process {os.getpid()}" + ) self._mqtt = mqtt.MQTT_Client(topic="imager/#", name="imager_client") self._mqtt.client.publish("status/imager", '{"status":"Starting up"}') @@ -75,10 +77,16 @@ def run(self) -> None: self._camera = camera.Worker() self._camera.start() if self._camera.camera is None: - loguru.logger.error("Missing camera - maybe it's disconnected or it never started?") + loguru.logger.error( + "Missing camera - maybe it's disconnected or it never started?" + ) # TODO(ethanjli): officially add this error status to the MQTT API! - self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}') - loguru.logger.success("Preemptively preparing to shut down since there's no camera...") + self._mqtt.client.publish( + "status/imager", '{"status": "Error: missing camera"}' + ) + loguru.logger.success( + "Preemptively preparing to shut down since there's no camera..." + ) self._cleanup() # TODO(ethanjli): currently we just wait and do nothing until we receive the shutdown # signal, because if we return early then the hardware controller will either shut down @@ -95,7 +103,10 @@ def run(self) -> None: self._mqtt.client.publish("status/imager", '{"status":"Ready"}') try: while not self._stop_event_loop.is_set(): - if self._active_routine is not None and not self._active_routine.is_alive(): + if ( + self._active_routine is not None + and not self._active_routine.is_alive() + ): # Garbage-collect any finished image-acquisition routine threads so that we're # ready for the next configuration update command which arrives: self._active_routine.stop() @@ -163,8 +174,12 @@ def _update_metadata(self, latest_message: dict[str, typing.Any]) -> None: return if "config" not in latest_message: - loguru.logger.error(f"Received message is missing field 'config': {latest_message}") - self._mqtt.client.publish("status/imager", '{"status":"Configuration message error"}') + loguru.logger.error( + f"Received message is missing field 'config': {latest_message}" + ) + self._mqtt.client.publish( + "status/imager", '{"status":"Configuration message error"}' + ) return loguru.logger.info("Updating configuration...") @@ -178,16 +193,22 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None: assert self._pump is not None assert self._camera is not None - if (acquisition_settings := _parse_acquisition_settings(latest_message)) is None: + if ( + acquisition_settings := _parse_acquisition_settings(latest_message) + ) is None: self._mqtt.client.publish("status/imager", '{"status":"Error"}') return if self._camera.camera is None: loguru.logger.error("Missing camera - maybe it was closed?") # TODO(ethanjli): officially add this error status to the MQTT API! - self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}') + self._mqtt.client.publish( + "status/imager", '{"status": "Error: missing camera"}' + ) raise RuntimeError("Camera is not available") - assert (capture_size := self._camera.camera.stream_config.capture_size) is not None + assert ( + capture_size := self._camera.camera.stream_config.capture_size + ) is not None camera_settings = self._camera.camera.settings assert (image_gain := camera_settings.image_gain) is not None machine_name = identity.load_machine_name() @@ -216,7 +237,9 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None: return self._active_routine = ImageAcquisitionRoutine( - stopflow.Routine(output_path, acquisition_settings, self._pump, self._camera.camera), + stopflow.Routine( + output_path, acquisition_settings, self._pump, self._camera.camera + ), self._mqtt, ) self._active_routine.start() @@ -250,7 +273,9 @@ def _parse_acquisition_settings( total_images=int(latest_message["nb_frame"]), stabilization_duration=float(latest_message["sleep"]), pump=stopflow.DiscretePumpSettings( - direction=stopflow.PumpDirection(latest_message.get("pump_direction", "FORWARD")), + direction=stopflow.PumpDirection( + latest_message.get("pump_direction", "FORWARD") + ), flowrate=float(latest_message.get("pump_flowrate", 2)), volume=float(latest_message["volume"]), ), @@ -317,7 +342,9 @@ class ImageAcquisitionRoutine(threading.Thread): # TODO(ethanjli): instead of taking an arg of type mqtt.MQTT_CLIENT, just take an arg of # whatever `mqtt_client.client`'s type is supposed to be. Or maybe we should just initialize # our own MQTT client in here? - def __init__(self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client) -> None: + def __init__( + self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client + ) -> None: """Initialize the thread. Args: @@ -335,7 +362,9 @@ def run(self) -> None: if (result := self._routine.run_step()) is None: if self._routine.interrupted: loguru.logger.debug("Image-acquisition routine was interrupted!") - self._mqtt_client.publish("status/imager", '{"status":"Interrupted"}') + self._mqtt_client.publish( + "status/imager", '{"status":"Interrupted"}' + ) break loguru.logger.debug("Image-acquisition routine ran to completion!") self._mqtt_client.publish("status/imager", '{"status":"Done"}') @@ -415,7 +444,9 @@ def _receive_messages(self) -> None: continue if self._mqtt.msg["payload"]["status"] not in {"Done", "Interrupted"}: - loguru.logger.debug(f"Ignoring pump status update: {self._mqtt.msg['payload']}") + loguru.logger.debug( + f"Ignoring pump status update: {self._mqtt.msg['payload']}" + ) self._mqtt.read_message() continue