Skip to content

Commit

Permalink
Merge branch 'new-calibrations' into develop
Browse files Browse the repository at this point in the history
* new-calibrations:
  I think this can be merged now
  back to calibrations
  back to calibrations
  keep checking for mqtt job
  fix typo in persisent cache name; use configs to determine where caches live; remove diskcache
  fix typo in persisent cache name; use configs to determine where caches live; remove diskcache
  sigh
  more liberal parsing
  try this
  plugins now target the api endpoint in their homepage
  wip
  don't need to start stirring if od_statistics will do it anyways - right?
  wip
  okay that wasn't so bad
  commiting just before I redo everything
  fix more
  stirring calibration, and removing the old diskcache for persistant storages
  wip
  • Loading branch information
CamDavidsonPilon committed Jan 2, 2025
2 parents 515d104 + 0427d6c commit 0a12433
Show file tree
Hide file tree
Showing 50 changed files with 1,334 additions and 1,315 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ repos:
hooks:
- id: mypy
additional_dependencies: [
msgspec==0.18.5,
msgspec==0.19.0,
types-pkg_resources==0.1.3,
]

Expand Down
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
### Upcoming
- API to set clock time
- new calibrations CLI
- persistent storage is now in a single sqlite3 database
- new "add log entry" dialog
- kalman filter table is no longer populated
- deprecated `default` in background_jobs yaml fields.
- [ ] test self-test
- [ ] check back edits to stirring calibration
- [ ] changes to settings api in unit_api
- [ ] logs page
- plugins page
- leader page
- moved intermittent cache location - this requires a restart
- new table for historical experiment assignments
- plugins page has dropdown to select the unit
- new config entries under storage

- stirring calibrations need to be redone


#### Breaking changes
- fixed typo `utils.local_persistant_storage` to `utils.local_persistent_storage`.

### 24.12.10
- Hotfix for UI settings bug
Expand Down
5 changes: 4 additions & 1 deletion config.dev.ini
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ use_calibration=1
smoothing_penalizer=700.0

[storage]
database=pioreactor.sqlite
database=.pioreactor/storage/pioreactor.sqlite
temporary_cache=/tmp/pioreactor_cache/local_intermittent_pioreactor_metadata.sqlite
persistent_cache=.pioreactor/storage/local_persistent_pioreactor_metadata.sqlite


[logging]
log_file=./pioreactor.log
Expand Down
11 changes: 0 additions & 11 deletions pioreactor/actions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,2 @@
# -*- coding: utf-8 -*-
from __future__ import annotations

