diff --git a/phone_sensors/api.py b/phone_sensors/api.py index 07471a9..a47fd84 100644 --- a/phone_sensors/api.py +++ b/phone_sensors/api.py @@ -11,7 +11,7 @@ from sqlmodel import Session from phone_sensors.birdnet import submit_analyze_audio_job -from phone_sensors.schemas import SensorMetadata +from phone_sensors.schemas import SensorStatus from phone_sensors.settings import get_db_session, get_redis_connection app = FastAPI(title="Phone Sensors API", version="0.1.0") @@ -53,7 +53,7 @@ async def upload( Data will be queued to be processed by the server. Returns a job ID in 128-bit UUID format. """ - metadata = SensorMetadata( + metadata = SensorStatus( sensor_id=sensor_id, timestamp=timestamp, lat=lat, @@ -64,11 +64,5 @@ async def upload( ) file_path = Path(tempfile.mktemp(suffix=".wav")) file_path.write_bytes(await audio_file.read()) - job_id = submit_analyze_audio_job(session, redis_conn, file_path, sensor_metadata=metadata) + job_id = submit_analyze_audio_job(redis_conn, file_path, sensor_status=metadata) return job_id - - -@app.post("/register_sensor") -def register_sensor() -> dict: - """Sensor registration endpoint.""" - raise NotImplementedError("Not implemented yet") diff --git a/phone_sensors/birdnet.py b/phone_sensors/birdnet.py index a34b9e0..8d6db2e 100644 --- a/phone_sensors/birdnet.py +++ b/phone_sensors/birdnet.py @@ -8,51 +8,72 @@ from birdnetlib.analyzer import Analyzer from pydantic import FilePath from redis import Redis +from requests import session from rq import Callback, Queue from rq.job import Job -from sqlmodel import Session +from sqlmodel import Session, select -from phone_sensors.schemas import BirdNetDetection, Detection, SensorMetadata -from phone_sensors.settings import get_settings +from phone_sensors.schemas import BirdNetDetection, Detection, SensorStatus +from phone_sensors.settings import get_db_session, get_settings logger = logging.getLogger(__name__) def analyze_audio( - file_path: FilePath, sensor_metadata: SensorMetadata, min_conf: float -) -> tuple[list[BirdNetDetection], SensorMetadata]: + file_path: FilePath, sensor_status: SensorStatus, min_conf: float +) -> tuple[list[BirdNetDetection], SensorStatus]: """Analyze audio file and return list of bird species.""" logger.info("Analyzing audio file %s...", file_path) - logger.debug("Sensor metadata: %s", sensor_metadata.model_dump_json(indent=2)) + logger.debug("Sensor metadata: %s", sensor_status.model_dump_json(indent=2)) recording = Recording( analyzer=Analyzer(), path=file_path, - lat=sensor_metadata.lat, - lon=sensor_metadata.lon, + lat=sensor_status.lat, + lon=sensor_status.lon, min_conf=min_conf, ) recording.analyze() - return recording.detections, sensor_metadata + return recording.detections, sensor_status def on_analyze_audio_job_success( - job: Job, connection: Redis, result: list[BirdNetDetection], *args, **kwargs + job: Job, + connection: Redis, + result: tuple[list[BirdNetDetection], SensorStatus], + *args, + **kwargs ) -> None: # pylint: disable=unused-argument # type: ignore """Callback for analyze_audio job success.""" - detections, sensor_metadata = result + detections, sensor_status = result print("Processing result:", result) - # session.add_all(Detection.from_birdnet_detections(detections, sensor_metadata)) - # session.commit() + # update sensor status if exists, otherwise create new + session = next(get_db_session()) + curr_status = session.exec( + select(SensorStatus).where(SensorStatus.sensor_id == sensor_status.sensor_id) + ).one_or_none() + if curr_status: + curr_status.timestamp = sensor_status.timestamp + curr_status.lat = sensor_status.lat + curr_status.lon = sensor_status.lon + curr_status.accuracy = sensor_status.accuracy + curr_status.battery = sensor_status.battery + curr_status.temperature = sensor_status.temperature + else: + session.add(sensor_status) + session.commit() + + session.add_all(Detection.from_birdnet_detections(detections, sensor_status)) + session.commit() def submit_analyze_audio_job( - session: Session, connection: Redis, file_path: FilePath, sensor_metadata: SensorMetadata + connection: Redis, file_path: FilePath, sensor_status: SensorStatus ) -> UUID: """Analyze audio file and return list of bird species.""" job = Queue(connection=connection).enqueue( analyze_audio, file_path=file_path, - sensor_metadata=sensor_metadata, + sensor_status=sensor_status, min_conf=get_settings().birdnet_min_confidence, on_success=Callback(on_analyze_audio_job_success), ) diff --git a/phone_sensors/cli.py b/phone_sensors/cli.py new file mode 100644 index 0000000..8f1594e --- /dev/null +++ b/phone_sensors/cli.py @@ -0,0 +1,29 @@ +"""Main entry point for the application.""" + +from multiprocessing import Process + +import rich +import uvicorn +from rq.worker_pool import WorkerPool +from sqlalchemy import create_engine + +from phone_sensors.api import app +from phone_sensors.db import init_db +from phone_sensors.settings import get_redis_connection, get_settings + + +def main(): + settings = get_settings() + rich.print(settings) + engine = create_engine(str(settings.postgres_dsn)) + init_db(engine) + api_process = Process( + target=uvicorn.run, args=(app,), kwargs={"host": settings.host, "port": settings.port} + ) + worker_pool_process = Process( + target=WorkerPool( + ["default", "high", "low"], num_workers=3, connection=next(get_redis_connection()) + ).start + ) + api_process.start() + worker_pool_process.start() diff --git a/phone_sensors/schemas.py b/phone_sensors/schemas.py index 3d5cdf5..be8e7c0 100644 --- a/phone_sensors/schemas.py +++ b/phone_sensors/schemas.py @@ -10,9 +10,10 @@ from sqlmodel import Column, Field, SQLModel -class SensorMetadata(BaseModel): +class SensorStatus(SQLModel, table=True): """Schema for the SensorMetadata format.""" + __tablename__ = "sensor_status" # type: ignore sensor_id: UUID timestamp: datetime.datetime lat: float @@ -41,8 +42,9 @@ class BirdNetDetection(BaseModel): class Detection(SQLModel, table=True): """Schema for the Prediction table.""" + __tablename__ = "detection" # type: ignore detection_id: int = Field(default=None, primary_key=True) - sensor_id: UUID = Field(foreign_key="sensor.sensor_id") + sensor_id: UUID = Field(foreign_key="sensor_status.sensor_id") timestamp: datetime.datetime duration: float lat: float @@ -58,7 +60,7 @@ class Detection(SQLModel, table=True): @classmethod def from_birdnet_detections( - cls, birdnet_detections: list[BirdNetDetection], sensor_metadata: SensorMetadata + cls, birdnet_detections: list[BirdNetDetection], sensor_metadata: SensorStatus ) -> list["Detection"]: """Create a list of Detection instances from BirdNet detections.""" detections = [] diff --git a/phone_sensors/settings.py b/phone_sensors/settings.py index feaaa85..9642ceb 100644 --- a/phone_sensors/settings.py +++ b/phone_sensors/settings.py @@ -11,8 +11,10 @@ class Settings(BaseSettings): """Settings for phone_sensors package.""" - redis_dsn: RedisDsn = Field("redis://0.0.0.0:6379/0") + redis_dsn: RedisDsn = Field("redis://redis:6379/0") postgres_dsn: PostgresDsn = Field("postgresql://postgres:phonesensors@db/sensor_data") + host: str = Field("0.0.0.0") + port: int = Field(8000) birdnet_min_confidence: float = Field(0.25) diff --git a/tests/test_birdnet.py b/tests/test_birdnet.py index 3ef7f1e..8d6e2bf 100644 --- a/tests/test_birdnet.py +++ b/tests/test_birdnet.py @@ -4,7 +4,7 @@ import rich -from phone_sensors.birdnet import SensorMetadata, analyze_audio +from phone_sensors.birdnet import SensorStatus, analyze_audio SPECIES_LIST = """ Accipiter cooperii_Cooper's Hawk @@ -61,7 +61,7 @@ def test_analyze_audio(): """Test analyze_audio function.""" file_path = Path("./example.wav") - sensor_metadata = SensorMetadata.model_validate( + sensor_metadata = SensorStatus.model_validate( { "sensor_id": "123e4567-e89b-12d3-a456-426614174000", "timestamp": "2021-01-01T12:00:00",