Skip to content

Commit

Permalink
Merge pull request #47 from pimoroni/fix/config-management
Browse files Browse the repository at this point in the history
improved config file handling
  • Loading branch information
lowfatcode authored Sep 2, 2022
2 parents 1692755 + 91f3191 commit 8592dd6
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 286 deletions.
213 changes: 113 additions & 100 deletions enviro/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,92 @@
# keep the power rail alive by holding VSYS_EN high as early as possible
# ===========================================================================
from enviro.constants import *
from machine import Pin
hold_vsys_en_pin = Pin(HOLD_VSYS_EN_PIN, Pin.OUT, value=True)

# all the other imports, so many shiny modules
import math, time, machine, sys, os, ujson
from machine import Timer, PWM, RTC, ADC
from phew import logging, remote_mount
# detect board model based on devices on the i2c bus and pin state
# ===========================================================================
from pimoroni_i2c import PimoroniI2C
i2c = PimoroniI2C(I2C_SDA_PIN, I2C_SCL_PIN, 100000)
i2c_devices = i2c.scan()
model = None
if 56 in i2c_devices: # 56 = colour / light sensor and only present on Indoor
model = "indoor"
elif 35 in i2c_devices: # 35 = ltr-599 on grow & weather
pump1_pin = Pin(10, Pin.IN, Pin.PULL_UP)
model = "grow" if pump1_pin.value() == False else "weather"
pump1_pin.init(pull=None) # disable the pull up (or weather stays awake)
else:
model = "urban" # otherwise it's urban..

# set up the activity led
# ===========================================================================
from machine import PWM, Timer
import math
activity_led_pwm = PWM(Pin(ACTIVITY_LED_PIN))
activity_led_pwm.freq(1000)
activity_led_pwm.duty_u16(0)

# set the brightness of the activity led
def activity_led(brightness):
brightness = max(0, min(100, brightness)) # clamp to range
# gamma correct the brightness (gamma 2.8)
value = int(pow(brightness / 100.0, 2.8) * 65535.0 + 0.5)
activity_led_pwm.duty_u16(value)

activity_led_timer = Timer(-1)
activity_led_pulse_speed_hz = 1
def activity_led_callback(t):
# updates the activity led brightness based on a sinusoid seeded by the current time
brightness = (math.sin(time.ticks_ms() * math.pi * 2 / (1000 / activity_led_pulse_speed_hz)) * 40) + 60
value = int(pow(brightness / 100.0, 2.8) * 65535.0 + 0.5)
activity_led_pwm.duty_u16(value)

# set the activity led into pulsing mode
def pulse_activity_led(speed_hz = 1):
global activity_led_timer, activity_led_pulse_speed_hz
activity_led_pulse_speed_hz = speed_hz
activity_led_timer.deinit()
activity_led_timer.init(period=50, mode=Timer.PERIODIC, callback=activity_led_callback)

# turn off the activity led and disable any pulsing animation that's running
def stop_activity_led():
global activity_led_timer
activity_led_timer.deinit()
activity_led_pwm.duty_u16(0)

# check whether device needs provisioning
# ===========================================================================
import time
from phew import logging
button_pin = Pin(BUTTON_PIN, Pin.IN, Pin.PULL_DOWN)
needs_provisioning = False
start = time.time()
while button_pin.value(): # button held for 3 seconds go into provisioning
if time.time() - start > 3:
needs_provisioning = True
break

try:
import config # fails to import (missing/corrupt) go into provisioning
if not config.provisioned: # provisioned flag not set go into provisioning
needs_provisioning = True
except ImportError as e:
logging.error("> missing or corrupt config.py", e)
needs_provisioning = True

if needs_provisioning:
logging.info("> entering provisioning mode")
import enviro.provisioning
# control never returns to here, provisioning takes over completely




# all the other imports, so many shiny modules
import machine, sys, os, ujson
from machine import RTC, ADC
import phew
from pcf85063a import PCF85063A
import enviro.helpers as helpers

Expand All @@ -22,18 +101,9 @@
Pin(WIFI_CS_PIN).value(old_state)

# set up the button, external trigger, and rtc alarm pins
button_pin = Pin(BUTTON_PIN, Pin.IN, Pin.PULL_DOWN)
rtc_alarm_pin = Pin(RTC_ALARM_PIN, Pin.IN, Pin.PULL_DOWN)
external_trigger_pin = Pin(EXTERNAL_INTERRUPT_PIN, Pin.IN, Pin.PULL_DOWN)

# setup the i2c bus
i2c = PimoroniI2C(I2C_SDA_PIN, I2C_SCL_PIN, 100000)

# set up the activity led
activity_led_pwm = PWM(Pin(ACTIVITY_LED_PIN))
activity_led_pwm.freq(1000)
activity_led_pwm.duty_u16(0)

