forked from SBorg2014/WLAN-Wetterstation
-
Notifications
You must be signed in to change notification settings - Fork 0
/
osm.py
100 lines (90 loc) · 4.03 KB
/
osm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
OpenSensemap Sensor configuration
"""
import requests
from flask import current_app
SENSOR_NAMES = ["Temperatur", "Luftfeuchte", "Luftdruck relativ", "Luftdruck absolut", "Taupunkt", "gefühlte Temperatur", "Sonnenstrahlung", "Windgeschwindigkeit", "Windrichtung", "UV-Index", "Regen-Rate"]
SENSOR_FIELDS = ["tempf", "humidity", "baromrelin", "baromabsin", "dewptf", "windchillf", "solarradiation", "windspeedmph", "winddir", "uv", "rainratein"]
SENSOR_ICONS = ["osem-thermometer", "osem-humidity", "osem-barometer", "osem-barometer", "osem-thermometer", "osem-thermometer", "osem-brightness", "osem-particulate-matter", "osem-particulate-matter", "osem-brightness", "osem-umbrella"]
SENSOR_UNITS = ['°C', '%H', 'hPa', 'hPa', '°C', '°C', 'W/m²', 'km/h', '°', 'Index', 'mm/h']
SENSORS = list(zip(SENSOR_NAMES, SENSOR_FIELDS, SENSOR_ICONS, SENSOR_UNITS))
def login(config):
try:
url = "https://api.opensensemap.org/users/sign-in"
credentials = open("osm-credentials.txt", "r").read().split("\n")
response = requests.post(url, json=dict(email=credentials[0], password=credentials[1])).json()
config["JWT"] = response["token"]
config["REFRESH"] = response["refreshToken"]
return True
except:
return False
def relogin(config):
pass
def initialize_osm():
if not login(current_app.config):
raise RuntimeError("Login failed")
if not get_sensors(current_app.config):
register_sensors(current_app.config)
def get_sensors(config):
url = f"https://api.opensensemap.org/boxes/{config['SENSEBOX_ID']}"
try:
sensor = requests.get(url).json()
config["SENSOR_IDS"] = {}
delete_sensors = []
for s in sensor["sensors"]:
if s["title"] in SENSOR_NAMES:
i = SENSOR_NAMES.index(s["title"])
config["SENSOR_IDS"][SENSOR_FIELDS[i]] = s["_id"]
else:
delete_sensors.append(s["_id"])
print(f"Sensor {s['title']} is configured at osm, but not in this server.")
if len(delete_sensors) > 0:
import os
if os.environ.get("DELETE_OLD_SM_SENSORS", None) is not None:
print("Deleting sensors", delete_sensors)
sensors = []
for s in delete_sensors:
sensors.append(dict(deleted="true",_id=s))
request = dict(sensors=sensors)
url = f"https://api.opensensemap.org/boxes/{config['SENSEBOX_ID']}"
resp = requests.put(url, json=request, headers={"Authorization": f"Bearer {config['JWT']}"})
else:
print("I could delete sensors", delete_sensors)
print("To go ahead, set the DELETE_OLD_SM_SENSORS environment variable")
if len(config["SENSOR_IDS"]) == len(SENSORS):
print("Matched all sensors")
print(config["SENSOR_IDS"])
return True
print("Not all sensors found. Going to recreate all of them. Might result in chaos...")
return False
except Exception as e:
print(e)
return False
def register_sensors(config):
# return
sensors = []
for sensor in SENSORS:
name, _, icon, unit = sensor
sensors.append(dict(
new="true",
edited="true",
title=name,
unit=unit,
sensorType="Wetterstation",
icon=icon
))
request = dict(sensors=sensors)
url = f"https://api.opensensemap.org/boxes/{config['SENSEBOX_ID']}"
resp = requests.put(url, json=request, headers={"Authorization": f"Bearer {config['JWT']}"})
print(resp.text)
def hitzeindex(temp, feuchte):
number = -8.784695
number += 1.61139411 * temp
number += 2.338549 * feuchte
number -= 0.14611605 * temp * feuchte
number -= 0.012308094 * (temp ** 2)
number -= 0.016424828 * (feuchte ** 2)
number += 0.002211732 * (temp ** 2) * feuchte
number += 0.00072546 * temp * (feuchte ** 2)
number -= 0.000003582 * (temp** 2) * (feuchte ** 2)
return number