diff --git a/.gitignore b/.gitignore index 0af5316..5529439 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,41 @@ ipython_config.py # Remove previous ipynb_checkpoints # git rm -r .ipynb_checkpoints/ +dataset/ +pts/ +wandb/ +*.ipynb +*.png +*.pt +datapreprocess/code_prac.ipynb +datapreprocess/rt_code_test.ipynb +datapreprocess/csv/abnormal/train/*.csv +datapreprocess/csv/abnormal/val/*.csv +datapreprocess/csv/normal/train/*.csv +datapreprocess/csv/normal/val/*.csv +datapreprocess/json/abnormal/train/ +datapreprocess/json/abnormal/val/ +datapreprocess/json/abnormal/TS_03.이상행동_14.교통약자_train/ +datapreprocess/json/abnormal/TS_03.이상행동_14.교통약자_val/ +datapreprocess/json/normal/train/ +datapreprocess/json/normal/val/ +datapreprocess/*.csv +datapreprocess/*.pt +datapreprocess/*.pth +datapreprocess/npy +datapreprocess/nohup.out +model_train/code_prac.ipynb +model_train/model.h5 +model_train/pytorch_model.pth +model_train/nohup.out +model_train/*.ipynb +model_train/wandb +app/testitems/ +pths/ +app/models/pts +app/models/yolov8n-pose.pt +app/yolov8n-pose.pt +.netrc ### macOS ### # General diff --git a/app/README.md b/app/README.md new file mode 100644 index 0000000..b6ec7d0 --- /dev/null +++ b/app/README.md @@ -0,0 +1,57 @@ +# Project Structure + +```bash +. 
+├── README.md +├── __init__.py +├── api +│ ├── __init__.py +│ ├── album_router.py +│ ├── real_time_router.py +│ ├── upload_router.py +│ └── user_router.py +├── database +│ ├── __init__.py +│ ├── crud.py +│ ├── database.py +│ ├── models.py +│ └── schemas.py +├── inference +│ ├── __init__.py +│ ├── anomaly_detector.py +│ └── rt_anomaly_detector.py +├── main.py +├── templates +│ ├── album_detail.html +│ ├── album_list.html +│ ├── base.html +│ ├── frame.html +│ ├── login.html +│ ├── main.html +│ ├── real_time.html +│ ├── signup.html +│ ├── src +│ │ ├── album_detail.js +│ │ ├── album_list.js +│ │ └── video.js +│ ├── stream.html +│ ├── upload.html +│ └── video.html +└── utils + ├── __init__.py + ├── config.py + ├── security.py + └── utils.py +``` + +# Description + +- api: URL별 로직 구현 + +- database: 데이터베이스 관련 설정 및 함수 + +- inference: 모델 추론 코드(녹화영상, 실시간) + +- templates: UI 템플릿. Bootstrap 사용 + +- utils: config 및 기타 함수 \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/album_router.py b/app/api/album_router.py new file mode 100644 index 0000000..b7175d9 --- /dev/null +++ b/app/api/album_router.py @@ -0,0 +1,162 @@ +from typing import Optional + +from fastapi import ( + APIRouter, + Response, + Request, + HTTPException, + Form, + UploadFile, + File, + Cookie, + Query, + Depends, +) +from fastapi.responses import RedirectResponse +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session + +from database import crud, models +from database.database import get_db +from utils.config import settings +from utils.security import get_current_user +from utils.utils import s3 + +templates = Jinja2Templates(directory="templates") + +router = APIRouter( + prefix="/album", +) + + +@router.get("") +async def upload_get(request: Request, db: Session = Depends(get_db)): + user = 
get_current_user(request) + if not user: + return RedirectResponse(url="/user/login") + + album_list = crud.get_uploads(db=db, user_id=user.user_id) + print(album_list[0].completes[0].completed) + return templates.TemplateResponse("album_list.html", {'request': request, 'token': user.email, 'album_list':album_list}) + + +@router.post("") +async def modify_name(request: Request, + check_code: str = Form(...), + upload_id: Optional[int] = Form(...), + origin_name: Optional[str] = Form(None), + new_name: Optional[str] = Form(None), + is_real_time: Optional[bool] = Form(None), + db: Session = Depends(get_db)): + user = get_current_user(request) + + if check_code == "edit": + upload_info = ( + db.query(models.Upload) + .filter((models.Upload.name == origin_name) & (models.Upload.upload_id == upload_id)) + .first() + ) + upload_info.name = new_name + + db.add(upload_info) + db.commit() + db.refresh(upload_info) + elif check_code == "delete": + upload_info = crud.get_upload(db, upload_id) + if upload_info: + db.delete(upload_info) + + db.commit() + album_list = crud.get_uploads(db=db, user_id=user.user_id) + + return templates.TemplateResponse("album_list.html", {'request': request, 'token': user.email, 'album_list': album_list}) + + +@router.get("/details") +async def upload_get_one(request: Request, + user_id: int = Query(...), + upload_id: int = Query(...), + db: Session = Depends(get_db)): + + user = get_current_user(request) + + video_info = { + "user_id": user_id, + "upload_id":upload_id, + "date": None, + "upload_name": None, + "is_realtime": None, + "video_id": None, + "video_url": None, + "frame_urls": None, + "score_url": None, + "complete": None + } + + + video = crud.get_video(db=db, upload_id=upload_id) + video_info["video_id"] = video.video_id + uploaded = crud.get_upload(db=db, upload_id=video.upload_id) + video_info["upload_name"] = uploaded.name + video_info["is_realtime"] = uploaded.is_realtime + video_info["date"] = uploaded.date.strftime('%Y-%m-%d 
%H:%M:%S') + + #frames = crud.get_frames(db=db, video_id=video.video_id) + frames = crud.get_frames_with_highest_score(db=db, video_id=video.video_id) + frame_ids = [frame.frame_id for frame in frames] + frame_urls = [frame.frame_url for frame in frames] + frame_timestamps = [frame.time_stamp for frame in frames] + frame_objs = [] + + video_obj = s3.generate_presigned_url('get_object', + Params={'Bucket': settings.BUCKET, + 'Key': video.video_url}, + ExpiresIn=3600) + + + video_info["video_url"] = video_obj + video_info["complete"] = crud.get_complete(db=db, upload_id=upload_id).completed + if not video_info["complete"]: + return templates.TemplateResponse("album_detail.html", {'request': request, 'token': user.email, 'video_info': video_info, 'loading': True}) + + if frame_ids != []: + for frame_id, frame_url, frame_timestamp in zip(frame_ids, frame_urls, frame_timestamps): + frame_obj = s3.generate_presigned_url('get_object', + Params={'Bucket': settings.BUCKET, + 'Key': frame_url}, + ExpiresIn=3600) + frame_objs.append((frame_id, frame_obj, frame_timestamp.strftime('%H:%M:%S'))) + + score_graph_url = '/'.join(frame_urls[0].split('/')[:-1]) + '/score_graph.png' + score_obj = s3.generate_presigned_url('get_object', + Params={'Bucket': settings.BUCKET, + 'Key': score_graph_url}, + ExpiresIn=3600) + + video_info["frame_urls"] = frame_objs + video_info["score_url"] = score_obj + + #print(video_info) + return templates.TemplateResponse("album_detail.html", {'request': request, 'token': user.email, 'video_info': video_info, 'loading': False}) + + +@router.get("/details/images") +async def image_get(request: Request, + frame_id: int = Query(...), + db: Session = Depends(get_db)): + + user = get_current_user(request) + frame = crud.get_frame(db=db, frame_id=frame_id) + frame_obj = s3.generate_presigned_url( + "get_object", Params={"Bucket": settings.BUCKET, "Key": frame.frame_url}, ExpiresIn=3600 + ) + print(frame_obj) + print(frame.box_kp_json) + frame_info = { + 
'frame_url': frame_obj, + 'time_stamp': frame.time_stamp, + 'frame_json': frame.box_kp_json + } + + return templates.TemplateResponse("frame.html", {'request': request, 'token': user.email, 'frame_info': frame_info}) + \ No newline at end of file diff --git a/app/api/inference_router.py b/app/api/inference_router.py new file mode 100644 index 0000000..5a0f9be --- /dev/null +++ b/app/api/inference_router.py @@ -0,0 +1,152 @@ +from datetime import timedelta, datetime, date +from fastapi import FastAPI, Depends, BackgroundTasks, WebSocket, WebSocketDisconnect +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from sqlalchemy.orm import Session +import os +import sys +import json + +import pytz +import asyncio +import cv2 +import numpy as np +from cap_from_youtube import cap_from_youtube + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(current_dir) + +sys.path.append(parent_dir) + +from database.database import get_db +from database import crud + +from utils.config import settings +from utils.utils import run_model, s3 + +from inference.rt_anomaly_detector_lstmae import RT_AnomalyDetector + +app = FastAPI() + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +class ModelInfo(BaseModel): + user_id: int + upload_id: int + threshold: float + video_uuid_name: str + video_ext: str + video_id: int + video_url: str + +@app.get("/") +def root(): + return {"message": "모델이 돌아가용"} + +# 녹화영상용 +@app.post("/run_model") +async def run_model_endpoint(info: ModelInfo, + background_tasks: BackgroundTasks, + db: Session = Depends(get_db)): + + info = info.dict() + + def run_model_task(): + run_model(info["video_url"], info, settings, db) + + background_tasks.add_task(run_model_task) + + return {"message": "Model execution started."} + + +# 메일을 보내야하는지 판단하는 함수 +async def check_and_send_email(db, video_id, user_id, last_point, 
smtp): + global last_emailed_time + + frames = crud.get_frames_with_highest_score(db=db, video_id=video_id) + frame_timestamps = [frame.time_stamp.strftime('%H:%M:%S') for frame in frames] + + if len(frame_timestamps) < 6: + return False + + last = datetime.strptime(frame_timestamps[-2], '%H:%M:%S') + check = datetime.strptime(frame_timestamps[-6], '%H:%M:%S') + + if (last - check) == timedelta(seconds=4): # 연속적으로 5초간 지속되면 + if not check <= last_point <= last: + crud.send_email(db, frame_timestamps[-6], frame_timestamps[-2], user_id, smtp) + last_emailed_time = last + +# 과연 웹 서버와 실시간을 분리하는 것이 더 빠른가? +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket, + db: Session = Depends(get_db)): + + await websocket.accept() + smtp = await crud.create_smtp_server() + + try: + video_info_str = await websocket.receive_text() + print("Received video info:", video_info_str) + video_info = json.loads(video_info_str) + global detector, last_emailed_time + if detector is None: + detector = RT_AnomalyDetector(video_info, s3, settings, db, websocket) + detector.ready() + + if video_info["video_url"] == "web": + while True: + timestamp = datetime.now(pytz.timezone('Asia/Seoul')) + # Receive bytes from the websocket + bytes = await websocket.receive_bytes() + data = np.frombuffer(bytes, dtype=np.uint8) + frame = cv2.imdecode(data, cv2.IMREAD_COLOR) + await detector.run(frame, timestamp) + await check_and_send_email(db=db, video_id=video_info['video_id'], user_id=video_info['user_id'], + last_point=last_emailed_time, smtp=smtp) + + else: + if "youtube" in video_info["video_url"]: + cap = cap_from_youtube(video_info["video_url"], '240p') + + else: + cap = cv2.VideoCapture(video_info["video_url"]) + + while True: + success, frame = cap.read() + if not success: + await websocket.send_text(f'카메라 연결에 실패했습니다.') + break + else: + timestamp = datetime.now(pytz.timezone('Asia/Seoul')) + await detector.run(frame, timestamp) + await check_and_send_email(db=db, 
video_id=video_info['video_id'], user_id=video_info['user_id'], + last_point=last_emailed_time, smtp=smtp) + + ret, buffer = cv2.imencode('.jpg', frame) + await websocket.send_bytes(buffer.tobytes()) + + await asyncio.sleep(0.042) + + except WebSocketDisconnect: + await websocket.close() + await smtp.quit() + + except Exception as e: + # 예외 발생 시 로그 기록 및 연결 종료 + print(f"WebSocket error: {e}") + await websocket.close() + await smtp.quit() + + finally: + try: + detector.upload_score_graph_s3() + except: + pass + detector = None \ No newline at end of file diff --git a/app/api/real_time_router.py b/app/api/real_time_router.py new file mode 100644 index 0000000..4e81b9a --- /dev/null +++ b/app/api/real_time_router.py @@ -0,0 +1,249 @@ +from datetime import timedelta, datetime, date +import pytz +import asyncio +import cv2 +import numpy as np +import json + +from fastapi import APIRouter, Request, Form, Query, Depends, WebSocket, WebSocketDisconnect, status +from fastapi.responses import RedirectResponse +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session +import websockets +from websockets.exceptions import ConnectionClosed + + +from database import schemas, crud +from database.database import get_db +from utils.config import settings +from utils.security import get_current_user +from utils.utils import s3 +#from inference.rt_anomaly_detector import RT_AnomalyDetector +from inference.rt_anomaly_detector_lstmae import RT_AnomalyDetector + +from cap_from_youtube import cap_from_youtube + +templates = Jinja2Templates(directory="templates") +router = APIRouter( + prefix="/real_time", +) + +detector = None +last_emailed_time = datetime.strptime("0:00:00", '%H:%M:%S') + +@router.get("") +async def real_time_get(request: Request): + user = get_current_user(request) + if not user: + return RedirectResponse(url='/user/login') + + return templates.TemplateResponse("real_time.html", {'request': request, "token": user.email}) + + +@router.post("") 
+async def realtime_post(request: Request, + name: str = Form(...), + real_time_video: str = Form(...), + datetime: datetime = Form(...), + thr: float = Form(...), + db: Session = Depends(get_db)): + + user = get_current_user(request) + user = crud.get_user_by_email(db=db, email=user.email) + + # Form 과 user_id 를 이용하여 upload row insert + _upload_create = schemas.UploadCreate(name=name, date=datetime, is_realtime=True, thr=thr, user_id=user.user_id) + crud.create_upload(db=db, upload=_upload_create) + + # 지금 업로드된 id 획득, 클라이언트로부터 작성된 실시간 스트리밍 영상 url 획득 + uploaded = crud.get_upload_id(db=db, user_id=user.user_id, name=name, date=datetime)[-1] + + # db 에는 실시간임을 알 수 있게만 함 + video_url = f"{real_time_video}" + _video_create = schemas.VideoCreate(video_url=video_url, upload_id=uploaded.upload_id) + crud.create_video(db=db, video=_video_create) + _complete_create = schemas.Complete(completed=True, upload_id=uploaded.upload_id) + crud.create_complete(db=db, complete=_complete_create) + + # model inference 에서 사용할 정보 + info = { + "user_id": user.user_id, + "email": user.email, + "upload_id": uploaded.upload_id, + "name": uploaded.name, + "date": uploaded.date, + "threshold": uploaded.thr, + "video_url": video_url, + "video_id": crud.get_video(db=db, upload_id=uploaded.upload_id).video_id + } + + redirect_url = f"/real_time/stream?user_id={info['user_id']}&upload_id={info['upload_id']}" + + return RedirectResponse(url=redirect_url, status_code=status.HTTP_303_SEE_OTHER) + + +@router.get("/stream") +async def get_stream(request: Request, + user_id: int = Query(...), + upload_id: int = Query(...), + db: Session = Depends(get_db)): + + user = get_current_user(request) + + video = crud.get_video(db=db, upload_id=upload_id) + uploaded = crud.get_upload(db=db, upload_id=video.upload_id) + + video_info = { + "user_id": user_id, + "upload_id": upload_id, + "date": uploaded.date.strftime('%Y-%m-%d %H:%M:%S'), + "upload_name": uploaded.name, + "thr": uploaded.thr, + "video_id": 
video.video_id, + "video_url": video.video_url, + "is_realtime": True, + "model_server_ip": settings.STREAM_MODEL_SERVER_IP + } + + # video_info = json.dumps(video_info) + + return templates.TemplateResponse("stream.html", {'request': request, 'token': user.email, 'video_info': video_info}) + +# 메일을 보내야하는지 판단하는 함수 +async def check_and_send_email(db, video_id, user_id, last_point, smtp): + global last_emailed_time + + frames = crud.get_frames_with_highest_score(db=db, video_id=video_id) + frame_timestamps = [frame.time_stamp.strftime('%H:%M:%S') for frame in frames] + + if len(frame_timestamps) < 6: + return False + + last = datetime.strptime(frame_timestamps[-2], '%H:%M:%S') + check = datetime.strptime(frame_timestamps[-6], '%H:%M:%S') + + if (last - check) == timedelta(seconds=4): # 연속적으로 5초간 지속되면 + if not check <= last_point <= last: + crud.send_email(db, frame_timestamps[-6], frame_timestamps[-2], user_id, smtp) + last_emailed_time = last + + +@router.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket, + db: Session = Depends(get_db)): + await websocket.accept() + smtp = await crud.create_smtp_server() + + try: + video_info_str = await websocket.receive_text() + print("Received video info:", video_info_str) + video_info = json.loads(video_info_str) + global detector, last_emailed_time + if detector is None: + detector = RT_AnomalyDetector(video_info, s3, settings, db, websocket) + detector.ready() + + if video_info["video_url"] == "web": + while True: + timestamp = datetime.now(pytz.timezone('Asia/Seoul')) + # Receive bytes from the websocket + bytes = await websocket.receive_bytes() + data = np.frombuffer(bytes, dtype=np.uint8) + frame = cv2.imdecode(data, cv2.IMREAD_COLOR) + await detector.run(frame, timestamp) + await check_and_send_email(db=db, video_id=video_info['video_id'], user_id=video_info['user_id'], + last_point=last_emailed_time, smtp=smtp) + + else: + if "youtube" in video_info["video_url"]: + cap = 
cap_from_youtube(video_info["video_url"], '240p') + + else: + cap = cv2.VideoCapture(video_info["video_url"]) + + while True: + success, frame = cap.read() + if not success: + await websocket.send_text(f'카메라 연결에 실패했습니다.') + break + else: + timestamp = datetime.now(pytz.timezone('Asia/Seoul')) + await detector.run(frame, timestamp) + await check_and_send_email(db=db, video_id=video_info['video_id'], user_id=video_info['user_id'], + last_point=last_emailed_time, smtp=smtp) + + ret, buffer = cv2.imencode('.jpg', frame) + await websocket.send_bytes(buffer.tobytes()) + + await asyncio.sleep(0.042) + + except WebSocketDisconnect: + await websocket.close() + await smtp.quit() + + except Exception as e: + # 예외 발생 시 로그 기록 및 연결 종료 + print(f"WebSocket error: {e}") + await websocket.close() + await smtp.quit() + + finally: + try: + detector.upload_score_graph_s3() + except: + pass + detector = None + + +@router.get("/stream") +async def get_stream(request: Request, + user_id: int = Query(...), + upload_id: int = Query(...), + db: Session = Depends(get_db)): + + user = get_current_user(request) + + video = crud.get_video(db=db, upload_id=upload_id) + uploaded = crud.get_upload(db=db, upload_id=video.upload_id) + + video_info = { + "user_id": user_id, + "upload_id": upload_id, + "date": uploaded.date.strftime('%Y-%m-%d %H:%M:%S'), + "upload_name": uploaded.name, + "thr": uploaded.thr, + "video_id": video.video_id, + "video_url": video.video_url, + "is_realtime": True + } + + # video_info = json.dumps(video_info) + + return templates.TemplateResponse("stream.html", {'request': request, 'token': user.email, 'video_info': video_info}) + + +# db 에서 실시간에서 저장되는 frame url 불러오는 코드 +def fetch_data(db, upload_id): + + video = crud.get_video(db=db, upload_id=upload_id) + frames = crud.get_frames_with_highest_score(db=db, video_id=video.video_id) + frame_ids = [frame.frame_id for frame in frames] + frame_urls = [frame.frame_url for frame in frames] + frame_timestamps = [frame.time_stamp for 
frame in frames] + frame_objs = [] + + for frame_id, frame_url, frame_timestamp in zip(frame_ids, frame_urls, frame_timestamps): + frame_obj = s3.generate_presigned_url('get_object', + Params={'Bucket': settings.BUCKET, + 'Key': frame_url}, + ExpiresIn=3600) + frame_objs.append((frame_id, frame_obj, frame_timestamp.strftime('%H:%M:%S'))) + + return {"frame_urls": frame_objs} + +@router.get("/fetch_data") +async def fetch_frame_data(upload_id: int = Query(...), + db: Session = Depends(get_db)): + frame_data = fetch_data(db, upload_id) + return frame_data + diff --git a/app/api/upload_router.py b/app/api/upload_router.py new file mode 100644 index 0000000..63d75a1 --- /dev/null +++ b/app/api/upload_router.py @@ -0,0 +1,116 @@ +from datetime import datetime +import os +import uuid +import requests + +from fastapi import APIRouter, Response, Request, Form, UploadFile, File, Depends, BackgroundTasks, status, HTTPException +from fastapi.responses import RedirectResponse +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session + +from utils.config import settings +from database import crud +from database import schemas +from database.database import get_db +from utils.security import get_current_user +from utils.utils import s3 + +templates = Jinja2Templates(directory="templates") + +router = APIRouter( + prefix="/upload", +) + +@router.get("") +async def upload_get(request: Request): + user = get_current_user(request) + err_msg = {"file_ext": None} + if not user: + return RedirectResponse(url='/user/login') + + return templates.TemplateResponse("upload.html", {'request': request, 'token': user.email, "err": err_msg}) + +@router.post("") +async def upload_post(request: Request, + name: str = Form(...), + upload_file: UploadFile = File(...), + datetime: datetime = Form(...), + thr: float = Form(...), + db: Session = Depends(get_db)): + + user = get_current_user(request) + err_msg = {"file_ext": None} + + if not user: + return 
RedirectResponse(url='/user/login') + + file_ext = os.path.splitext(upload_file.filename)[-1] + if file_ext != ".mp4": + err_msg["file_ext"] = "파일 형식이 다릅니다.(mp4만 지원 가능)" + return templates.TemplateResponse("upload.html", {'request': request, 'token': user.email, "err": err_msg}) + + _upload_create = schemas.UploadCreate(name=name, date=datetime, is_realtime=False, thr=thr, user_id=user.user_id) + crud.create_upload(db=db, upload=_upload_create) + + uploaded = crud.get_upload_id(db=db, user_id=user.user_id, name=name, date=datetime)[-1] + + video_name = uuid.uuid1() + + # model inference 에서 s3 에 올릴 주소 그대로 db 에 insert + video_url = f"video/{user.user_id}/{uploaded.upload_id}/{video_name}{file_ext}" + _video_create = schemas.VideoCreate(video_url=video_url, upload_id=uploaded.upload_id) + crud.create_video(db=db, video=_video_create) + _complete_create = schemas.Complete(completed=False, upload_id=uploaded.upload_id) + crud.create_complete(db=db, complete=_complete_create) + + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="video 를 s3 저장소 업로드에 실패했습니다." 
+ ) + + try: + s3.upload_fileobj( + upload_file.file, + settings.BUCKET, + video_url, + ExtraArgs={'ContentType': 'video/mp4'} + ) + except: + raise s3_upload_exception + + info = { + "user_id": user.user_id, + "email": user.email, + "upload_id": uploaded.upload_id, + "name": name, + "date": datetime, + "threshold": uploaded.thr, + "video_name": upload_file.filename, + "video_uuid_name": video_name, + "video_ext": file_ext, + "video_id": crud.get_video(db=db, upload_id=uploaded.upload_id).video_id, + "video_url": video_url, + } + + model_data = { + "user_id": user.user_id, + "upload_id": uploaded.upload_id, + "threshold": uploaded.thr, + "video_uuid_name": str(video_name), + "video_ext": file_ext, + "video_id": crud.get_video(db=db, upload_id=uploaded.upload_id).video_id, + "video_url": video_url, + } + + model_server_url = settings.UPLOAD_MODEL_SERVER_IP + try: + response = requests.post(model_server_url, json=model_data) + response.raise_for_status() # 응답 상태 코드가 200이 아닌 경우 예외 발생 + print("Model execution started successfully.") + except requests.RequestException: + e = "모델 서버에서 오류가 발생했습니다." 
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=e) + + redirect_url = f"/album/details?user_id={info['user_id']}&upload_id={info['upload_id']}" + + return RedirectResponse(url=redirect_url, status_code=status.HTTP_303_SEE_OTHER) diff --git a/app/api/user_router.py b/app/api/user_router.py new file mode 100644 index 0000000..e20952d --- /dev/null +++ b/app/api/user_router.py @@ -0,0 +1,82 @@ +from fastapi import APIRouter, Request, Depends +from fastapi.responses import RedirectResponse +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session + +from database import models, crud +from database.database import get_db +from utils.security import pwd_context + +templates = Jinja2Templates(directory="templates") +router = APIRouter(prefix="/user") + +@router.get("/signup") +async def signup_get(request: Request, + db: Session=Depends(get_db)): + err_msg = {"user": None, "pw": None, "check_pw": None} + return templates.TemplateResponse("signup.html", {"request": request, "err": err_msg}) + +@router.post("/signup") +async def signup_post(request: Request, + db: Session=Depends(get_db)): + body = await request.form() + user, pw, check_pw = body["email"], body["pw"], body["check_pw"] + err_msg = {"user": None, "pw": None, "check_pw": None} + + if not user: + err_msg["user"] = "empty email" + elif not pw: + err_msg["pw"] = "empty password" + elif pw != check_pw: + err_msg["check_pw"] = "not equal password and check_password" + else: + user = db.query(models.User).filter(models.User.email == body['email']).first() + + if user: + err_msg["user"] = "invalid email" + else: + user_info = models.User(email = body['email'], + password = body['pw']) + + crud.create_user(db, user_info) + return RedirectResponse(url="/user/login") + + return templates.TemplateResponse("signup.html", {"request": request, "err": err_msg}) + +@router.get("/login") +async def login_get(request: Request): + err_msg = {"user": None, "pw": None} + 
return templates.TemplateResponse("login.html", {"request": request, "err": err_msg}) + +@router.post("/login") +async def login_post(request: Request, + db: Session=Depends(get_db)): + body = await request.form() + user, pw= body["email"], body["pw"] + err_msg = {"user": None, "pw": None} + + if body.get("check_pw", None): + return templates.TemplateResponse("login.html", {"request": request, "err": err_msg}) + + if not user: + err_msg["user"] = "empty email" + elif not pw: + err_msg["pw"] = "empty password" + else: + user = db.query(models.User).filter(models.User.email == body['email']).first() + if not user: + err_msg["user"] = "invalid email" + elif not pwd_context.verify(body['pw'], user.password): + err_msg["pw"] = "invalid password" + else: + return RedirectResponse(url="/") + + return templates.TemplateResponse("login.html", {"request": request, "err": err_msg}) + +@router.get("/logout") +async def logout_get(request: Request): + access_token = request.cookies.get("access_token", None) + template = RedirectResponse(url="/") + if access_token: + template.delete_cookie(key="access_token") + return template diff --git a/app/database/__init__.py b/app/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/database/crud.py b/app/database/crud.py new file mode 100644 index 0000000..1144e0c --- /dev/null +++ b/app/database/crud.py @@ -0,0 +1,210 @@ +from datetime import timedelta, datetime, date + +from sqlalchemy.orm import Session, aliased +from sqlalchemy import func +import smtplib +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +# from email.mime.image import MIMEImage +# from passlib.context import CryptContext + +from database.schemas import UserBase, UserCreate, UploadCreate, VideoCreate, FrameCreate, Complete +from database import models +from utils.security import get_password_hash, verify_password +from utils.config import settings + +## User +def create_user(db: Session, user: 
UserCreate): + hashed_password = get_password_hash(user.password) + db_user = models.User(email=user.email, password=hashed_password) + db.add(db_user) + db.commit() + db.refresh(db_user) + return db_user + + +def get_user(db: Session, user_id: int): + return db.query(models.User).filter(models.User.user_id == user_id).first() + + +def get_user_by_email(db: Session, email: str): + return db.query(models.User).filter(models.User.email == email).first() + + +def get_existing_user(db: Session, user_create: UserCreate): + return db.query(models.User).filter((models.User.email == user_create.email)).first() + + +def authenticate(db: Session, *, email: str, password: str): + user = get_user_by_email(db, email=email) + if not user: + return None + if not verify_password(password, user.password): + return None + return user + +def is_active(user: UserBase) -> bool: + return user.is_active + + +## Upload +def create_upload(db: Session, upload: UploadCreate): + db_upload = models.Upload(**upload.dict()) + db.add(db_upload) + db.commit() + db.refresh(db_upload) + return db_upload + + +def delete_upload(db: Session, upload_id: int): + db_upload = db.query(models.Upload).filter(models.Upload.upload_id == upload_id).first() + + if db_upload: + db.delete(db_upload) + db.commit() + return True + return False + + +def get_upload(db: Session, upload_id: int): + return db.query(models.Upload).filter(models.Upload.upload_id == upload_id).first() + + +def get_upload_id( + db: Session, + user_id: int, + name: str, + date: datetime, +): + return ( + db.query(models.Upload) + .filter( + (models.Upload.user_id == user_id) & (models.Upload.name == name) & (models.Upload.date == date) + ) + .all() + ) + + +def get_uploads(db: Session, user_id: int): + return ( + db.query(models.Upload) + .filter(models.Upload.user_id == user_id) + .order_by(models.Upload.upload_id.desc()) + .all() + ) + + +def get_upload_by_name(db: Session, name: str): + return 
db.query(models.Upload).filter(models.Upload.name == name).first() + + +## Video +def create_video(db: Session, video: VideoCreate): + db_video = models.Video(**video.dict()) + db.add(db_video) + db.commit() + db.refresh(db_video) + return db_video + + +def get_video(db: Session, upload_id: int): + return db.query(models.Video).filter(models.Video.upload_id == upload_id).first() + + +## Frame +def create_frame(db: Session, frame: FrameCreate): + db_frame = models.Frame(**frame.dict()) + db.add(db_frame) + db.commit() + db.refresh(db_frame) + return db_frame + + +def get_frame(db: Session, frame_id: int): + return db.query(models.Frame).filter(models.Frame.frame_id == frame_id).first() + + +def get_frames(db: Session, video_id: int): + return db.query(models.Frame).filter(models.Frame.video_id == video_id).all() + + +def get_frames_with_highest_score(db: Session, video_id: int): + + subquery = ( + db.query( + models.Frame.video_id, models.Frame.time_stamp, func.max(models.Frame.score).label("max_score") + ) + .group_by(models.Frame.video_id, models.Frame.time_stamp) + .subquery() + ) + + subq_alias = aliased(subquery) + + frames = ( + db.query(models.Frame) + .join( + subq_alias, + (models.Frame.video_id == subq_alias.c.video_id) + & (models.Frame.time_stamp == subq_alias.c.time_stamp) + & (models.Frame.score == subq_alias.c.max_score), + ) + .filter(models.Frame.video_id == video_id) + .all() + ) + + return frames + + +def create_complete(db: Session, complete: Complete): + db_complete = models.Complete(**complete.dict()) + db.add(db_complete) + db.commit() + db.refresh(db_complete) + return db_complete + + +def get_complete(db: Session, upload_id: int): + return db.query(models.Complete).filter(models.Complete.upload_id == upload_id).first() + + +def update_complete_status(db: Session, upload_id: int): + + complete_record = db.query(models.Complete).filter(models.Complete.upload_id == upload_id).first() + + if complete_record and not complete_record.completed: + 
complete_record.completed = True + db.commit() + + +# email feat +async def create_smtp_server(): + + smtp = smtplib.SMTP_SSL(settings.SMTP_ADDRESS, settings.SMTP_PORT) # smtp 서버와 연결 + smtp.login(settings.MAIL_ACCOUNT, settings.MAIL_PASSWORD) # 프로젝트 계정으로 로그인 + + return smtp + + +def send_email(db, check, last, user_id, smtp): + user = get_user(db, user_id) + + # 메일 기본 정보 설정 + msg = MIMEMultipart() + msg["subject"] = f"[IVT] 이상행동 분석 중 결과 전달 메일입니다." + msg["from"] = settings.MAIL_ACCOUNT + msg["To"] = user.email + + # 메일 본문 내용 + content = f"""안녕하세요. Naver AI Tech 6기 '혁신비전테크' 팀 입니다. + +실시간 이상행동 탐지 중 이상행동이 발견되어 해당 시간대를 전달 드립니다. + +{check} ~ {last} 시간을 확인해주세요. +""" + + content_part = MIMEText(content, "plain") + msg.attach(content_part) + + smtp.sendmail(settings.MAIL_ACCOUNT, user.email, msg.as_string()) + + # 탐지 이미지 첨부 (향후 업데이트 예정) diff --git a/app/database/database.py b/app/database/database.py new file mode 100644 index 0000000..758080c --- /dev/null +++ b/app/database/database.py @@ -0,0 +1,26 @@ +from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker + +from utils.config import settings + +SQLALCHEMY_DATABASE_URL = "mysql+pymysql://{}:{}@{}:{}/{}".format( + settings.MYSQL_SERVER_USER, + settings.MYSQL_SERVER_PASSWORD, + settings.MYSQL_SERVER_IP, + settings.MYSQL_SERVER_PORT, + settings.MYSQL_DATABASE +) + +engine = create_engine(SQLALCHEMY_DATABASE_URL, echo=True) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +Base = declarative_base() + +# Dependency +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() \ No newline at end of file diff --git a/app/database/models.py b/app/database/models.py new file mode 100644 index 0000000..017e099 --- /dev/null +++ b/app/database/models.py @@ -0,0 +1,63 @@ +from sqlalchemy import Column, Integer, String, DateTime, Boolean, Float, ForeignKey, Time, JSON +from sqlalchemy.orm import relationship 
class Upload(Base):
    """One upload session — a recorded video or a real-time stream — owned by a user."""

    __tablename__ = "upload"

    upload_id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    # Display name of the upload session.
    name = Column(String(50), nullable=False)
    # Creation timestamp of the session.
    date = Column(DateTime, nullable=False)
    # True for real-time (stream) sessions, False for uploaded video files.
    is_realtime = Column(Boolean, default=False)
    # Anomaly-score threshold chosen for this upload (mapped from the
    # request's "threshold" value — see the detector's info dict).
    thr = Column(Float, nullable=False)
    user_id = Column(Integer, ForeignKey("user.user_id"), nullable=False)

    # Owning user; mirrors User.uploads.
    user = relationship("User", back_populates="uploads")
    # Child rows are removed together with the upload (delete-orphan cascade).
    videos = relationship("Video", back_populates="upload", cascade="all, delete-orphan")
    completes = relationship("Complete", back_populates="upload", cascade="all, delete-orphan")
class UserBase(BaseModel):
    """Fields shared by all user schemas."""

    email: EmailStr
    is_active: Optional[bool] = True


class UserCreate(UserBase):
    """Payload for creating a user (adds the plaintext password)."""

    password: str


# Upload POST schema
class UploadCreate(BaseModel):
    """Payload for registering a new upload session."""

    name: str
    date: datetime
    is_realtime: Optional[bool] = None
    thr: float
    user_id: int


# Video POST schema
class VideoCreate(BaseModel):
    """Payload for attaching a video to an upload."""

    video_url: str
    upload_id: int


class Video(VideoCreate):
    """Video row as read from the ORM, including its frames."""

    video_id: int
    frames: List["Frame"] = []

    class Config:
        # pydantic v2 (this file imports `field_validator`/`pydantic_core`)
        # renamed `orm_mode` to `from_attributes`; the old name only works
        # through a deprecation shim.
        from_attributes = True


# Frame POST schema
class FrameCreate(BaseModel):
    """Payload for persisting one analyzed frame."""

    frame_url: str
    time_stamp: time
    box_kp_json: Dict
    score: float
    video_id: int


class Frame(FrameCreate):
    """Frame row as read from the ORM."""

    frame_id: int

    class Config:
        from_attributes = True


class Complete(BaseModel):
    """Completion flag for an upload's processing pipeline."""

    completed: bool
    upload_id: int
datetime import datetime, time +import matplotlib.pyplot as plt +from io import BytesIO + +import torch +import torch.nn as nn + +import albumentations as A + +import sys + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(os.path.dirname(current_dir)) + +sys.path.append(os.path.join(parent_dir, "model")) +import vmae + +# @@ timm은 0.4.12 버전 사용 필수 +from timm.models import create_model + +from copy import deepcopy + +class AnomalyDetector: + def __init__(self, video_file, info, s3_client, settings, db): + self.video = s3_client.generate_presigned_url( + ClientMethod="get_object", Params={"Bucket": settings.BUCKET, "Key": video_file}, ExpiresIn=3600 + ) + # print(self.video) + self.info = info + self.s3 = s3_client + self.settings = settings + self.thr = info["threshold"] + self.video_url = ( + f"video/{info['user_id']}/{info['upload_id']}/{info['video_uuid_name']}{info['video_ext']}" + ) + self.frame_url_base = f"frame/{info['user_id']}/{info['upload_id']}/" + self.db = db + + def display_text(self, frame, text, position): + font = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 1 + font_color = (0, 255, 0) # Green color + font_thickness = 2 + cv2.putText(frame, text, position, font, font_scale, font_color, font_thickness, cv2.LINE_AA) + + def upload_frame_db(self, db, temp_for_db, frame_url): + + temp_json_path = "./temp.json" + + with open(temp_json_path, "w") as f: + json.dump(temp_for_db, f) + + with open(temp_json_path, "r") as f: + box_kp_json = json.load(f) + + _frame_create = schemas.FrameCreate( + frame_url=frame_url, + time_stamp=temp_for_db["timestamp"], + box_kp_json=box_kp_json, + score=temp_for_db["score"], + video_id=self.info["video_id"], + ) + + crud.create_frame(db=db, frame=_frame_create) + + os.remove(temp_json_path) + + def upload_frame_s3(self, s3, frame): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Frame 을 s3 저장소 업로드에 실패했습니다." 
+ ) + frame_name = uuid.uuid1() + frame_url = self.frame_url_base + f"{frame_name}" + ".png" + # print(frame_url) + + try: + s3.upload_fileobj( + BytesIO(cv2.imencode(".png", frame)[1].tobytes()), + self.settings.BUCKET, + frame_url, + ExtraArgs={"ContentType": "image/png"}, + ) + except Exception as e: + # print(e) + raise s3_upload_exception + + return frame_url + + def upload_video_s3(self, s3, video): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="video 를 s3 저장소 업로드에 실패했습니다." + ) + video_url = self.video_url + + video_change_codec = "./temp_video_path_change_codec.mp4" + + os.system('ffmpeg -i "%s" -vcodec libx264 "%s"' % (video, video_change_codec)) + + try: + with open(video_change_codec, "rb") as video_file: + s3.upload_fileobj( + video_file, self.settings.BUCKET, video_url, ExtraArgs={"ContentType": "video/mp4"} + ) + except Exception as e: + # print(e) + raise s3_upload_exception + + os.remove(video_change_codec) + + def upload_score_graph_s3(self, s3, scores): + plt.plot(scores, color="red") + plt.title("Anomaly Scores Over Time") + plt.xlabel(" ") + plt.ylabel(" ") + + plt.xticks([]) + plt.yticks([]) + + save_path = "./model_scores_plot.png" + plt.savefig(save_path) + + with open(save_path, "rb") as image_file: + graph = image_file.read() + + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="score Graph 를 s3 저장소 업로드에 실패했습니다." 
    def run(self):
        """Run the full offline pipeline on self.video.

        Per frame: YOLOv8-pose tracking; every (fps // 3)-th frame is resized to
        224x224 and buffered for VideoMAE. Every 16 buffered frames the
        VideoMAE backbone + MIL classifier produce one anomaly score. Frames of
        a scored chunk are annotated and written to an output video; frames the
        scorer saw are also uploaded to S3 and recorded in the DB when the
        score exceeds the threshold. Finally the annotated video and a score
        graph are uploaded to S3.
        """
        # YOLO pose/tracking model
        tracker_model = YOLO("yolov8n-pose.pt")

        # VideoMAE v2 backbone (feature extractor)
        backbone = model = create_model(
            "vit_small_patch16_224",
            img_size=224,
            pretrained=False,
            num_classes=710,
            all_frames=16,
        )

        # NOTE(review): hard-coded absolute checkpoint paths — environment-specific.
        load_dict = torch.load(
            "/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/vit_s_k710_dl_from_giant.pth"
        )

        backbone.load_state_dict(load_dict["module"])

        # Resize transform feeding the backbone.
        tf = A.Resize(224, 224)

        # (legacy LSTM-AE constants, kept for reference)
        # sequence_length = 20
        # prediction_time = 1
        # n_features = 38

        # MIL classifier head on top of VideoMAE features.
        checkpoint = torch.load(
            "/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/MIL_20240325_202019_best_auc.pth"
        )
        classifier = vmae.MILClassifier(input_dim=710, drop_p=0.3)
        classifier.load_state_dict(checkpoint["model_state_dict"])

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        tracker_model.to(device)
        backbone.to(device)
        backbone.eval()
        classifier.to(device)
        classifier.eval()

        # Standard frame size every input frame is resized to.
        standard_width = 640
        standard_height = 480

        # Open the video via the presigned URL; fall back to downloading the
        # object from S3 when OpenCV cannot stream it.
        cap = cv2.VideoCapture(self.video)
        temp_name = None
        if not cap.isOpened():
            temp_name = f"{uuid.uuid4()}.mp4"
            self.s3.download_file(self.settings.BUCKET, self.video_url, temp_name)
            cap = cv2.VideoCapture(temp_name)
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Sample ~3 frames per second for the VideoMAE buffer.
        # (sic: "inteval" — kept to avoid a behavior-affecting rename here)
        frame_inteval = fps // 3

        # Track history per track id (currently only written by commented code).
        track_history = defaultdict(lambda: [])

        # id_buffers = defaultdict(lambda: [])

        # NOTE(review): comment says H.264 but "mp4v" is MPEG-4 Part 2;
        # upload_video_s3 re-encodes with libx264 afterwards, so playback works.
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")

        # Video writer — the annotated video (scores, bboxes) is uploaded to S3.
        output_video_path = "./temp_video_path.mp4"
        output_video = cv2.VideoWriter(output_video_path, fourcc, fps, (standard_width, standard_height))

        # MSE between two sequences (legacy helper, unused in this pipeline).
        def calculate_mse(seq1, seq2):
            return np.mean(np.power(seq1 - seq2, 2))

        # Anomaly threshold. NOTE(review): the user-supplied self.thr is
        # ignored in favor of a hard-coded 0.3 — confirm this is intentional.
        # threshold = self.thr
        threshold = 0.3

        frame_count = 0
        # net_mse = 0
        # avg_mse = 0
        # Scores collected for the score graph.
        scores = []
        # Frames buffered for the next VideoMAE inference (16 per chunk).
        v_frames = []

        # Raw frames awaiting annotation for the current chunk.
        frame_list = []
        # Matching YOLO results per buffered frame.
        results_list = []
        # Matching per-frame DB payloads.
        tfdb_list = []

        while cap.isOpened():
            # Read a frame from the video.
            success, frame = cap.read()
            frame_count += 1  # Increment frame count

            if success:
                frame = cv2.resize(frame, (standard_width, standard_height))

                temp_for_db = {"timestamp": None, "bbox": {}, "keypoints": {}, "score": None}

                # mse_unit = 0

                # Wall-clock position of this frame, formatted HH:MM:SS.
                s_timestamp = round(cap.get(cv2.CAP_PROP_POS_MSEC) / 1000, 2)
                datetime_object = datetime.utcfromtimestamp(s_timestamp)
                timestamp = datetime_object.strftime("%H:%M:%S")
                temp_for_db["timestamp"] = timestamp

                # YOLOv8 tracking, persisting track ids between frames.
                results = tracker_model.track(frame, persist=True)

                frame_list.append(frame.copy())
                results_list.append(deepcopy(results))
                tfdb_list.append(deepcopy(temp_for_db))

                # Keep only ~3 frames per second for VideoMAE+MIL.
                if (frame_count - 1) % frame_inteval == 0:
                    v_frame = tf(image=frame)["image"]
                    # (224, 224, 3)
                    v_frame = np.expand_dims(v_frame, axis=0)
                    # (1, 224, 224, 3)
                    v_frames.append(v_frame.copy())

                    # Once 16 sampled frames are buffered, run VideoMAE+MIL.
                    if len(v_frames) == 16:
                        in_frames = np.concatenate(v_frames)
                        # (16, 224, 224, 3)
                        in_frames = in_frames.transpose(3, 0, 1, 2)
                        # (RGB 3, frame T=16, H=224, W=224)
                        in_frames = np.expand_dims(in_frames, axis=0)
                        # (1, 3, 16 * segments_num, 224, 224)
                        in_frames = torch.from_numpy(in_frames).float()
                        # torch.Size([1, 3, 16, 224, 224])

                        in_frames = in_frames.to(device)

                        with torch.no_grad():
                            v_output = backbone(in_frames)
                            # torch.Size([1, 710])
                            v_score = classifier(v_output)
                            # torch.Size([1, 1])
                            scores.append(v_score.cpu().item())

                        v_frames = []

                # When a full chunk of raw frames has accumulated, annotate and
                # flush it using the chunk's (latest) score.
                if len(frame_list) == 16 * frame_inteval:
                    for f_step, (frame_i, results_i, temp_for_db_i) in enumerate(
                        zip(frame_list, results_list, tfdb_list)
                    ):
                        if scores[-1] > threshold:
                            anomaly_text = f"Anomaly detected, score: {scores[-1]}"
                            if results_i[0].boxes is not None:  # Check if there are results and boxes

                                # Get the boxes
                                boxes = results_i[0].boxes.xywh.cpu()

                                if results_i[0].boxes.id is not None:
                                    # Detections present: get the track IDs.

                                    track_ids = results_i[0].boxes.id.int().cpu().tolist()

                                    # Only frames the scorer saw go to the DB.
                                    if f_step % frame_inteval == 0:
                                        # One iteration per detected person in this frame.
                                        for i, box in zip(
                                            range(0, len(track_ids)), results_i[0].boxes.xywhn.cpu()
                                        ):

                                            x, y, w, h = box
                                            keypoints = (
                                                results_i[0].keypoints.xyn[i].cpu().numpy().flatten().tolist()
                                            )

                                            xywhk = np.array(
                                                [float(x), float(y), float(w), float(h)] + keypoints
                                            )

                                            xywhk = list(map(lambda x: str(round(x, 4)), xywhk))

                                            temp_for_db_i["bbox"][f"id {i}"] = " ".join(xywhk[:4])

                                            temp_for_db_i["keypoints"][f"id {i}"] = " ".join(xywhk[4:])

                                else:
                                    # No detections: empty track id list.
                                    track_ids = []

                                # Visualize the results on the frame.
                                annotated_frame = results_i[0].plot()
                                self.display_text(
                                    annotated_frame, anomaly_text, (10, 30)
                                )  # Display the anomaly text

                                # (track-line drawing kept disabled)
                                # for box, track_id in zip(boxes, track_ids):
                                #     x, y, w, h = box
                                #     track = track_history[track_id]
                                #     track.append((float(x), float(y)))  # x, y center point
                                #     if len(track) > 30:  # retain 90 tracks for 90 frames
                                #         track.pop(0)
                                #     points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
                                #     cv2.polylines(
                                #         annotated_frame,
                                #         [points],
                                #         isClosed=False,
                                #         color=(230, 230, 230),
                                #         thickness=10,
                                #     )

                                # Write the annotated frame.
                                output_video.write(annotated_frame)
                                # cv2.imshow("YOLOv8 Tracking", annotated_frame)
                            else:
                                self.display_text(frame_i, anomaly_text, (10, 30))  # Display the anomaly text
                                output_video.write(frame_i)

                            # Only frames the scorer saw are uploaded/persisted.
                            if f_step % frame_inteval == 0:
                                temp_for_db_i["score"] = scores[-1]

                                # upload frame to s3
                                frame_url = self.upload_frame_s3(self.s3, frame_i)

                                # upload frame, ts, bbox, kp to db
                                self.upload_frame_db(self.db, temp_for_db_i, frame_url)

                        else:
                            anomaly_text = ""
                            output_video.write(frame_i)
                            # cv2.imshow("YOLOv8 Tracking", frame)

                    # Reset chunk buffers after flushing 16 * frame_inteval frames.
                    frame_list = []
                    results_list = []
                    tfdb_list = []

            else:
                # End of stream: flush any unscored leftover frames unannotated.
                if len(frame_list) != 0:
                    for f in frame_list:
                        output_video.write(f)

                # Break the loop if the end of the video is reached.
                break

        # Release the video capture and writer.
        cap.release()
        output_video.release()
        # cv2.destroyAllWindows()

        # upload video to s3
        self.upload_video_s3(self.s3, output_video_path)

        # upload score graph to s3
        self.upload_score_graph_s3(self.s3, scores)
        if temp_name:
            os.remove(temp_name)
        os.remove(output_video_path)
f"frame/{info['user_id']}/{info['upload_id']}/" + self.db = db + + + def display_text(self, frame, text, position): + font = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 1 + font_color = (0, 255, 0) # Green color + font_thickness = 2 + cv2.putText(frame, text, position, font, font_scale, font_color, font_thickness, cv2.LINE_AA) + + + def upload_frame_db(self, db, temp_for_db, frame_url): + + temp_json_path = './temp.json' + + with open(temp_json_path, 'w') as f: + json.dump(temp_for_db, f) + + with open(temp_json_path, 'r') as f: + box_kp_json = json.load(f) + + _frame_create = schemas.FrameCreate(frame_url=frame_url, + time_stamp=temp_for_db['timestamp'], + box_kp_json=box_kp_json, + score=temp_for_db['score'], + video_id=self.info['video_id']) + + crud.create_frame(db=db, frame=_frame_create) + + os.remove(temp_json_path) + + + def upload_frame_s3(self, s3, frame): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Frame 을 s3 저장소 업로드에 실패했습니다." + ) + frame_name = uuid.uuid1() + frame_url = self.frame_url_base + f'{frame_name}' + '.png' + #print(frame_url) + + try: + s3.upload_fileobj( + BytesIO(cv2.imencode('.png', frame)[1].tobytes()), + self.settings.BUCKET, + frame_url, + ExtraArgs={'ContentType': 'image/png'} + ) + except Exception as e: + #print(e) + raise s3_upload_exception + + return frame_url + + def upload_video_s3(self, s3, video): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="video 를 s3 저장소 업로드에 실패했습니다." 
+ ) + video_url = self.video_url + + video_change_codec = './temp_video_path_change_codec.mp4' + + os.system('ffmpeg -i "%s" -vcodec libx264 "%s"'%(video, video_change_codec)) + + try: + with open(video_change_codec, 'rb') as video_file: + s3.upload_fileobj( + video_file, + self.settings.BUCKET, + video_url, + ExtraArgs={'ContentType': 'video/mp4'} + ) + except Exception as e: + #print(e) + raise s3_upload_exception + + os.remove(video_change_codec) + + + def upload_score_graph_s3(self, s3, scores): + plt.plot(scores, color='red') + plt.title('Anomaly Scores Over Time') + plt.xlabel(' ') + plt.ylabel(' ') + + plt.xticks([]) + plt.yticks([]) + + save_path = './model_scores_plot.png' + plt.savefig(save_path) + + with open(save_path, 'rb') as image_file: + graph = image_file.read() + + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="score Graph 를 s3 저장소 업로드에 실패했습니다." + ) + score_graph_name = 'score_graph.png' + score_graph_url = self.frame_url_base + score_graph_name + + try: + s3.upload_fileobj( + BytesIO(graph), + self.settings.BUCKET, + score_graph_url, + ExtraArgs={'ContentType': 'image/png'} + ) + except: + raise s3_upload_exception + + os.remove(save_path) + + def run(self): + # YOLO + tracker_model = YOLO('yolov8n-pose.pt') + + # Define sequence_length, prediction_time, and n_features + sequence_length = 20 + prediction_time = 1 + n_features = 38 + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + # LSTM autoencoder + checkpoint = torch.load('/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/LSTM_20240324_222238_best.pth') + autoencoder_model = LSTMAutoEncoder(num_layers=2, hidden_size=50, n_features=n_features, + device=device) + autoencoder_model.load_state_dict(checkpoint['model_state_dict']) + tracker_model.to(device) + autoencoder_model.to(device) + + # Define the standard frame size + standard_width = 640 + standard_height = 480 + + # Open the video file + cap = 
cv2.VideoCapture(self.video) + if not cap.isOpened(): + temp_name = f'{uuid.uuid4()}.mp4' + self.s3.download_file(self.settings.BUCKET, self.video_url, temp_name) + cap = cv2.VideoCapture(temp_name) + fps = cap.get(cv2.CAP_PROP_FPS) + + # Store the track history + track_history = defaultdict(lambda: []) + + # Initialize a dictionary to store separate buffers for each ID + id_buffers = defaultdict(lambda: []) + + # HTML -> H.264 codec + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + + # video writer -> ID 별 score, bbox 가 나온 영상을 s3 에 업로드 + output_video_path = './temp_video_path.mp4' + output_video = cv2.VideoWriter(output_video_path, fourcc, fps, (standard_width, standard_height)) + + # Define a function to calculate MSE between two sequences + def calculate_mse(seq1, seq2): + return np.mean(np.power(seq1 - seq2, 2)) + + # anomaly threshold (default 0.02) + threshold = self.thr + + # Loop through the video frames + frame_count = 0 + net_mse = 0 + avg_mse = 0 + # score graph 를 위한 score list + scores = [] + + while cap.isOpened(): + # Read a frame from the video + success, frame = cap.read() + frame_count += 1 # Increment frame count + + if success: + frame = cv2.resize(frame, (standard_width, standard_height)) + + temp_for_db = {'timestamp':None, 'bbox':{}, 'keypoints':{}, 'score':None} + + # track id (사람) 별로 mse 점수가 나오기 때문에 한 frame 에 여러 mse 점수가 나옴. 
이를 frame 별 점수로 구하기 위해서 변수 설정 + mse_unit = 0 + + s_timestamp = round(cap.get(cv2.CAP_PROP_POS_MSEC)/1000, 2) + datetime_object = datetime.utcfromtimestamp(s_timestamp) + timestamp = datetime_object.strftime('%H:%M:%S') + temp_for_db['timestamp'] = timestamp + + # Run YOLOv8 tracking on the frame, persisting tracks between frames + results = tracker_model.track(frame, persist=True) + + if results[0].boxes is not None: # Check if there are results and boxes + + # Get the boxes + boxes = results[0].boxes.xywh.cpu() + + if results[0].boxes.id is not None: + # If 'int' attribute exists (there are detections), get the track IDs + + track_ids = results[0].boxes.id.int().cpu().tolist() + + # Loop through the detections and add data to the DataFrame + anomaly_text = "" # Initialize the anomaly text + + # 한 프레임에서 검출된 사람만큼 돌아가는 반복문. 2명이면 각 id 별로 아래 연산들이 진행됨. + for i, box in zip(range(0, len(track_ids)), results[0].boxes.xywhn.cpu()): + + x, y, w, h = box + keypoints = results[0].keypoints.xyn[i].cpu().numpy().flatten().tolist() + + # Append the keypoints to the corresponding ID's buffer + # bbox(4), keypoints per id(34) + id_buffers[track_ids[i]].append([float(x),float(y),float(w),float(h)]+keypoints) + + # If the buffer size reaches the threshold (e.g., 20 data points), perform anomaly detection + # track_id 별 20프레임이 쌓이면 아래 연산 진행 + if len(id_buffers[track_ids[i]]) >= 20: + # Convert the buffer to a NumPy array + buffer_array = np.array(id_buffers[track_ids[i]]) + + # Scale the data (you can use the same scaler you used during training) + scaler = MinMaxScaler() + buffer_scaled = scaler.fit_transform(buffer_array) + + # Create sequences for prediction + # x_pred: [1,20,38] + x_pred = buffer_scaled[-sequence_length:].reshape(1, sequence_length, n_features) + + # Predict the next values using the autoencoder model + x_pred = torch.tensor(x_pred, dtype=torch.float32).to(device) + x_pred = autoencoder_model.forward(x_pred) + + # Inverse transform the predicted data to the original 
scale + x_pred_original = scaler.inverse_transform(x_pred.cpu().detach().numpy().reshape(-1, n_features)) + + # Calculate the MSE between the predicted and actual values + mse = calculate_mse(buffer_array[-prediction_time:], x_pred_original) + + #print(mse) + + net_mse = mse+net_mse + avg_mse= net_mse/frame_count + + mse_unit += mse + + # Check if the MSE exceeds the threshold to detect an anomaly + if mse > 1.5*avg_mse*0.25 + 0.75*threshold: + if(anomaly_text==""): + anomaly_text = f"Focus ID {track_ids[i]}" + else: + anomaly_text = f"{anomaly_text}, {track_ids[i]}" + + #print(anomaly_text) + + temp_for_db['bbox'][f'id {i}'] = ' '.join(map(lambda x: str(round(x,4)), buffer_array[-prediction_time:][0, :4])) + + temp_for_db['keypoints'][f'id {i}'] = ' '.join(map(lambda x: str(round(x,4)), buffer_array[-prediction_time:][0, 4:])) + + # Remove the oldest data point from the buffer to maintain its size + id_buffers[track_ids[i]].pop(0) + + if temp_for_db['bbox'] != {}: + + temp_for_db['score'] = mse_unit + + # upload frame to s3 + frame_url = self.upload_frame_s3(self.s3, frame) + + # upload frame, ts, bbox, kp to db + self.upload_frame_db(self.db, temp_for_db, frame_url) + + else: + anomaly_text = "" + # If 'int' attribute doesn't exist (no detections), set track_ids to an empty list + track_ids = [] + + # Visualize the results on the frame + annotated_frame = results[0].plot() + self.display_text(annotated_frame, anomaly_text, (10, 30)) # Display the anomaly text + + # Plot the tracks + for box, track_id in zip(boxes, track_ids): + x, y, w, h = box + track = track_history[track_id] + track.append((float(x), float(y))) # x, y center point + if len(track) > 30: # retain 90 tracks for 90 frames + track.pop(0) + + # Draw the tracking lines + points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2)) + cv2.polylines(annotated_frame, [points], isClosed=False, color=(230, 230, 230), thickness=10) + + scores.append(mse_unit) + + # Display the annotated frame + 
output_video.write(annotated_frame) + #cv2.imshow("YOLOv8 Tracking", annotated_frame) + + else: + # If no detections, display the original frame without annotations + scores.append(mse_unit) + output_video.write(frame) + #cv2.imshow("YOLOv8 Tracking", frame) + + else: + # Break the loop if the end of the video is reached + break + + # Release the video capture, video writer object + cap.release() + output_video.release() + #cv2.destroyAllWindows() + + # upload video to s3 + self.upload_video_s3(self.s3, output_video_path) + + # upload score graph to s3 + self.upload_score_graph_s3(self.s3, scores) + + try: + os.remove(temp_name) + except: + pass + os.remove(output_video_path) \ No newline at end of file diff --git a/app/inference/rt_anomaly_detector.py b/app/inference/rt_anomaly_detector.py new file mode 100644 index 0000000..552db41 --- /dev/null +++ b/app/inference/rt_anomaly_detector.py @@ -0,0 +1,340 @@ +from collections import defaultdict +import cv2 +import numpy as np +import pandas as pd +from ultralytics import YOLO +import torch +from sklearn.preprocessing import MinMaxScaler + +from fastapi import HTTPException +from starlette import status +from database import crud +from database import schemas + +import os +import uuid +import json +from datetime import datetime, time, timedelta +import matplotlib.pyplot as plt +from io import BytesIO + +import torch +import torch.nn as nn + +import albumentations as A + +import sys + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(os.path.dirname(current_dir)) + +sys.path.append(os.path.join(parent_dir, "model")) +import vmae + +# @@ timm은 0.4.12 버전 사용 필수 +from timm.models import create_model +from copy import deepcopy + +class RT_AnomalyDetector: + def __init__(self, info, s3_client, settings, db, websocket): + self.info = info + self.s3 = s3_client + self.settings = settings + self.frame_url_base = f"frame/{info['user_id']}/{info['upload_id']}/" + self.db = db + 
self.websocket = websocket + + async def upload_frame_db(self, db, temp_for_db, frame_url): + + temp_json_path = "./temp.json" + + with open(temp_json_path, "w") as f: + json.dump(temp_for_db, f) + + with open(temp_json_path, "r") as f: + box_kp_json = json.load(f) + + _frame_create = schemas.FrameCreate( + frame_url=frame_url, + time_stamp=temp_for_db["timestamp"], + box_kp_json=box_kp_json, + score=temp_for_db["score"], + video_id=self.info["video_id"], + ) + + crud.create_frame(db=db, frame=_frame_create) + + os.remove(temp_json_path) + + async def upload_frame_s3(self, s3, frame): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Frame 을 s3 저장소 업로드에 실패했습니다." + ) + frame_name = uuid.uuid1() + frame_url = self.frame_url_base + f"{frame_name}" + ".png" + # print(frame_url) + + try: + s3.upload_fileobj( + BytesIO(cv2.imencode(".png", frame)[1].tobytes()), + self.settings.BUCKET, + frame_url, + ExtraArgs={"ContentType": "image/png"}, + ) + except Exception as e: + # print(e) + raise s3_upload_exception + + return frame_url + + def upload_score_graph_s3(self): + plt.plot(self.scores, color="red") + plt.title("Anomaly Scores Over Time") + plt.xlabel(" ") + plt.ylabel(" ") + + plt.xticks([]) + plt.yticks([]) + + save_path = "./model_scores_plot.png" + plt.savefig(save_path) + + with open(save_path, "rb") as image_file: + graph = image_file.read() + + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="score Graph 를 s3 저장소 업로드에 실패했습니다." 
+ ) + score_graph_name = "score_graph.png" + score_graph_url = self.frame_url_base + score_graph_name + + try: + self.s3.upload_fileobj( + BytesIO(graph), self.settings.BUCKET, score_graph_url, ExtraArgs={"ContentType": "image/png"} + ) + except: + raise s3_upload_exception + + os.remove(save_path) + + def ready(self): + # YOLO + self.tracker_model = YOLO("yolov8n-pose.pt") + + # VMAE v2 + self.backbone = model = create_model( + "vit_small_patch16_224", + img_size=224, + pretrained=False, + num_classes=710, + all_frames=16, + ) + + load_dict = torch.load( + "/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/vit_s_k710_dl_from_giant.pth" + ) + + self.backbone.load_state_dict(load_dict["module"]) + + self.tf = A.Resize(224, 224) + + # Define sequence_length, prediction_time, and n_features + # sequence_length = 20 + # prediction_time = 1 + # n_features = 38 + + # classifier + checkpoint = torch.load( + "/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/MIL_20240325_202019_best_auc.pth" + ) + self.classifier = vmae.MILClassifier(input_dim=710, drop_p=0.3) + self.classifier.load_state_dict(checkpoint["model_state_dict"]) + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + self.tracker_model.to(device) + self.backbone.to(device) + self.backbone.eval() + self.classifier.to(device) + self.classifier.eval() + + # Store the track history + self.track_history = defaultdict(lambda: []) + + # Initialize a dictionary to store separate buffers for each ID + self.id_buffers = defaultdict(lambda: []) + + # Loop through the video frames + self.frame_count = 0 + # self.net_mse = 0 + # self.avg_mse = 0 + + # score graph 를 위한 score list + self.scores = [] + + # vmae에 입력할 frame들을 저장할 list + self.v_frames = [] + + # 영상 frame 임시 저장 list + self.frame_list = [] + # yolo results 임시 저장 list + self.results_list = [] + # temp_for_db 임시 저장 list + self.tfdb_list = [] + + # timestamp 저장 + self.prv_timestamp = 0 + self.fps3_delta = 
    async def run(self, frame, timestamp):
        """Process one incoming real-time frame.

        Frames are sampled at ~3 fps (via self.fps3_delta). Each sampled frame
        is YOLO-tracked and buffered; once 176 sampled frames (11 chunks of 16)
        accumulate, VideoMAE + MIL produce 11 scores, and frames whose chunk
        score exceeds the threshold are uploaded to S3, persisted to the DB,
        and reported over the websocket.

        Args:
            frame: BGR image from the stream.
            timestamp: datetime of this frame (converted to "%H:%M:%S" below).
        """

        # Standard frame size every input frame is resized to.
        standard_width = 640
        standard_height = 480

        # (legacy LSTM-AE constants, kept for reference)
        # sequence_length = 20
        # prediction_time = 1
        # n_features = 38

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # MSE between two sequences (legacy helper, unused in this pipeline).
        def calculate_mse(seq1, seq2):
            return np.mean(np.power(seq1 - seq2, 2))

        # Anomaly threshold. NOTE(review): the user-supplied self.info["thr"]
        # is ignored in favor of a hard-coded 0.1 — confirm this is intentional.
        # threshold = self.info["thr"]
        threshold = 0.1

        self.frame_count += 1  # Increment frame count

        frame = cv2.resize(frame, (standard_width, standard_height))

        temp_for_db = {"timestamp": None, "bbox": {}, "keypoints": {}, "score": None}

        # mse_unit = 0

        frame_checker = False

        # Sample a frame only if >= 1/3 s elapsed since the last sampled one.
        if self.prv_timestamp == 0:
            frame_checker = True
            self.prv_timestamp = timestamp
        else:
            time_delta = timestamp - self.prv_timestamp
            if time_delta > self.fps3_delta:
                frame_checker = True
                self.prv_timestamp = timestamp

        timestamp = timestamp.strftime("%H:%M:%S")
        temp_for_db["timestamp"] = timestamp

        # Run YOLOv8 tracking on sampled frames, persisting tracks between frames.

        # print(f"==>> frame_checker: {frame_checker}")
        # frame_checker = True
        # Keep only ~3 frames per second for VideoMAE+MIL.
        if frame_checker:
            results = self.tracker_model.track(frame, persist=True, verbose=False)
            # print("yolo 1frame inference")
            self.frame_list.append(frame.copy())
            self.results_list.append(deepcopy(results))
            self.tfdb_list.append(deepcopy(temp_for_db))

            v_frame = self.tf(image=frame)["image"]
            # (224, 224, 3)
            v_frame = np.expand_dims(v_frame, axis=0)
            # (1, 224, 224, 3)
            self.v_frames.append(v_frame.copy())
            print(f"==>> len(self.v_frames): {len(self.v_frames)}")

            # Once 176 sampled frames (11 chunks of 16) are buffered,
            # run VideoMAE+MIL.
            if len(self.v_frames) == 176:
                print("VMAE 176frame inference")
                in_frames = np.concatenate(self.v_frames)
                # (176, 224, 224, 3)
                in_frames = in_frames.reshape(11, 16, 224, 224, 3)
                in_frames = in_frames.transpose(0, 4, 1, 2, 3)
                # (11, RGB 3, frame T=16, H=224, W=224)
                in_frames = torch.from_numpy(in_frames).float()
                # torch.Size([11, 3, 16, 224, 224])

                in_frames = in_frames.to(device)

                with torch.no_grad():
                    v_output = self.backbone(in_frames)
                    # torch.Size([11, 710])
                    v_output = v_output.view(1, 11, -1)
                    v_score = self.classifier(v_output)
                    v_score = v_score.view(1, 11)
                    print(f"==>> v_score: {v_score}")
                    print(f"==>> v_score.shape: {v_score.shape}")
                    # torch.Size([1, 11])
                    s_list = [v_score[0, i].cpu().item() for i in range(11)]

                self.v_frames = []
                for f_step, (frame_i, results_i, temp_for_db_i) in enumerate(
                    zip(self.frame_list, self.results_list, self.tfdb_list)
                ):
                    # Chunk score for this frame: one score per 16 frames.
                    if s_list[f_step // 16] > threshold:
                        # if True:
                        anomaly_text = f"Anomaly detected, score: {s_list[f_step // 16]}"

                        if results_i[0].boxes is not None:  # Check if there are results and boxes

                            # Get the boxes
                            boxes = results_i[0].boxes.xywh.cpu()

                            if results_i[0].boxes.id is not None:
                                # Detections present: get the track IDs.

                                track_ids = results_i[0].boxes.id.int().cpu().tolist()

                                # anomaly_text = ""  # Initialize the anomaly text

                                # One iteration per detected person in this frame.
                                for i, box in zip(range(0, len(track_ids)), results_i[0].boxes.xywhn.cpu()):

                                    x, y, w, h = box
                                    keypoints = results_i[0].keypoints.xyn[i].cpu().numpy().flatten().tolist()

                                    xywhk = np.array([float(x), float(y), float(w), float(h)] + keypoints)

                                    xywhk = list(map(lambda x: str(round(x, 4)), xywhk))

                                    temp_for_db_i["bbox"][f"id {i}"] = " ".join(xywhk[:4])

                                    temp_for_db_i["keypoints"][f"id {i}"] = " ".join(xywhk[4:])

                            else:
                                # No detections: empty track id list.
                                track_ids = []

                        # self.scores.append(mse_unit)

                        # cv2.imshow("YOLOv8 Tracking", annotated_frame)

                        # else:
                        # self.scores.append(mse_unit)
                        # cv2.imshow("YOLOv8 Tracking", frame)

                        temp_for_db_i["score"] = s_list[f_step // 16]

                        # upload frame to s3
                        frame_url = await self.upload_frame_s3(self.s3, frame_i)

                        # upload frame, ts, bbox, kp to db
                        await self.upload_frame_db(self.db, temp_for_db_i, frame_url)

                        # Notify the client over the websocket.
                        await self.websocket.send_text(f"{timestamp}: {anomaly_text}")

                # Flush chunk buffers; keep scores for the final score graph.
                self.scores.extend(deepcopy(s_list))
                s_list = []
                self.frame_list = []
                self.results_list = []
                self.tfdb_list = []
nn + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(os.path.dirname(current_dir)) + +sys.path.append(os.path.join(parent_dir, "model")) + +from lstmae.lstm_ae import LSTMAutoEncoder + +class RT_AnomalyDetector: + def __init__(self, info, s3_client, settings, db, websocket): + self.info = info + self.s3 = s3_client + self.settings = settings + self.frame_url_base = f"frame/{info['user_id']}/{info['upload_id']}/" + self.db = db + self.websocket = websocket + + async def upload_frame_db(self, db, temp_for_db, frame_url): + + temp_json_path = './temp.json' + + with open(temp_json_path, 'w') as f: + json.dump(temp_for_db, f) + + with open(temp_json_path, 'r') as f: + box_kp_json = json.load(f) + + _frame_create = schemas.FrameCreate(frame_url=frame_url, + time_stamp=temp_for_db['timestamp'], + box_kp_json=box_kp_json, + score=temp_for_db['score'], + video_id=self.info['video_id']) + + crud.create_frame(db=db, frame=_frame_create) + + os.remove(temp_json_path) + + async def upload_frame_s3(self, s3, frame): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Frame 을 s3 저장소 업로드에 실패했습니다." 
+ ) + frame_name = uuid.uuid1() + frame_url = self.frame_url_base + f'{frame_name}' + '.png' + #print(frame_url) + + try: + s3.upload_fileobj( + BytesIO(cv2.imencode('.png', frame)[1].tobytes()), + self.settings.BUCKET, + frame_url, + ExtraArgs={'ContentType': 'image/png'} + ) + except Exception as e: + #print(e) + raise s3_upload_exception + + return frame_url + + def upload_score_graph_s3(self): + plt.plot(self.scores, color='red') + plt.title('Anomaly Scores Over Time') + plt.xlabel(' ') + plt.ylabel(' ') + + plt.xticks([]) + plt.yticks([]) + + save_path = './model_scores_plot.png' + plt.savefig(save_path) + + with open(save_path, 'rb') as image_file: + graph = image_file.read() + + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="score Graph 를 s3 저장소 업로드에 실패했습니다." + ) + score_graph_name = 'score_graph.png' + score_graph_url = self.frame_url_base + score_graph_name + + try: + self.s3.upload_fileobj( + BytesIO(graph), + self.settings.BUCKET, + score_graph_url, + ExtraArgs={'ContentType': 'image/png'} + ) + except: + raise s3_upload_exception + + os.remove(save_path) + + def ready(self): + # YOLO + self.tracker_model = YOLO('yolov8n-pose.pt') + + # Define sequence_length, prediction_time, and n_features + sequence_length = 20 + prediction_time = 1 + n_features = 38 + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + # LSTM autoencoder + # LSTM autoencoder + checkpoint = torch.load('/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/LSTM_20240324_222238_best.pth') + self.autoencoder_model = LSTMAutoEncoder(num_layers=2, hidden_size=50, n_features=n_features, device=device) + self.autoencoder_model.load_state_dict(checkpoint['model_state_dict']) + + self.tracker_model.to(device) + self.autoencoder_model.to(device) + + # Store the track history + self.track_history = defaultdict(lambda: []) + + # Initialize a dictionary to store separate buffers for each ID + self.id_buffers = 
defaultdict(lambda: []) + + # Loop through the video frames + self.frame_count = 0 + self.net_mse = 0 + self.avg_mse = 0 + # score graph 를 위한 score list + self.scores = [] + + async def run(self, frame, timestamp): + + # Define the standard frame size + standard_width = 640 + standard_height = 480 + + # Define sequence_length, prediction_time, and n_features + sequence_length = 20 + prediction_time = 1 + n_features = 38 + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + # Define a function to calculate MSE between two sequences + def calculate_mse(seq1, seq2): + return np.mean(np.power(seq1 - seq2, 2)) + + # anomaly threshold (default 0.02) + threshold = self.info['thr'] + + self.frame_count += 1 # Increment frame count + + frame = cv2.resize(frame, (standard_width, standard_height)) + + temp_for_db = {'timestamp':None, 'bbox':{}, 'keypoints':{}, 'score':None} + + # track id (사람) 별로 mse 점수가 나오기 때문에 한 frame 에 여러 mse 점수가 나옴. 이를 frame 별 점수로 구하기 위해서 변수 설정 + mse_unit = 0 + + timestamp = timestamp.strftime('%H:%M:%S') + temp_for_db['timestamp'] = timestamp + + # Run YOLOv8 tracking on the frame, persisting tracks between frames + results = self.tracker_model.track(frame, persist=True) + + if results[0].boxes is not None: # Check if there are results and boxes + + # Get the boxes + boxes = results[0].boxes.xywh.cpu() + + if results[0].boxes.id is not None: + # If 'int' attribute exists (there are detections), get the track IDs + + track_ids = results[0].boxes.id.int().cpu().tolist() + + # Loop through the detections and add data to the DataFrame + anomaly_text = "" # Initialize the anomaly text + + # 한 프레임에서 검출된 사람만큼 돌아가는 반복문. 2명이면 각 id 별로 아래 연산들이 진행됨. 
+ for i, box in zip(range(0, len(track_ids)), results[0].boxes.xywhn.cpu()): + + x, y, w, h = box + keypoints = results[0].keypoints.xyn[i].cpu().numpy().flatten().tolist() + + # Append the keypoints to the corresponding ID's buffer + # bbox(4), keypoints per id(34) + self.id_buffers[track_ids[i]].append([float(x),float(y),float(w),float(h)]+keypoints) + + # If the buffer size reaches the threshold (e.g., 20 data points), perform anomaly detection + # track_id 별 20프레임이 쌓이면 아래 연산 진행 + if len(self.id_buffers[track_ids[i]]) >= 20: + # Convert the buffer to a NumPy array + buffer_array = np.array(self.id_buffers[track_ids[i]]) + + # Scale the data (you can use the same scaler you used during training) + scaler = MinMaxScaler() + buffer_scaled = scaler.fit_transform(buffer_array) + + # Create sequences for prediction + x_pred = buffer_scaled[-sequence_length:].reshape(1, sequence_length, n_features) + + # Predict the next values using the autoencoder model + x_pred = torch.tensor(x_pred, dtype=torch.float32).to(device) + x_pred = self.autoencoder_model.forward(x_pred) + + # Inverse transform the predicted data to the original scale + x_pred_original = scaler.inverse_transform(x_pred.cpu().detach().numpy().reshape(-1, n_features)) + + # Calculate the MSE between the predicted and actual values + mse = calculate_mse(buffer_array[-prediction_time:], x_pred_original) + + #print(mse) + + self.net_mse = mse+self.net_mse + self.avg_mse= self.net_mse/self.frame_count + + mse_unit += mse + + # Check if the MSE exceeds the threshold to detect an anomaly + if mse > 1.5*(self.avg_mse)*0.25 + 0.75*threshold: + + if(anomaly_text==""): + anomaly_text = f"이상행동이 감지되었습니다." + else: + anomaly_text = f"이상행동이 감지되었습니다." 
+ + #print(anomaly_text) + + temp_for_db['bbox'][f'id {i}'] = ' '.join(map(lambda x: str(round(x,4)), buffer_array[-prediction_time:][0, :4])) + + temp_for_db['keypoints'][f'id {i}'] = ' '.join(map(lambda x: str(round(x,4)), buffer_array[-prediction_time:][0, 4:])) + + # Remove the oldest data point from the buffer to maintain its size + self.id_buffers[track_ids[i]].pop(0) + + if temp_for_db['bbox'] != {}: + + temp_for_db['score'] = mse_unit + + # upload frame to s3 + frame_url = await self.upload_frame_s3(self.s3, frame) + + # upload frame, ts, bbox, kp to db + await self.upload_frame_db(self.db, temp_for_db, frame_url) + + await self.websocket.send_text(f'{timestamp}: {anomaly_text}') + + + else: + anomaly_text = "" + # If 'int' attribute doesn't exist (no detections), set track_ids to an empty list + track_ids = [] + + self.scores.append(mse_unit) + + # Display the annotated frame + #cv2.imshow("YOLOv8 Tracking", annotated_frame) + + else: + # If no detections, display the original frame without annotations + self.scores.append(mse_unit) + #cv2.imshow("YOLOv8 Tracking", frame) \ No newline at end of file diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..cdf880a --- /dev/null +++ b/app/main.py @@ -0,0 +1,68 @@ +from datetime import timedelta, datetime +import os + +from fastapi import FastAPI, Request, Response, Form +from fastapi.templating import Jinja2Templates +from fastapi.middleware.cors import CORSMiddleware +from fastapi.staticfiles import StaticFiles +import uvicorn +from jose import jwt + +from api import user_router, upload_router, album_router, real_time_router +from utils.config import settings +from utils.security import get_current_user + +templates = Jinja2Templates(directory="templates") + +app = FastAPI() +static_dir = os.path.join(os.path.dirname(__file__), "templates", "src") +app.mount("/src", StaticFiles(directory=static_dir), name="src") + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + 
allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.get("/") +async def main_get(request: Request): + user = get_current_user(request) + if user: + return templates.TemplateResponse("main.html", {"request": request, "token": user.email}) + else: + return templates.TemplateResponse("main.html", {"request": request, "token": None}) + + +@app.post("/") +async def main_post(request: Request): + body = await request.form() + email = body["email"] + data = { + "sub": email, + "exp": datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) + } + token = jwt.encode(data, settings.SECRET_KEY, algorithm=settings.ALGORITHM) + + template_response = templates.TemplateResponse('main.html', {'request': request, 'token': email}) + + # 쿠키 저장 + template_response.set_cookie( + key="access_token", + value=token, + expires=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES), + httponly=True, + ) + + return template_response + + +app.include_router(user_router.router) +app.include_router(upload_router.router) +app.include_router(album_router.router) +app.include_router(real_time_router.router) + +if __name__ == '__main__': + uvicorn.run("main:app", host='0.0.0.0', port=30011, reload=True) \ No newline at end of file diff --git a/app/templates/album_detail.html b/app/templates/album_detail.html new file mode 100644 index 0000000..03f06d5 --- /dev/null +++ b/app/templates/album_detail.html @@ -0,0 +1,100 @@ +{% extends 'base.html' %} + +{% block title %} +
사용자 ID: {{ video_info.user_id }}
+업로드 ID: {{ video_info.upload_id }}
+업로드 이름: {{ video_info.upload_name }}
+날짜: {{ video_info.date }}
+ {% if video_info.is_realtime %} +종류: 실시간
실시간 서비스는 영상을 녹화하지 않습니다. 따라서 앨범에서 전체 영상은 보이지 않습니다.
종류: 녹화 영상
+ {% endif %} +# | +이름 | +날짜&amp;시간 | +이상탐지 | +분석 여부 | +Button | +
---|---|---|---|---|---|
{{ loop.index }} | ++ + + {{ upload.name }} + + | ++ {{ upload.date }} + | ++ {% if upload.is_realtime %} + 실시간 + {% else %} + 녹화 영상 + {% endif %} + | +
+ {% if upload.completes[0].completed %}
+ 분석 완료
+ {% else %}
+ 분석중
+ {% endif %}
+ |
+ + + + | +