# intialise the pcf85063a real time clock chip
rtc = PCF85063A(i2c)
i2c.writeto_mem(0x51, 0x00, b'\x00') # ensure rtc is running (this should be default?)
Expand All @@ -56,22 +126,26 @@
print(" - -- ---- -----=--==--=== hey enviro, let's go! ===--==--=----- ---- -- - ")
print("")

# returns True if the board needs provisioning
def needs_provisioning():
# if config fails to import (missing or corrupt) then we need to provision
try:
import config
if not helpers.get_config("provisioned"):
return True
except ImportError as e:
logging.error("> error in config.py", e)
return True # error importing config
return False # config imported fine

def provision():
# this import starts the provisioning process and control will never
# return from here
import enviro.provisioning


def connect_to_wifi():
if phew.is_connected_to_wifi():
logging.info(f"> already connected to wifi")
return True

wifi_ssid = config.wifi_ssid
wifi_password = config.wifi_password

logging.info(f"> connecting to wifi network '{wifi_ssid}'")
ip = phew.connect_to_wifi(wifi_ssid, wifi_password, timeout_seconds=30)

if not ip:
logging.error(f"! failed to connect to wireless network {wifi_ssid}")
return False

logging.info(" - ip address: ", ip)

return True

# returns the reason we woke up
def wake_reason():
Expand All @@ -82,11 +156,11 @@ def wake_reason():
def halt(message):
logging.error(message)
warn_led(WARN_LED_BLINK)
sleep(helpers.get_config("reading_frequency"))
sleep(config.reading_frequency)

# returns True if we've used up 90% of the internal filesystem
def low_disk_space():
if not remote_mount: # os.statvfs doesn't exist on remote mounts
if not phew.remote_mount: # os.statvfs doesn't exist on remote mounts
return (os.statvfs(".")[3] / os.statvfs(".")[2]) < 0.1
return False

Expand All @@ -97,23 +171,14 @@ def is_clock_set():
# connect to wifi and attempt to fetch the current time from an ntp server
def sync_clock_from_ntp():
from phew import ntp
if not helpers.connect_to_wifi():
if not connect_to_wifi():
return False
timestamp = ntp.fetch()
if not timestamp:
return False
rtc.datetime(timestamp) # set the time on the rtc chip
return True

# returns true if the button is held for the number of seconds provided
def button_held_for(seconds):
start = time.time()
while button_pin.value():
if time.time() - start > seconds:
return True
time.sleep(0.1)
return False

# set the state of the warning led (off, on, blinking)
def warn_led(state):
if state == WARN_LED_OFF:
Expand All @@ -126,33 +191,6 @@ def warn_led(state):
# the pcf85063a defaults to 32KHz clock output so need to explicitly turn off
warn_led(WARN_LED_OFF)

# set the brightness of the activity led
def activity_led(brightness):
brightness = max(0, min(100, brightness)) # clamp to range
# gamma correct the brightness (gamma 2.8)
value = int(pow(brightness / 100.0, 2.8) * 65535.0 + 0.5)
activity_led_pwm.duty_u16(value)

activity_led_timer = Timer(-1)
activity_led_pulse_speed_hz = 1
def activity_led_callback(t):
# updates the activity led brightness based on a sinusoid seeded by the current time
brightness = (math.sin(time.ticks_ms() * math.pi * 2 / (1000 / activity_led_pulse_speed_hz)) * 40) + 60
value = int(pow(brightness / 100.0, 2.8) * 65535.0 + 0.5)
activity_led_pwm.duty_u16(value)

# set the activity led into pulsing mode
def pulse_activity_led(speed_hz = 1):
global activity_led_timer, activity_led_pulse_speed_hz
activity_led_pulse_speed_hz = speed_hz
activity_led_timer.deinit()
activity_led_timer.init(period=50, mode=Timer.PERIODIC, callback=activity_led_callback)

# turn off the activity led and disable any pulsing animation that's running
def stop_activity_led():
global activity_led_timer
activity_led_timer.deinit()
activity_led_pwm.duty_u16(0)

# returns the reason the board woke up from deep sleep
def get_wake_reason():
Expand All @@ -177,23 +215,8 @@ def wake_reason_name(wake_reason):
}
return names[wake_reason] if wake_reason in names else None

# guess board type based on devices on the i2c bus and pin state
def detect_model():
i2c_devices = i2c.scan()
result = None
if 56 in i2c_devices: # 56 = colour / light sensor and only present on Indoor
result = "indoor"
elif 35 in i2c_devices: # 35 = ltr-599 on grow & weather
pump1_pin = Pin(10, Pin.IN, Pin.PULL_UP)
result = "grow" if pump1_pin.value() == False else "weather"
pump1_pin.init(pull=None) # disable the pull up (or weather stays awake)
else:
result = "urban" # otherwise it's urban..
return result