from pioreactor.actions import led_intensity
from pioreactor.actions import od_blank
from pioreactor.actions import od_calibration
from pioreactor.actions import pump
from pioreactor.actions import pump_calibration
from pioreactor.actions import self_test
from pioreactor.actions import stirring_calibration
from pioreactor.actions.leader import backup_database
from pioreactor.actions.leader import experiment_profile
from pioreactor.actions.leader import export_experiment_data
6 changes: 3 additions & 3 deletions pioreactor/actions/leader/backup_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pioreactor.exc import RsyncError
from pioreactor.logging import create_logger
from pioreactor.utils import local_intermittent_storage
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils import managed_lifecycle
from pioreactor.utils.networking import resolve_to_address
from pioreactor.utils.networking import rsync
Expand Down Expand Up @@ -69,7 +69,7 @@ def backup_database(output_file: str, force: bool = False, backup_to_workers: in
bck.close()
con.close()

with local_persistant_storage("database_backups") as cache:
with local_persistent_storage("database_backups") as cache:
cache["latest_backup_timestamp"] = current_time

logger.info("Completed backup of database.")
Expand Down Expand Up @@ -102,7 +102,7 @@ def backup_database(output_file: str, force: bool = False, backup_to_workers: in
logger.debug(f"Backed up database to {backup_unit}:{output_file}.")
backups_complete += 1

with local_persistant_storage("database_backups") as cache:
with local_persistent_storage("database_backups") as cache:
cache[f"latest_backup_in_{backup_unit}"] = current_time

return
Expand Down
30 changes: 16 additions & 14 deletions pioreactor/actions/leader/experiment_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from pioreactor.whoami import get_unit_name
from pioreactor.whoami import is_testing_env

bool_expression = str | bool
BoolExpression = str | bool
Env = dict[str, Any]

STRICT_EXPRESSION_PATTERN = r"^\${{(.*?)}}$"
Expand Down Expand Up @@ -90,7 +90,7 @@ def evaluate_log_message(message: str, env: dict) -> str:
return result_string


def evaluate_bool_expression(bool_expression: bool_expression, env: dict) -> bool:
def evaluate_bool_expression(bool_expression: BoolExpression, env: dict) -> bool:
from pioreactor.experiment_profiles.parser import parse_profile_expression_to_bool

if isinstance(bool_expression, bool):
Expand All @@ -103,7 +103,7 @@ def evaluate_bool_expression(bool_expression: bool_expression, env: dict) -> boo
return parse_profile_expression_to_bool(bool_expression, env=env)


def check_syntax_of_bool_expression(bool_expression: bool_expression) -> bool:
def check_syntax_of_bool_expression(bool_expression: BoolExpression) -> bool:
from pioreactor.experiment_profiles.parser import check_syntax

if isinstance(bool_expression, bool):
Expand Down Expand Up @@ -320,11 +320,11 @@ def when(
client: Client,
job_name: str,
dry_run: bool,
if_: Optional[bool_expression],
if_: Optional[BoolExpression],
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
condition: bool_expression,
condition: BoolExpression,
when_action: struct.When,
actions: list[struct.Action],
schedule: scheduler,
Expand Down Expand Up @@ -393,12 +393,12 @@ def repeat(
client: Client,
job_name: str,
dry_run: bool,
if_: Optional[bool_expression],
if_: Optional[BoolExpression],
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
repeat_action: struct.Repeat,
while_: Optional[bool_expression],
while_: Optional[BoolExpression],
repeat_every_hours: float,
max_hours: Optional[float],
actions: list[struct.BasicAction],
Expand Down Expand Up @@ -484,7 +484,7 @@ def log(
client: Client,
job_name: str,
dry_run: bool,
if_: Optional[bool_expression],
if_: Optional[BoolExpression],
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
Expand Down Expand Up @@ -517,7 +517,7 @@ def start_job(
client: Client,
job_name: str,
dry_run: bool,
if_: Optional[bool_expression],
if_: Optional[BoolExpression],
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
Expand Down Expand Up @@ -559,7 +559,7 @@ def pause_job(
client: Client,
job_name: str,
dry_run: bool,
if_: Optional[bool_expression],
if_: Optional[BoolExpression],
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
Expand Down Expand Up @@ -595,7 +595,7 @@ def resume_job(
client: Client,
job_name: str,
dry_run: bool,
if_: Optional[bool_expression],
if_: Optional[BoolExpression],
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
Expand Down Expand Up @@ -632,7 +632,7 @@ def stop_job(
client: Client,
job_name: str,
dry_run: bool,
if_: Optional[bool_expression],
if_: Optional[BoolExpression],
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
Expand Down Expand Up @@ -668,7 +668,7 @@ def update_job(
client: Client,
job_name: str,
dry_run: bool,
if_: Optional[bool_expression],
if_: Optional[BoolExpression],
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
Expand Down Expand Up @@ -814,7 +814,9 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run:
unit = get_unit_name()
action_name = "experiment_profile"
logger = create_logger(action_name, unit=unit, experiment=experiment)
with managed_lifecycle(unit, experiment, action_name, ignore_is_active_state=True) as state:
with managed_lifecycle(
unit, experiment, action_name, ignore_is_active_state=True, is_long_running_job=True
) as state:
try:
profile = load_and_verify_profile(profile_filename)
except Exception as e:
Expand Down
13 changes: 8 additions & 5 deletions pioreactor/actions/leader/export_experiment_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,10 @@ def export_experiment_data(
iloc_unit = None

parition_to_writer_map: dict[tuple, Any] = {}

count = 0
with ExitStack() as stack:
for i, row in enumerate(cursor, start=1):
for row in cursor:
count += 1
rows_partition = (
row[iloc_experiment] if iloc_experiment is not None else "all_experiments",
row[iloc_unit] if iloc_unit is not None else "all_units",
Expand All @@ -245,10 +246,12 @@ def export_experiment_data(

parition_to_writer_map[rows_partition].writerow(row)

if i % 1000 == 0:
logger.debug(f"Exported {i} rows...")
if count % 10_000 == 0:
logger.debug(f"Exported {count} rows...")

logger.debug(f"Exported {i} rows from {dataset_name}.")
logger.debug(f"Exported {count} rows from {dataset_name}.")
if count == 0:
logger.warning(f"No data present in {dataset_name}. Check database?")

for filename in filenames:
path_to_file = Path(Path(output).parent / filename)
Expand Down
15 changes: 4 additions & 11 deletions pioreactor/actions/od_blank.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
from __future__ import annotations

from collections import defaultdict
Expand All @@ -21,7 +20,7 @@
from pioreactor.logging import create_logger
from pioreactor.pubsub import prune_retained_messages
from pioreactor.utils import is_pio_job_running
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils import managed_lifecycle
from pioreactor.utils import math_helpers
from pioreactor.utils.timing import current_utc_datetime
Expand Down Expand Up @@ -119,7 +118,7 @@ def delete_od_blank(unit=None, experiment=None):
unit = unit or whoami.get_unit_name()
experiment = experiment or whoami.get_assigned_experiment_name(unit)

with local_persistant_storage(action_name) as cache:
with local_persistent_storage(action_name) as cache:
if experiment not in cache:
return

Expand Down Expand Up @@ -150,7 +149,6 @@ def od_blank(
experiment=None,
) -> dict[pt.PdChannel, float]:
from pioreactor.background_jobs.od_reading import start_od_reading
from pioreactor.background_jobs.stirring import start_stirring

action_name = "od_blank"
unit = unit or whoami.get_unit_name()
Expand All @@ -174,17 +172,12 @@ def od_blank(
interval=1.5,
experiment=testing_experiment, # use testing experiment to not pollute the database (and they would show up in the UI)
fake_data=whoami.is_testing_env(),
) as od_stream, start_stirring(
unit=unit,
experiment=testing_experiment,
) as st:
) as od_stream:
# warm up OD reader
for count, _ in enumerate(od_stream, start=0):
if count == 5:
break

st.block_until_rpm_is_close_to_target(timeout=30)

means, _ = od_statistics(
od_stream,
action_name,
Expand All @@ -198,7 +191,7 @@ def od_blank(
logger.error(e)
raise e

with local_persistant_storage(action_name) as cache:
with local_persistent_storage(action_name) as cache:
cache[experiment] = dumps(means)

for channel, mean in means.items():
Expand Down
14 changes: 6 additions & 8 deletions pioreactor/actions/pump.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,16 @@
from pioreactor.whoami import get_assigned_experiment_name
from pioreactor.whoami import get_unit_name

DEFAULT_PWM_CALIBRATION = structs.PumpCalibration(
# TODO: provide better estimates for duration_ and bias_ based on some historical data.
# it can even be a function of voltage
name="default",
DEFAULT_PWM_CALIBRATION: structs.AnyPumpCalibration = structs._PumpCalibration(
pioreactor_unit=get_unit_name(),
created_at=default_datetime_for_pioreactor(),
pump="",
hz=200.0,
dc=100.0,
duration_=1.0,
bias_=0,
voltage=-1,
calibration_name="default_pump_calibration",
curve_type="poly",
curve_data_=[1.0, 0.0],
recorded_data={"x": [], "y": []},
)


Expand Down Expand Up @@ -166,7 +164,7 @@ def _get_pin(pump_type: str, config) -> pt.GpioPin:

def _get_calibration(pump_type: str) -> structs.AnyPumpCalibration:
# TODO: make sure current voltage is the same as calibrated. Actually where should that check occur? in Pump?
with utils.local_persistant_storage("current_pump_calibration") as cache:
with utils.local_persistent_storage("current_pump_calibration") as cache:
try:
return decode(cache[pump_type], type=structs.AnyPumpCalibration) # type: ignore
except KeyError:
Expand Down
18 changes: 4 additions & 14 deletions pioreactor/actions/self_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import sys
from json import dumps
from json import loads
from threading import Thread
from time import sleep
from typing import Callable
Expand Down Expand Up @@ -44,7 +43,7 @@
from pioreactor.types import LedChannel
from pioreactor.types import PdChannel
from pioreactor.utils import is_pio_job_running
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils import managed_lifecycle
from pioreactor.utils import SummableDict
from pioreactor.utils.math_helpers import correlation
Expand Down Expand Up @@ -397,16 +396,7 @@ def test_positive_correlation_between_rpm_and_stirring(
assert is_heating_pcb_present(), "Heating PCB was not detected."
assert voltage_in_aux() <= 18.0, f"Voltage measured {voltage_in_aux()} > 18.0V"

with local_persistant_storage("stirring_calibration") as cache:
if "linear_v1" in cache:
parameters = loads(cache["linear_v1"])
rpm_coef = parameters["rpm_coef"]
intercept = parameters["intercept"]

initial_dc = rpm_coef * 700 + intercept

else:
initial_dc = config.getfloat("stirring.config", "initial_duty_cycle")
initial_dc = config.getfloat("stirring.config", "initial_duty_cycle")

dcs = []
measured_rpms = []
Expand Down Expand Up @@ -471,12 +461,12 @@ def _run(self, managed_state, logger: CustomLogger, unit: str, testing_experimen

managed_state.publish_setting(test_name, int(res))

with local_persistant_storage("self_test_results") as c:
with local_persistent_storage("self_test_results") as c:
c[(self.experiment, test_name)] = int(res)


def get_failed_test_names(experiment: str) -> Iterator[str]:
with local_persistant_storage("self_test_results") as c:
with local_persistent_storage("self_test_results") as c:
for name in get_all_test_names():
if c.get((experiment, name)) == 0:
yield name
Expand Down
4 changes: 2 additions & 2 deletions pioreactor/automations/dosing/chemostat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pioreactor.automations import events
from pioreactor.automations.dosing.base import DosingAutomationJob
from pioreactor.exc import CalibrationError
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import local_persistent_storage


class Chemostat(DosingAutomationJob):
Expand All @@ -20,7 +20,7 @@ class Chemostat(DosingAutomationJob):
def __init__(self, volume: float | str, **kwargs) -> None:
super().__init__(**kwargs)

with local_persistant_storage("current_pump_calibration") as cache:
with local_persistent_storage("current_pump_calibration") as cache:
if "media" not in cache:
raise CalibrationError("Media and waste pump calibration must be performed first.")
elif "waste" not in cache:
Expand Down
Loading

0 comments on commit 0a12433

Please sign in to comment.