-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread_sensors.py
105 lines (86 loc) · 3.2 KB
/
read_sensors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import time
import logging
import configparser
import json
import pandas as pd
import requests
import RPi.GPIO as GPIO
import board
import busio
import numpy as np
import adafruit_mlx90640
# Load runtime settings from config.ini, resolved against the current working
# directory. ConfigParser.read() skips files it cannot open, so a missing file
# only shows up below as a KeyError on the 'API' section.
config = configparser.ConfigParser()
config.read('config.ini')

# Endpoint and credentials for the remote API, all taken from the [API] section.
API_URL, ID_REFUGIO, PASSWORD = (
    config['API'][option] for option in ('URL', 'ID_REFUGIO', 'PASSWORD')
)

logging.basicConfig(level=logging.INFO)

# Largest allowed gap between the mean thermal-frame value and the background
# value before the thermal check reports movement.
UMBRAL_CAMBIO_TEMPERATURA = 5.0
class SensorLog:
    """In-memory log of sensor events plus a best-effort uploader to the API.

    Attributes:
        df: DataFrame holding one row per event, with the columns
            ``timestamp``, ``status`` and ``sensor_id``.
        failed_requests: Payloads whose upload failed (non-200 response or
            transport error), in the order they failed. Nothing in this
            class retries them.
    """

    # Seconds to wait on the API before giving up. Without a timeout,
    # requests.post() can block indefinitely on a stalled connection and
    # freeze the polling loop that calls send_to_api().
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.df = pd.DataFrame(columns=['timestamp', 'status', 'sensor_id'])
        self.failed_requests = []

    def add_entry(self, sensor_id, status):
        """Append one event, stamped with the current local time, to ``df``.

        Args:
            sensor_id: Identifier stored in the ``sensor_id`` column.
            status: Text stored in the ``status`` column.
        """
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        new_entry = pd.DataFrame([[timestamp, status, sensor_id]], columns=self.df.columns)
        self.df = pd.concat([self.df, new_entry], ignore_index=True)

    def send_to_api(self, sensor_entry):
        """POST one log entry to ``{API_URL}/refugio/``.

        The payload carries the module-level ``ID_REFUGIO`` and ``PASSWORD``
        together with the entry's fields. Any response other than HTTP 200,
        and any ``requests`` transport error (timeouts included), is logged
        as a warning and the payload is appended to ``failed_requests``;
        no exception from ``requests`` propagates to the caller.

        Args:
            sensor_entry: Mapping with the keys ``timestamp``, ``status``
                and ``sensor_id``.
        """
        data = {
            "id_refugio": ID_REFUGIO,
            "password": PASSWORD,
            "timestamp": sensor_entry['timestamp'],
            "status": sensor_entry['status'],
            "sensor_id": sensor_entry['sensor_id'],
        }
        try:
            response = requests.post(
                f"{API_URL}/refugio/",
                json=data,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.exceptions.RequestException as e:
            logging.warning("Fallo al conectar con la API: %s", e)
            self.failed_requests.append(data)
            return
        if response.status_code != 200:
            logging.warning("Error en la API: %s", response.content)
            self.failed_requests.append(data)
def setup_gpio(pin):
    """Select BCM pin numbering and configure ``pin`` as an input.

    Args:
        pin: Channel number passed to ``GPIO.setup``.
    """
    numbering, direction = GPIO.BCM, GPIO.IN
    GPIO.setmode(numbering)
    GPIO.setup(pin, direction)
def read_sensor(pin):
    """Return the current level that ``GPIO.input`` reports for ``pin``.

    Args:
        pin: Channel number to sample; it should already be set up as an input.
    """
    level = GPIO.input(pin)
    return level
# I2C bus on the board's SCL/SDA pins, requested at 400 kHz.
i2c = busio.I2C(
    board.SCL,
    board.SDA,
    frequency=400_000,
)

# MLX90640 thermal camera on that bus, set to the driver's 2 Hz refresh rate.
mlx = adafruit_mlx90640.MLX90640(i2c)
mlx.refresh_rate = adafruit_mlx90640.RefreshRate.REFRESH_2_HZ
def detect_movement_from_thermal_camera(temperatura_fondo):
    """Report whether the mean of a thermal frame departs from the background.

    Reads one frame from the module-level ``mlx`` camera into a buffer of
    24 * 32 values and compares the mean of that buffer with
    ``temperatura_fondo``.

    Args:
        temperatura_fondo: Background reference value, in the same unit as
            the frame values (presumably degrees Celsius -- confirm against
            the MLX90640 driver).

    Returns:
        True when the absolute difference between the frame mean and
        ``temperatura_fondo`` exceeds ``UMBRAL_CAMBIO_TEMPERATURA``. False
        otherwise, including when the frame read raises ``ValueError``.
    """
    frame = np.zeros((24 * 32,))
    try:
        # Only the frame read is guarded; the arithmetic below is not
        # expected to raise ValueError.
        mlx.getFrame(frame)
    except ValueError as exc:
        # A bad frame is treated as "no movement", but it is logged so that
        # repeated read failures are visible instead of silently ignored.
        logging.warning("Fallo al leer la cámara térmica: %s", exc)
        return False

    temperatura_actual = np.mean(frame)
    diferencia = abs(temperatura_actual - temperatura_fondo)
    # bool() keeps the return type a plain Python bool rather than numpy.bool_.
    return bool(diferencia > UMBRAL_CAMBIO_TEMPERATURA)
def main():
    """Poll the infrared sensor and the thermal camera once per second.

    Each change in the infrared reading, and each positive thermal check,
    is logged, appended to a ``SensorLog`` and sent to the API. The loop
    runs until interrupted with Ctrl+C; ``GPIO.cleanup()`` always runs on
    the way out.
    """
    background_temp = 20.0
    ir_pin = 21
    log = SensorLog()

    def publish(sensor_id, status):
        # Store the event, then upload the row that was just appended.
        log.add_entry(sensor_id, status)
        log.send_to_api(log.df.iloc[-1].to_dict())

    setup_gpio(ir_pin)
    previous_reading = None  # None forces the first reading to be reported
    try:
        while True:
            reading = read_sensor(ir_pin)
            if reading != previous_reading:
                previous_reading = reading
                if reading == 1:
                    status = "No obstacle"
                else:
                    status = "Obstacle"
                logging.info(f"Sensor infrarrojo: {status}")
                publish(ir_pin, status)

            # The thermal check runs on every pass, not only on IR changes.
            if detect_movement_from_thermal_camera(background_temp):
                logging.info("Movimiento detectado por la cámara térmica")
                publish(2, "Thermal movement")

            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("Programa interrumpido")
    finally:
        GPIO.cleanup()


if __name__ == "__main__":
    main()