# return the module that implements this board type
def get_board():
model = detect_model()
if model == "indoor":
import enviro.boards.indoor as board
if model == "grow":
Expand Down Expand Up @@ -226,16 +249,16 @@ def save_reading(readings):
f.write(",".join(row) + "\r\n")

# is an upload destination set? if so cache this reading for upload too
if helpers.get_config("destination"):
if config.destination:
cache_upload(readings)

# save the provided readings into a cache file for future uploading
def cache_upload(readings):
payload = {
"nickname": helpers.get_config("nickname"),
"nickname": config.nickname,
"timestamp": helpers.datetime_string(),
"readings": readings,
"model": detect_model(),
"model": model,
"uid": helpers.uid()
}
uploads_filename = f"uploads/{helpers.datetime_string()}.json"
Expand All @@ -248,14 +271,14 @@ def cached_upload_count():

# returns True if we have more cached uploads than our config allows
def is_upload_needed():
return cached_upload_count() >= helpers.get_config("upload_frequency")
return cached_upload_count() >= config.upload_frequency

# upload cached readings to the configured destination
def upload_readings():
if not helpers.connect_to_wifi():
if not connect_to_wifi():
return False

destination = helpers.get_config("destination")
destination = config.destination
exec(f"import enviro.destinations.{destination}")
destination_module = sys.modules[f"enviro.destinations.{destination}"]
for cache_file in os.ilistdir("uploads"):
Expand Down Expand Up @@ -283,16 +306,6 @@ def startup():
logging.debug(" - turn on activity led")
pulse_activity_led(0.5)

# if button held for 3 seconds on startup then go into provisioning mode
user_requested_provisioning = button_held_for(3)

# if enviro isn't configured or the user requested provisioning then
# put the board into provisioning (setup) mode
if user_requested_provisioning or needs_provisioning():
logging.info("> entering provisioning mode")
provision()
# control never returns to here, provisioning takes over comp letely

# ensure we have a directory to store reading and upload files
helpers.mkdir_safe("readings")
helpers.mkdir_safe("uploads")
Expand Down Expand Up @@ -321,7 +334,7 @@ def sleep(minutes = -1):

# if running via mpremote/pyboard.py with a remote mount then we can't
# reset the board so just exist
if remote_mount:
if phew.remote_mount:
sys.exit()

# we'll wait here until the rtc timer triggers and then reset the board
Expand Down
8 changes: 4 additions & 4 deletions enviro/destinations/adafruit_io.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enviro.helpers import get_config
import urequests
import config

def upload_reading(reading):
# create adafruit.io payload format
Expand All @@ -9,14 +9,14 @@ def upload_reading(reading):
}

# add all the sensor readings
nickname = get_config("nickname")
nickname = config.nickname
for key, value in reading["readings"].items():
key = key.replace("_", "-")
payload["feeds"].append({"key": f"{nickname}-{key}", "value": value})

# send the payload
username = get_config("adafruit_io_username")
headers = {'X-AIO-Key': get_config("adafruit_io_key"), 'Content-Type': 'application/json'}
username = config.adafruit_io_username
headers = {'X-AIO-Key': config.adafruit_io_key, 'Content-Type': 'application/json'}
url = f"http://io.adafruit.com/api/v2/{username}/groups/enviro/data"

try:
Expand Down
8 changes: 4 additions & 4 deletions enviro/destinations/http.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from enviro.helpers import get_config
import urequests
import config

def upload_reading(reading):
url = get_config("custom_http_url")
url = config.custom_http_url

auth = None
if get_config("custom_http_username"):
auth = (get_config("custom_http_username"), get_config("custom_http_password"))
if config.custom_http_username:
auth = (config.custom_http_username, config.custom_http_password)

try:
# post reading data to http endpoint
Expand Down
10 changes: 5 additions & 5 deletions enviro/destinations/influxdb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enviro.helpers import get_config
import urequests, time
import config

def url_encode(t):
result = ""
Expand All @@ -14,7 +14,7 @@ def url_encode(t):
return result

def upload_reading(reading):
bucket = get_config("influxdb_bucket")
bucket = config.influxdb_bucket

payload = ""
for key, value in reading["readings"].items():
Expand All @@ -33,13 +33,13 @@ def upload_reading(reading):
nickname = reading["nickname"]
payload += f"{key},device={nickname} value={value}"

influxdb_token = get_config("influxdb_token")
influxdb_token = config.influxdb_token
headers = {
"Authorization": f"Token {influxdb_token}"
}

url = get_config("influxdb_url")
org = get_config("influxdb_org")
url = config.influxdb_url
org = config.influxdb_org
url += f"/api/v2/write?precision=s&org={url_encode(org)}&bucket={url_encode(bucket)}"

try:
Expand Down
Loading

0 comments on commit 8592dd6

Please sign in to comment.