diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md
new file mode 100644
index 0000000..f8ec71a
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/bug_report.md
@@ -0,0 +1,19 @@
+---
+name: Bug Report
+about: Template for reporting a bug
+title: "[BUG] "
+labels: ''
+assignees: ''
+
+---
+
+## Describe the bug
+-
+
+## To Reproduce
+-
+
+## Expected Behavior
+-
+
+## Screenshots (Optional)
\ No newline at end of file
diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md
new file mode 100644
index 0000000..2643aaa
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/feature_request.md
@@ -0,0 +1,14 @@
+---
+name: Feature Request
+about: Template for proposing a new feature
+title: "[FEAT] "
+labels: ''
+assignees: ''
+
+---
+
+## Background
+-
+
+## To Do
+- [ ]
\ No newline at end of file
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 0000000..c8f0cae
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,12 @@
+## Overview
+-
+
+## Change Log
+-
+
+## To Reviewer
+-
+
+## Issue Tags
+- closed: #
+- see also: # (optional)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..5529439
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,309 @@
+### JupyterNotebooks ###
+# gitignore template for Jupyter Notebooks
+# website: http://jupyter.org/
+
+.ipynb_checkpoints
+*/.ipynb_checkpoints/*
+
+# IPython
+profile_default/
+ipython_config.py
+
+# Remove previous ipynb_checkpoints
+# git rm -r .ipynb_checkpoints/
+dataset/
+pts/
+wandb/
+*.ipynb
+*.png
+*.pt
+datapreprocess/code_prac.ipynb
+datapreprocess/rt_code_test.ipynb
+datapreprocess/csv/abnormal/train/*.csv
+datapreprocess/csv/abnormal/val/*.csv
+datapreprocess/csv/normal/train/*.csv
+datapreprocess/csv/normal/val/*.csv
+datapreprocess/json/abnormal/train/
+datapreprocess/json/abnormal/val/
+datapreprocess/json/abnormal/TS_03.이상행동_14.교통약자_train/
+datapreprocess/json/abnormal/TS_03.이상행동_14.교통약자_val/
+datapreprocess/json/normal/train/
+datapreprocess/json/normal/val/
+datapreprocess/*.csv
+datapreprocess/*.pt
+datapreprocess/*.pth
+datapreprocess/npy
+datapreprocess/nohup.out
+model_train/code_prac.ipynb
+model_train/model.h5
+model_train/pytorch_model.pth
+model_train/nohup.out
+model_train/*.ipynb
+model_train/wandb
+app/testitems/
+pths/
+app/models/pts
+app/models/yolov8n-pose.pt
+app/yolov8n-pose.pt
+.netrc
+
+### macOS ###
+# General
+.DS_Store
+.AppleDouble
+.LSOverride
+
+# Icon must end with two \r
+Icon
+
+
+# Thumbnails
+._*
+
+# Files that might appear in the root of a volume
+.DocumentRevisions-V100
+.fseventsd
+.Spotlight-V100
+.TemporaryItems
+.Trashes
+.VolumeIcon.icns
+.com.apple.timemachine.donotpresent
+
+# Directories potentially created on remote AFP share
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
+
+### macOS Patch ###
+# iCloud generated files
+*.icloud
+
+### Python ###
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook + +# IPython + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+#.idea/
+
+### Python Patch ###
+# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
+poetry.toml
+
+# ruff
+.ruff_cache/
+
+# LSP config files
+pyrightconfig.json
+
+### VirtualEnv ###
+# Virtualenv
+# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/
+[Bb]in
+[Ii]nclude
+[Ll]ib
+[Ll]ib64
+[Ll]ocal
+[Ss]cripts
+pyvenv.cfg
+pip-selfcheck.json
+
+### VisualStudioCode ###
+.vscode/*
+!.vscode/settings.json
+!.vscode/tasks.json
+!.vscode/launch.json
+!.vscode/extensions.json
+!.vscode/*.code-snippets
+
+# Local History for Visual Studio Code
+.history/
+
+# Built Visual Studio Code Extensions
+*.vsix
+
+### VisualStudioCode Patch ###
+# Ignore all local history of files
+.history
+.ionide
+
+### Windows ###
+# Windows thumbnail cache files
+Thumbs.db
+Thumbs.db:encryptable
+ehthumbs.db
+ehthumbs_vista.db
+
+# Dump file
+*.stackdump
+
+# Folder config file
+[Dd]esktop.ini
+
+# Recycle Bin used on file shares
+$RECYCLE.BIN/
+
+# Windows Installer files
+*.cab
+*.msi
+*.msix
+*.msm
+*.msp
+
+# Windows shortcuts
+*.lnk
+
+# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,macos,windows,jupyternotebooks,virtualenv,python
\ No newline at end of file
diff --git a/.gitmessage b/.gitmessage
new file mode 100644
index 0000000..c10e929
--- /dev/null
+++ b/.gitmessage
@@ -0,0 +1,24 @@
+################
+# Write the subject on the blank line below, in the form <type> : <subject>
+# Keep it within 50 characters / state clearly WHAT changed / no trailing period
+# e.g. feat : add login feature
+
+# Do not delete the blank line right below (it separates subject from body)
+
+################
+# Write the body (details) on the line below
+# Separate multiple statements with "-" (wrap lines at 72 characters)
+
+################
+# Write the footer on the line below (e.g. issue numbers related to this commit)
+# e.g. Close #7
+
+################
+# feat : add a new feature
+# fix : fix a bug
+# docs : documentation changes
+# test : add test code
+# refact : refactor code
+# style : changes that do not affect the meaning of the code
+# chore : build or package manager changes
+################
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..6c7e06b
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,50 @@
+# See https://pre-commit.com for more information
+# See https://pre-commit.com/hooks.html for more hooks
+repos:
+- repo: https://github.com/pre-commit/pre-commit-hooks
+  rev: v4.5.0
+  hooks:
+  - id: trailing-whitespace
+  - id: end-of-file-fixer
+  - id: check-yaml
+  - id: check-added-large-files
+- repo: https://github.com/pycqa/flake8
+  rev: 7.0.0
+  hooks:
+  - id: flake8
+- repo: https://github.com/myint/autoflake
+  rev: v2.2.1
+  hooks:
+  - id: autoflake
+    args: ["--in-place", "--remove-all-unused-imports", "--remove-unused-variables", "--expand-star-imports", "--ignore-init-module-imports"]
+    files: \.py$
+    exclude: |
+      (?x)(
+        ^.git/|
+        ^output/|
+        ^plugins
+      )
+- repo: https://github.com/pycqa/isort
+  rev: '5.13.2'
+  hooks:
+  - id: isort
+    language: python
+    args: ["--filter-files"]
+    files: \.py$
+- repo: https://github.com/psf/black
+  rev: '24.2.0'
+  hooks:
+  - id: black
+    args: ["--line-length=120"]
+    exclude: |
+      (?x)(
+        ^.git/|
+        ^output/|
+        ^plugins
+      )
+exclude: |
+  (?x)(
+    ^.git/|
+    ^output/|
+    ^plugins
+  )
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..c001125
--- /dev/null
+++ b/README.md
@@ -0,0 +1,349 @@
+# Real-Time Anomalous-Behavior Detection in Unmanned Stores
+Final project of team CV-06 혁신비전테크 (Innovation Vision Tech, IVT)
+
+## 🎞️ Project Overview
+
+
+
+## 💁🏻♂️ Team & Roles
+| Name | Role |
+| ----- | ----- |
+| 김시웅 | Dataset construction and code, data preprocessing (backbone feature extraction), VMAE-family model implementation |
+| 박정민 | Data exploration, FastAPI-based backend features, deployment |
+| 백광현 | Project planning / service architecture design, API implementation and real-time development, LSTM-AE modeling / training and inference, web/model server separation, deployment |
+| 이동형 | Web bug fixes and refactoring |
+| 조형서 | Web development, model research |
+| 최수진 | Modeling, data preprocessing, YOLO-family model implementation |
+- Development timeline
+
+<p align="center">
+ +
+
+## 📍 Problem Definition
+
+<p align="center">
+
+
+
+</p>
+
+⇒ Could we solve this problem by using CCTV to detect anomalous behavior automatically, secure evidence, and raise real-time alerts?
+
+## 💡 Proposed Solution
+
+
+
+- Reviewing recorded footage by hand costs **time and money**
+
+    ⇒ Analyze uploaded videos and provide **timestamps and screenshots**
+
+- Even with CCTV installed, no operator can watch 24/7, so **incidents are missed**
+
+    ⇒ Provide **timestamps, screenshots, and alerts** through **real-time video analysis**
+
+
+
+- Improve speed so that **large, long-running** CCTV recordings are processed faster than a human could
+- Improve detection of the **diverse situations** that can occur in an unmanned store
+- Judge anomalous situations accurately while **keeping the false-positive rate low**
+
+---
+
+## 📼 Data
+
+### **AI Hub indoor (convenience store / retail) human [abnormal](https://www.aihub.or.kr/aihubdata/data/view.do?currMenu=115&topMenu=100&aihubDataSe=data&dataSetSn=71550) / [normal](https://www.aihub.or.kr/aihubdata/data/view.do?currMenu=115&topMenu=100&aihubDataSe=data&dataSetSn=71549) behavior dataset**
+
+- Characteristics
+    - Frame-level labels for whether a frame is anomalous
+    - Object bounding boxes plus pose-skeleton keypoint labels
+
+- Categories
+    - Purchase behavior: moving around the store, purchase, return, comparison, etc.
+    - Anomalous behavior: vandalism, arson, smoking, theft, assault, etc.
+
+<p align="center">
+ +## 🔨 Data Preprocessing + ++ +
+ +- 비디오 데이터는 이미지 데이터에 비해 매우 큰 용량 + + → 주어진 AI Stages V100 서버는 100 GB **용량 제한**이 있어 모델 전체 End-to-End 학습이 아닌 기학습 가중치를 사용한 백본 네트워크로 영상의 **Feature를 미리 계산**해 학습을 진행 + +- Backbone 네트워크의 기학습된 가중치는 고정하고, +영상 Feature들을 미리 계산해 csv(YOLO v8), npy(Video MAE v2) 파일에 저장해 학습 데이터 용량을 `353 GB` → `2.42 GB` 로 줄여서 학습을 진행 + +--- + +## 🤖 Model + + + +▶️ Backbone Network : **Video Masked Auto Encoder v2** + +- 일반적인 영상 이상 탐지 모델의 영상 Feature 추출에 사용되는 Backbone Network는 **I3D** + +⛔ I3D 방식은 Optical Flow를 추가로 계산하기 때문에 실시간성 확보에 어려움이 있다고 판단 + +👉 **Optical Flow 사용하지 않고** + +👉 **Action Recognition** 분야에서 좋은 성능을 낸 Backbone Network 선정 + +▶️ **YOLO v8** + +- 주어진 데이터 셋의 라벨링에서 객체별 **바운딩 박스**와 **포즈 스켈레톤 키포인트**를 제공하기 때문에 활용할 수 있는 모델을 선정 + +▶️ Classifier : [**LSTM Auto-Encoder](https://github.com/surya2003-real/anomaly-detection-and-object-tracking?tab=readme-ov-file), [MGFN](https://arxiv.org/abs/2211.15098), [Deep MIL Ranking](https://arxiv.org/abs/1801.04264), [BN-WVAD](https://arxiv.org/abs/2311.15367)** + +- 영상 Feature를 정상 / 이상 영상으로 **이진 분류**하는 Classifier + +⛔ 데이터 셋이 프레임 단위로 정상 / 이상 여부를 제공해 지도 학습이 가능하지만 + +👉 장기적으로 **Continual Learning**과 **영상 Feature 관리**를 위해 + +👉 **비지도 학습** 또는 **비디오 단위 라벨링**을 활용한 **약한 지도 학습**이 가능한 구조를 사용 + + + +1️⃣ **YOLO v8 + [LSTM autoencoder](https://github.com/surya2003-real/anomaly-detection-and-object-tracking?tab=readme-ov-file)** + ++ +
+ +- 데이터에서 제공하는 **포즈 스켈레톤** 정보를 활용하고자 함 + - Feature 추출 : **YOLO v8 Pose** + - 프레임 별 사람을 탐지해 Bbox, Keypoints 출력 + - 입력 영상의 Feature 추출 +- YOLO v8 (실시간 객체 탐지) + LSTM (시간적 특성 모델링) 역할로 구성 +- Classifier : **LSTM AE** + - 비지도 학습을 활용 + - **정상 행동만 학습**하고, **Encoder 입력**과 **Decoder 출력**의 차이를 줄이도록 학습 + - 학습 데이터와 다른 **이상 행동 입력이 주어지면**, 복원된 출력은 입력과 많은 차이가 발생 + + → **MSE**로 계산된 **복원 오차**가 임계치를 넘게 되면 이상으로 판단 + +- 장점: **실시간 데이터 처리, 스켈레톤 기반 행동 인식** 가능 +- 한계: 정상 영상이어도 학습 과정에서 배우지 않은 경우 이상으로 판단 + +2️⃣ **YOLO v8 + [MGFN](https://arxiv.org/abs/2211.15098)** + ++ +
+ +- LSTM의 한계를 극복하고자 +**MGFN**(Magnitude-Contrastive-Glance-and-Focus Network)을 활용 +- **약한 지도 학습** 방식을 도입 + + → 라벨이 **부정확, 불완전한 데이터**에서도 **학습이 가능**하도록 개선 + +- Classifier : **MGFN** + - 약한 지도 학습 활용 + - **어텐션 메커니즘** + - 비디오 내의 다양한 **시간적 및 공간적 특징**을 분석하기 위해 설계 + - 정상 / 이상 행동의 **차이**를 더 잘 포착 +- 장점 : MGFN 사용으로 더 **정교한 Feature 추출** 및 **성능 향상, 빠른 추론 속도** +- 한계: 학습 과정에서 **높은 계산 비용**과 **많은 시간** 소요 + +--- + +3️⃣ **Video MAE v2 + [Deep MIL ranking model](https://arxiv.org/abs/1801.04264)** + ++ +
+ +- Optical Flow 사용하지 않는 **Video MAE v2** 선정 + - Feature 추출 : **Video MAE v2** + - 영상을 16프레임으로 나눠 710 차원의 Feature Vector로 추출 +- 비디오 단위 라벨링으로 **약한 지도 학습** 방식 적용 가능. +- Classifier : **Deep MIL Ranking** + - UCF-Crime 데이터 셋의 베이스라인 모델 + - 영상을 여러 조각으로 나눠 **조각 별** **이상 예측 점수**를 출력 + - 정상 / 이상 영상을 1:1로 병행해 점수를 예측한 뒤 + 이상 영상의 모든 조각 예측 점수 중 최대값이 + 정상 영상의 모든 조각 예측 점수 중 최대값보다 커지도록 학습 + - BN-WVAD의 **Feature Enhancer** 구조를 추가 적용한 실험도 진행 +- 장점 + - 학습 시 이상 영상도 학습해 비지도 방식보다 **일반화 성능 향상** +- 한계 : 이상 영상 중 이상 행동 토막의 위치를 잘못 예측하는 등 **라벨링 노이즈 발생** 가능 + +4️⃣ **Video MAE v2 + [BN-WVAD](https://arxiv.org/abs/2311.15367)** + ++ +
+ +- UCF-Crime 데이터 셋 기준 SOTA 성능의 **BN-WVAD** 선정 + - ROC AUC = 0.8724 + - Deep MIL Ranking Model = 0.7541 +- Classifier : **BN-WVAD** + - Transformer 계열 **Feature enhancer**를 사용해 Video MAE v2가 추출한 Feature Vector의 품질을 향상 + - 영상의 각 조각의 **최종 예측 점수**는 + 해당 조각의 Anomaly Classifier 결과와 + Feature Vector들의 Mahalanobis Distance 평균을 **곱한 결과** + - 각 Layer의 Output Feature 벡터들을 + 배치 정규화 과정에서 구해진 특정 벡터의 평균과 **[Mahalanobis distance](https://en.wikipedia.org/wiki/Mahalanobis_distance)**로 거리 계산 +- 장점 + - Deep MIL ranking model의 **라벨링 노이즈** 문제 **개선** +- 한계 : Triplet 계열 Loss를 사용해 **Batch Size**가 다른 모델에 비해 **매우 커야** 학습이 잘 진행됨 + +--- + + + +- **ROC AUC score** + - 이상 탐지 모델은 + **탐지율(True Postive Rate)**도 중요하지만 + **오탐율(False Postive Rate)** 또한 매우 중요 + - ⇒ Threshold 값에 따른 **오탐율, 탐지율 값**을 + 곡선으로 표현한 ROC Curve의 면적인 **ROC AUC**로 성능 평가 +- **FPS** + - 30 FPS 이상의 실시간 탐지를 위해 + **1 프레임 당 처리 속도(FPS)**로 속도 평가 + ++ +
+TP, FP 에 따른 ROC Curve
+
+
+
+- Experiments were tracked and managed with WandB; besides ROC AUC and FPS we logged accuracy, mean predicted scores for normal / anomalous videos, mean of per-video maximum scores, and other metrics (a logging sketch follows)
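+
+A minimal logging sketch (the metric keys here are placeholders, not the project's exact WandB keys):
+
+```python
+import wandb
+
+run = wandb.init(project="anomaly-detection")
+wandb.log({"roc_auc": 0.87, "fps": 34.2, "accuracy": 0.91})  # one step's metrics
+run.finish()
+```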
+ +- 최종 결과 ++ +
+ + - ROC AUC 기준 가장 좋은 성능을 보인 VMAEv2+FE+MIL 구성은 실제 이상 행동을 배우기보다는 데이터셋의 이상행동 발생 프레임 위치의 패턴만을 배운 것을 발견하여 **최종 모델**로는 **VMAEv2+MIL** 구조를 채용 + +## 🌏 Service + + + +- **Web Server - Front** + - BootStrap, HTML + - 설계한 와이어 프레임 기반으로 페이지별 기능 구현 + - 웹캠 기능 + - 세션 토큰을 활용한 사용자 검증 + - 실시간 탐지 시 일정 시간에 따라 탐지된 프레임 자동 업데이트 +- **Web Server - Back** + - Fast API + - 클라이언트와 효율적 통신을 위한 **RestAPI** 설계 + - 모델 서버의 **트래픽 최소화**를 위해 DB 저장 및 읽기, 영상 저장 작업은 **웹 서버에서 진행** + - **Websocket**을 이용해 모델 서버에 **실시간 프레임 전달** + - 웹캠, RTSP, Youtube 동영상 등 다양한 소스 처리 가능하도록 구현 + - SMTP 를 활용한 **이메일 알람** 로직 구현 +- **Web Server - Database** + - MySQL, S3 + - DB에 대해 쓰기 작업보다는 **읽기 작업이 많고**, 복잡한 쿼리가 없기 때문에 **속도와 안정성이 좋은 MySQL** 선정 + - SQLAlchemy 의 ORM 활용 + - **용량이 큰** 비디오, 프레임 이미지들을 위한 저장소로 **AWS S3** 선정. DB에는 S3 URL 을 적재하여 접근 가능하도록 함. + - 모델 추론 **결과(bounding** **box, keypoints)를 저장**하여 이후 추가 기능 혹은 모델 학습에 사용할 수 있도록 함. +- **Model Server** + - FastAPI, Pytorch + - GPU 사용 서버 + - 녹화 영상의 경우, 추론 이후 **OpenCV** 와 **FFMPEG** 를 이용, 후처리(코덱 설정)하여 html 에서 송출 가능하도록 함 + - **실시간 추론 서버**와 **녹화영상 추론 서버**로 나누어 운영. + - 추론 시 이상행동 프레임을 AWS S3 에 저장하고, DB frame 테이블을 갱신 + ++ +
+ ++ +
+ +## 🛎️ Product Serving + +- AI Stages 서버는 도커 컨테이너 환경으로 **외부 접속 및 방화벽 설정** **불가** + - VPN에서 외부 접속이 가능하도록 하는 **우회 경로 오픈**이 **금지** + - 제공되는 .ovpn 파일과 **SSH 포트 포워딩**을 통해 **AWS EC2** 환경에서 배포를 시도했으나 VPN 관련 오류인지, SSH 오류인지 로그를 확인하기 어려웠습니다. +- 우선, API 엔드포인트를 활용하여 웹 -모델 서버를 분리한 상태로 서비스를 완성시켜 놓았고, 이후 로컬 혹은 AWS 환경에서 배포를 지속적으로 시도하고 있습니다. +- 추가로 로드밸런싱을 이용하여 서버의 부하를 더 줄이는 방안도 공부하고 있습니다. +- 우리 서비스는 실시간 영상 분석을 제공하고 있는데, 다른 네트워크에 위치한 웹 서버 - 모델 서버 간 통신이 “실시간” 구현에 있어 문제되는지 면밀히 검토할 예정입니다. + +## 📸 실제 모습 + + + +1. 이상행동으로 판단된 **장면들과 타임스탬프를 저장**하고, 해당 시간대로 이동해 쉽게 확인할 수 있도록 제공 +2. 특정 장면을 자료로 사용하기 위해 **화질 개선 혹은 몽타주 생성** 등의 기능을 추가할 수 있음 + ++ +
+ + + +1. **웹캠**, **동영상 스트리밍** 또는 **외부 CCTV** 와 연결하여 **실시간 이상행동 분석** 실시 +2. 이상 행동이 일정 시간 지속되면 가입된 이메일로 **발생 시간** 전송 + ++ +
+ + + +1. 분석 단위로 앨범 기능을 제공하여 관리에 용이하고 결과를 재사용할 수 있다. + ++ +
\ No newline at end of file diff --git a/app/.env b/app/.env new file mode 100644 index 0000000..fd02435 --- /dev/null +++ b/app/.env @@ -0,0 +1,24 @@ +# RDS +# MYSQL_SERVER_HOST = "cv06-database.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com" +# MYSQL_SERVER_PORT = 3306 +# MYSQL_SERVER_USER = "bkh" +# MYSQL_SERVER_PASSWORD = "cv06-database" +# MYSQL_DATABASE = "cv06_database" + +AWS_ACCESS_KEY = "AWS access key" +AWS_SECRET_KEY = "AWS secret key" +BUCKET = "bucket name" + +SMTP_ADDRESS = "smtp.gmail.com" +SMTP_PORT = 465 +MAIL_ACCOUNT = "aitech06ivt@gmail.com" +MAIL_PASSWORD = "hrxs kybl yccd lgsy" + +UPLOAD_MODEL_SERVER_IP = "upload video model server" +STREAM_MODEL_SERVER_IP = "real time video model server(ws)" + +MYSQL_SERVER_IP = "10.28.xxx.xxx" +MYSQL_SERVER_PORT = 30xxx +MYSQL_SERVER_USER = "bkh" +MYSQL_SERVER_PASSWORD = "bkh" +MYSQL_DATABASE = "cv06_database" \ No newline at end of file diff --git a/app/README.md b/app/README.md new file mode 100644 index 0000000..b6ec7d0 --- /dev/null +++ b/app/README.md @@ -0,0 +1,57 @@ +# Project Structure + +```bash +. +├── README.md +├── __init__.py +├── api +│ ├── __init__.py +│ ├── album_router.py +│ ├── real_time_router.py +│ ├── upload_router.py +│ └── user_router.py +├── database +│ ├── __init__.py +│ ├── crud.py +│ ├── database.py +│ ├── models.py +│ └── schemas.py +├── inference +│ ├── __init__.py +│ ├── anomaly_detector.py +│ └── rt_anomaly_detector.py +├── main.py +├── templates +│ ├── album_detail.html +│ ├── album_list.html +│ ├── base.html +│ ├── frame.html +│ ├── login.html +│ ├── main.html +│ ├── real_time.html +│ ├── signup.html +│ ├── src +│ │ ├── album_detail.js +│ │ ├── album_list.js +│ │ └── video.js +│ ├── stream.html +│ ├── upload.html +│ └── video.html +└── utils + ├── __init__.py + ├── config.py + ├── security.py + └── utils.py +``` + +# Description + +- api: URL별 로직 구현 + +- database: 데이터베이스 관련 설정 및 함수 + +- inference: 모델 추론 코드(녹화영상, 실시간) + +- templates: UI 템플릿. 
Bootstrap 사용 + +- utils: config 및 기타 함수 \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/album_router.py b/app/api/album_router.py new file mode 100644 index 0000000..04a29a4 --- /dev/null +++ b/app/api/album_router.py @@ -0,0 +1,199 @@ +from typing import Optional + +from database import crud, models +from database.database import get_db +from fastapi import ( + APIRouter, + Cookie, + Depends, + File, + Form, + HTTPException, + Query, + Request, + Response, + UploadFile, +) +from fastapi.responses import RedirectResponse +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session +from utils.config import settings +from utils.security import get_current_user +from utils.utils import s3 + +templates = Jinja2Templates(directory="templates") + +router = APIRouter( + prefix="/album", +) + + +@router.get("") +async def upload_get(request: Request, db: Session = Depends(get_db)): + user = get_current_user(request) + if not user: + return RedirectResponse(url="/user/login") + + album_list = crud.get_uploads(db=db, user_id=user.user_id) + print(album_list[0].completes[0].completed) + return templates.TemplateResponse( + "album_list.html", + {"request": request, "token": user.email, "album_list": album_list}, + ) + + +@router.post("") +async def modify_name( + request: Request, + check_code: str = Form(...), + upload_id: Optional[int] = Form(...), + origin_name: Optional[str] = Form(None), + new_name: Optional[str] = Form(None), + is_real_time: Optional[bool] = Form(None), + db: Session = Depends(get_db), +): + user = get_current_user(request) + + if check_code == "edit": + upload_info = ( + db.query(models.Upload) + .filter( + (models.Upload.name == origin_name) + & (models.Upload.upload_id == upload_id) + ) + .first() + ) + upload_info.name = new_name + + db.add(upload_info) + db.commit() + db.refresh(upload_info) + elif check_code == "delete": + upload_info = crud.get_upload(db, upload_id) + if upload_info: + db.delete(upload_info) + + db.commit() + album_list = crud.get_uploads(db=db, user_id=user.user_id) + + return templates.TemplateResponse( + "album_list.html", + {"request": request, "token": user.email, "album_list": album_list}, + ) + + +@router.get("/details") +async def upload_get_one( + request: Request, + user_id: int = Query(...), + upload_id: int = Query(...), + db: Session = Depends(get_db), +): + + user = get_current_user(request) + + video_info = { + "user_id": user_id, + "upload_id": upload_id, + "date": None, + "upload_name": None, + "is_realtime": None, + "video_id": None, + "video_url": None, + "frame_urls": None, + "score_url": None, + "complete": None, + } + + video = crud.get_video(db=db, upload_id=upload_id) + video_info["video_id"] = video.video_id + uploaded = crud.get_upload(db=db, upload_id=video.upload_id) + video_info["upload_name"] = uploaded.name + video_info["is_realtime"] = uploaded.is_realtime + video_info["date"] = uploaded.date.strftime("%Y-%m-%d %H:%M:%S") + + # frames = crud.get_frames(db=db, video_id=video.video_id) + frames = crud.get_frames_with_highest_score(db=db, video_id=video.video_id) + frame_ids = [frame.frame_id for frame in frames] + frame_urls = [frame.frame_url for frame in frames] + frame_timestamps = [frame.time_stamp for frame in frames] + frame_objs = [] + + video_obj = s3.generate_presigned_url( + "get_object", + 
Params={"Bucket": settings.BUCKET, "Key": video.video_url}, + ExpiresIn=3600, + ) + + video_info["video_url"] = video_obj + video_info["complete"] = crud.get_complete(db=db, upload_id=upload_id).completed + if not video_info["complete"]: + return templates.TemplateResponse( + "album_detail.html", + { + "request": request, + "token": user.email, + "video_info": video_info, + "loading": True, + }, + ) + + if frame_ids != []: + for frame_id, frame_url, frame_timestamp in zip( + frame_ids, frame_urls, frame_timestamps + ): + frame_obj = s3.generate_presigned_url( + "get_object", + Params={"Bucket": settings.BUCKET, "Key": frame_url}, + ExpiresIn=3600, + ) + frame_objs.append( + (frame_id, frame_obj, frame_timestamp.strftime("%H:%M:%S")) + ) + + score_graph_url = "/".join(frame_urls[0].split("/")[:-1]) + "/score_graph.png" + score_obj = s3.generate_presigned_url( + "get_object", + Params={"Bucket": settings.BUCKET, "Key": score_graph_url}, + ExpiresIn=3600, + ) + + video_info["frame_urls"] = frame_objs + video_info["score_url"] = score_obj + + # print(video_info) + return templates.TemplateResponse( + "album_detail.html", + { + "request": request, + "token": user.email, + "video_info": video_info, + "loading": False, + }, + ) + + +@router.get("/details/images") +async def image_get( + request: Request, frame_id: int = Query(...), db: Session = Depends(get_db) +): + + user = get_current_user(request) + frame = crud.get_frame(db=db, frame_id=frame_id) + frame_obj = s3.generate_presigned_url( + "get_object", + Params={"Bucket": settings.BUCKET, "Key": frame.frame_url}, + ExpiresIn=3600, + ) + print(frame_obj) + print(frame.box_kp_json) + frame_info = { + "frame_url": frame_obj, + "time_stamp": frame.time_stamp, + "frame_json": frame.box_kp_json, + } + + return templates.TemplateResponse( + "frame.html", + {"request": request, "token": user.email, "frame_info": frame_info}, + ) diff --git a/app/api/inference_router.py b/app/api/inference_router.py new file mode 100644 index 0000000..28510db --- /dev/null +++ b/app/api/inference_router.py @@ -0,0 +1,165 @@ +import asyncio +import json +import os +import sys +from datetime import date, datetime, timedelta + +import cv2 +import numpy as np +import pytz +from cap_from_youtube import cap_from_youtube +from fastapi import BackgroundTasks, Depends, FastAPI, WebSocket, WebSocketDisconnect +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from sqlalchemy.orm import Session + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(current_dir) + +sys.path.append(parent_dir) + +from database import crud +from database.database import get_db +from inference.rt_anomaly_detector_lstmae import RT_AnomalyDetector +from utils.config import settings +from utils.utils import run_model, s3 + +app = FastAPI() + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +class ModelInfo(BaseModel): + user_id: int + upload_id: int + threshold: float + video_uuid_name: str + video_ext: str + video_id: int + video_url: str + + +@app.get("/") +def root(): + return {"message": "모델이 돌아가용"} + + +# 녹화영상용 +@app.post("/run_model") +async def run_model_endpoint( + info: ModelInfo, background_tasks: BackgroundTasks, db: Session = Depends(get_db) +): + + info = info.dict() + + def run_model_task(): + run_model(info["video_url"], info, settings, db) + + background_tasks.add_task(run_model_task) + + return {"message": "Model execution 
started."} + + +# 메일을 보내야하는지 판단하는 함수 +async def check_and_send_email(db, video_id, user_id, last_point, smtp): + global last_emailed_time + + frames = crud.get_frames_with_highest_score(db=db, video_id=video_id) + frame_timestamps = [frame.time_stamp.strftime("%H:%M:%S") for frame in frames] + + if len(frame_timestamps) < 6: + return False + + last = datetime.strptime(frame_timestamps[-2], "%H:%M:%S") + check = datetime.strptime(frame_timestamps[-6], "%H:%M:%S") + + if (last - check) == timedelta(seconds=4): # 연속적으로 5초간 지속되면 + if not check <= last_point <= last: + crud.send_email( + db, frame_timestamps[-6], frame_timestamps[-2], user_id, smtp + ) + last_emailed_time = last + + +# 과연 웹 서버와 실시간을 분리하는 것이 더 빠른가? +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket, db: Session = Depends(get_db)): + + await websocket.accept() + smtp = await crud.create_smtp_server() + + try: + video_info_str = await websocket.receive_text() + print("Received video info:", video_info_str) + video_info = json.loads(video_info_str) + global detector, last_emailed_time + if detector is None: + detector = RT_AnomalyDetector(video_info, s3, settings, db, websocket) + detector.ready() + + if video_info["video_url"] == "web": + while True: + timestamp = datetime.now(pytz.timezone("Asia/Seoul")) + # Receive bytes from the websocket + bytes = await websocket.receive_bytes() + data = np.frombuffer(bytes, dtype=np.uint8) + frame = cv2.imdecode(data, cv2.IMREAD_COLOR) + await detector.run(frame, timestamp) + await check_and_send_email( + db=db, + video_id=video_info["video_id"], + user_id=video_info["user_id"], + last_point=last_emailed_time, + smtp=smtp, + ) + + else: + if "youtube" in video_info["video_url"]: + cap = cap_from_youtube(video_info["video_url"], "240p") + + else: + cap = cv2.VideoCapture(video_info["video_url"]) + + while True: + success, frame = cap.read() + if not success: + await websocket.send_text(f"카메라 연결에 실패했습니다.") + break + else: + timestamp = datetime.now(pytz.timezone("Asia/Seoul")) + await detector.run(frame, timestamp) + await check_and_send_email( + db=db, + video_id=video_info["video_id"], + user_id=video_info["user_id"], + last_point=last_emailed_time, + smtp=smtp, + ) + + ret, buffer = cv2.imencode(".jpg", frame) + await websocket.send_bytes(buffer.tobytes()) + + await asyncio.sleep(0.042) + + except WebSocketDisconnect: + await websocket.close() + await smtp.quit() + + except Exception as e: + # 예외 발생 시 로그 기록 및 연결 종료 + print(f"WebSocket error: {e}") + await websocket.close() + await smtp.quit() + + finally: + try: + detector.upload_score_graph_s3() + except: + pass + detector = None diff --git a/app/api/real_time_router.py b/app/api/real_time_router.py new file mode 100644 index 0000000..69b3c7c --- /dev/null +++ b/app/api/real_time_router.py @@ -0,0 +1,293 @@ +import asyncio +import json +from datetime import date, datetime, timedelta + +import cv2 +import numpy as np +import pytz +import websockets +from cap_from_youtube import cap_from_youtube +from database import crud, schemas +from database.database import get_db +from fastapi import ( + APIRouter, + Depends, + Form, + Query, + Request, + WebSocket, + WebSocketDisconnect, + status, +) +from fastapi.responses import RedirectResponse +from fastapi.templating import Jinja2Templates + +# from inference.rt_anomaly_detector import RT_AnomalyDetector +from inference.rt_anomaly_detector_lstmae import RT_AnomalyDetector +from sqlalchemy.orm import Session +from utils.config import settings +from utils.security import 
get_current_user +from utils.utils import s3 +from websockets.exceptions import ConnectionClosed + +templates = Jinja2Templates(directory="templates") +router = APIRouter( + prefix="/real_time", +) + +detector = None +last_emailed_time = datetime.strptime("0:00:00", "%H:%M:%S") + + +@router.get("") +async def real_time_get(request: Request): + user = get_current_user(request) + if not user: + return RedirectResponse(url="/user/login") + + return templates.TemplateResponse( + "real_time.html", {"request": request, "token": user.email} + ) + + +@router.post("") +async def realtime_post( + request: Request, + name: str = Form(...), + real_time_video: str = Form(...), + datetime: datetime = Form(...), + thr: float = Form(...), + db: Session = Depends(get_db), +): + + user = get_current_user(request) + user = crud.get_user_by_email(db=db, email=user.email) + + # Form 과 user_id 를 이용하여 upload row insert + _upload_create = schemas.UploadCreate( + name=name, date=datetime, is_realtime=True, thr=thr, user_id=user.user_id + ) + crud.create_upload(db=db, upload=_upload_create) + + # 지금 업로드된 id 획득, 클라이언트로부터 작성된 실시간 스트리밍 영상 url 획득 + uploaded = crud.get_upload_id( + db=db, user_id=user.user_id, name=name, date=datetime + )[-1] + + # db 에는 실시간임을 알 수 있게만 함 + video_url = f"{real_time_video}" + _video_create = schemas.VideoCreate( + video_url=video_url, upload_id=uploaded.upload_id + ) + crud.create_video(db=db, video=_video_create) + _complete_create = schemas.Complete(completed=True, upload_id=uploaded.upload_id) + crud.create_complete(db=db, complete=_complete_create) + + # model inference 에서 사용할 정보 + info = { + "user_id": user.user_id, + "email": user.email, + "upload_id": uploaded.upload_id, + "name": uploaded.name, + "date": uploaded.date, + "threshold": uploaded.thr, + "video_url": video_url, + "video_id": crud.get_video(db=db, upload_id=uploaded.upload_id).video_id, + } + + redirect_url = ( + f"/real_time/stream?user_id={info['user_id']}&upload_id={info['upload_id']}" + ) + + return RedirectResponse(url=redirect_url, status_code=status.HTTP_303_SEE_OTHER) + + +@router.get("/stream") +async def get_stream( + request: Request, + user_id: int = Query(...), + upload_id: int = Query(...), + db: Session = Depends(get_db), +): + + user = get_current_user(request) + + video = crud.get_video(db=db, upload_id=upload_id) + uploaded = crud.get_upload(db=db, upload_id=video.upload_id) + + video_info = { + "user_id": user_id, + "upload_id": upload_id, + "date": uploaded.date.strftime("%Y-%m-%d %H:%M:%S"), + "upload_name": uploaded.name, + "thr": uploaded.thr, + "video_id": video.video_id, + "video_url": video.video_url, + "is_realtime": True, + "model_server_ip": settings.STREAM_MODEL_SERVER_IP, + } + + # video_info = json.dumps(video_info) + + return templates.TemplateResponse( + "stream.html", + {"request": request, "token": user.email, "video_info": video_info}, + ) + + +# 메일을 보내야하는지 판단하는 함수 +async def check_and_send_email(db, video_id, user_id, last_point, smtp): + global last_emailed_time + + frames = crud.get_frames_with_highest_score(db=db, video_id=video_id) + frame_timestamps = [frame.time_stamp.strftime("%H:%M:%S") for frame in frames] + + if len(frame_timestamps) < 6: + return False + + last = datetime.strptime(frame_timestamps[-2], "%H:%M:%S") + check = datetime.strptime(frame_timestamps[-6], "%H:%M:%S") + + if (last - check) == timedelta(seconds=4): # 연속적으로 5초간 지속되면 + if not check <= last_point <= last: + crud.send_email( + db, frame_timestamps[-6], frame_timestamps[-2], user_id, smtp + ) + 
last_emailed_time = last + + +@router.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket, db: Session = Depends(get_db)): + await websocket.accept() + smtp = await crud.create_smtp_server() + + try: + video_info_str = await websocket.receive_text() + print("Received video info:", video_info_str) + video_info = json.loads(video_info_str) + global detector, last_emailed_time + if detector is None: + detector = RT_AnomalyDetector(video_info, s3, settings, db, websocket) + detector.ready() + + if video_info["video_url"] == "web": + while True: + timestamp = datetime.now(pytz.timezone("Asia/Seoul")) + # Receive bytes from the websocket + bytes = await websocket.receive_bytes() + data = np.frombuffer(bytes, dtype=np.uint8) + frame = cv2.imdecode(data, cv2.IMREAD_COLOR) + await detector.run(frame, timestamp) + await check_and_send_email( + db=db, + video_id=video_info["video_id"], + user_id=video_info["user_id"], + last_point=last_emailed_time, + smtp=smtp, + ) + + else: + if "youtube" in video_info["video_url"]: + cap = cap_from_youtube(video_info["video_url"], "240p") + + else: + cap = cv2.VideoCapture(video_info["video_url"]) + + while True: + success, frame = cap.read() + if not success: + await websocket.send_text(f"카메라 연결에 실패했습니다.") + break + else: + timestamp = datetime.now(pytz.timezone("Asia/Seoul")) + await detector.run(frame, timestamp) + await check_and_send_email( + db=db, + video_id=video_info["video_id"], + user_id=video_info["user_id"], + last_point=last_emailed_time, + smtp=smtp, + ) + + ret, buffer = cv2.imencode(".jpg", frame) + await websocket.send_bytes(buffer.tobytes()) + + await asyncio.sleep(0.042) + + except WebSocketDisconnect: + await websocket.close() + await smtp.quit() + + except Exception as e: + # 예외 발생 시 로그 기록 및 연결 종료 + print(f"WebSocket error: {e}") + await websocket.close() + await smtp.quit() + + finally: + try: + detector.upload_score_graph_s3() + except: + pass + detector = None + + +@router.get("/stream") +async def get_stream( + request: Request, + user_id: int = Query(...), + upload_id: int = Query(...), + db: Session = Depends(get_db), +): + + user = get_current_user(request) + + video = crud.get_video(db=db, upload_id=upload_id) + uploaded = crud.get_upload(db=db, upload_id=video.upload_id) + + video_info = { + "user_id": user_id, + "upload_id": upload_id, + "date": uploaded.date.strftime("%Y-%m-%d %H:%M:%S"), + "upload_name": uploaded.name, + "thr": uploaded.thr, + "video_id": video.video_id, + "video_url": video.video_url, + "is_realtime": True, + } + + # video_info = json.dumps(video_info) + + return templates.TemplateResponse( + "stream.html", + {"request": request, "token": user.email, "video_info": video_info}, + ) + + +# db 에서 실시간에서 저장되는 frame url 불러오는 코드 +def fetch_data(db, upload_id): + + video = crud.get_video(db=db, upload_id=upload_id) + frames = crud.get_frames_with_highest_score(db=db, video_id=video.video_id) + frame_ids = [frame.frame_id for frame in frames] + frame_urls = [frame.frame_url for frame in frames] + frame_timestamps = [frame.time_stamp for frame in frames] + frame_objs = [] + + for frame_id, frame_url, frame_timestamp in zip( + frame_ids, frame_urls, frame_timestamps + ): + frame_obj = s3.generate_presigned_url( + "get_object", + Params={"Bucket": settings.BUCKET, "Key": frame_url}, + ExpiresIn=3600, + ) + frame_objs.append((frame_id, frame_obj, frame_timestamp.strftime("%H:%M:%S"))) + + return {"frame_urls": frame_objs} + + +@router.get("/fetch_data") +async def fetch_frame_data(upload_id: int = Query(...), 
db: Session = Depends(get_db)): + frame_data = fetch_data(db, upload_id) + return frame_data diff --git a/app/api/upload_router.py b/app/api/upload_router.py new file mode 100644 index 0000000..6a73ddc --- /dev/null +++ b/app/api/upload_router.py @@ -0,0 +1,141 @@ +import os +import uuid +from datetime import datetime + +import requests +from database import crud, schemas +from database.database import get_db +from fastapi import ( + APIRouter, + BackgroundTasks, + Depends, + File, + Form, + HTTPException, + Request, + Response, + UploadFile, + status, +) +from fastapi.responses import RedirectResponse +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session +from utils.config import settings +from utils.security import get_current_user +from utils.utils import s3 + +templates = Jinja2Templates(directory="templates") + +router = APIRouter( + prefix="/upload", +) + + +@router.get("") +async def upload_get(request: Request): + user = get_current_user(request) + err_msg = {"file_ext": None} + if not user: + return RedirectResponse(url="/user/login") + + return templates.TemplateResponse( + "upload.html", {"request": request, "token": user.email, "err": err_msg} + ) + + +@router.post("") +async def upload_post( + request: Request, + name: str = Form(...), + upload_file: UploadFile = File(...), + datetime: datetime = Form(...), + thr: float = Form(...), + db: Session = Depends(get_db), +): + + user = get_current_user(request) + err_msg = {"file_ext": None} + + if not user: + return RedirectResponse(url="/user/login") + + file_ext = os.path.splitext(upload_file.filename)[-1] + if file_ext != ".mp4": + err_msg["file_ext"] = "파일 형식이 다릅니다.(mp4만 지원 가능)" + return templates.TemplateResponse( + "upload.html", {"request": request, "token": user.email, "err": err_msg} + ) + + _upload_create = schemas.UploadCreate( + name=name, date=datetime, is_realtime=False, thr=thr, user_id=user.user_id + ) + crud.create_upload(db=db, upload=_upload_create) + + uploaded = crud.get_upload_id( + db=db, user_id=user.user_id, name=name, date=datetime + )[-1] + + video_name = uuid.uuid1() + + # model inference 에서 s3 에 올릴 주소 그대로 db 에 insert + video_url = f"video/{user.user_id}/{uploaded.upload_id}/{video_name}{file_ext}" + _video_create = schemas.VideoCreate( + video_url=video_url, upload_id=uploaded.upload_id + ) + crud.create_video(db=db, video=_video_create) + _complete_create = schemas.Complete(completed=False, upload_id=uploaded.upload_id) + crud.create_complete(db=db, complete=_complete_create) + + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="video 를 s3 저장소 업로드에 실패했습니다.", + ) + + try: + s3.upload_fileobj( + upload_file.file, + settings.BUCKET, + video_url, + ExtraArgs={"ContentType": "video/mp4"}, + ) + except: + raise s3_upload_exception + + info = { + "user_id": user.user_id, + "email": user.email, + "upload_id": uploaded.upload_id, + "name": name, + "date": datetime, + "threshold": uploaded.thr, + "video_name": upload_file.filename, + "video_uuid_name": video_name, + "video_ext": file_ext, + "video_id": crud.get_video(db=db, upload_id=uploaded.upload_id).video_id, + "video_url": video_url, + } + + model_data = { + "user_id": user.user_id, + "upload_id": uploaded.upload_id, + "threshold": uploaded.thr, + "video_uuid_name": str(video_name), + "video_ext": file_ext, + "video_id": crud.get_video(db=db, upload_id=uploaded.upload_id).video_id, + "video_url": video_url, + } + + model_server_url = settings.UPLOAD_MODEL_SERVER_IP + try: + response = 
requests.post(model_server_url, json=model_data) + response.raise_for_status() # 응답 상태 코드가 200이 아닌 경우 예외 발생 + print("Model execution started successfully.") + except requests.RequestException: + e = "모델 서버에서 오류가 발생했습니다." + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=e) + + redirect_url = ( + f"/album/details?user_id={info['user_id']}&upload_id={info['upload_id']}" + ) + + return RedirectResponse(url=redirect_url, status_code=status.HTTP_303_SEE_OTHER) diff --git a/app/api/user_router.py b/app/api/user_router.py new file mode 100644 index 0000000..d0e107f --- /dev/null +++ b/app/api/user_router.py @@ -0,0 +1,92 @@ +from database import crud, models +from database.database import get_db +from fastapi import APIRouter, Depends, Request +from fastapi.responses import RedirectResponse +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session +from utils.security import pwd_context + +templates = Jinja2Templates(directory="templates") +router = APIRouter(prefix="/user") + + +@router.get("/signup") +async def signup_get(request: Request, db: Session = Depends(get_db)): + err_msg = {"user": None, "pw": None, "check_pw": None} + return templates.TemplateResponse( + "signup.html", {"request": request, "err": err_msg} + ) + + +@router.post("/signup") +async def signup_post(request: Request, db: Session = Depends(get_db)): + body = await request.form() + user, pw, check_pw = body["email"], body["pw"], body["check_pw"] + err_msg = {"user": None, "pw": None, "check_pw": None} + + if not user: + err_msg["user"] = "empty email" + elif not pw: + err_msg["pw"] = "empty password" + elif pw != check_pw: + err_msg["check_pw"] = "not equal password and check_password" + else: + user = db.query(models.User).filter(models.User.email == body["email"]).first() + + if user: + err_msg["user"] = "invalid email" + else: + user_info = models.User(email=body["email"], password=body["pw"]) + + crud.create_user(db, user_info) + return RedirectResponse(url="/user/login") + + return templates.TemplateResponse( + "signup.html", {"request": request, "err": err_msg} + ) + + +@router.get("/login") +async def login_get(request: Request): + err_msg = {"user": None, "pw": None} + return templates.TemplateResponse( + "login.html", {"request": request, "err": err_msg} + ) + + +@router.post("/login") +async def login_post(request: Request, db: Session = Depends(get_db)): + body = await request.form() + user, pw = body["email"], body["pw"] + err_msg = {"user": None, "pw": None} + + if body.get("check_pw", None): + return templates.TemplateResponse( + "login.html", {"request": request, "err": err_msg} + ) + + if not user: + err_msg["user"] = "empty email" + elif not pw: + err_msg["pw"] = "empty password" + else: + user = db.query(models.User).filter(models.User.email == body["email"]).first() + if not user: + err_msg["user"] = "invalid email" + elif not pwd_context.verify(body["pw"], user.password): + err_msg["pw"] = "invalid password" + else: + return RedirectResponse(url="/") + + return templates.TemplateResponse( + "login.html", {"request": request, "err": err_msg} + ) + + +@router.get("/logout") +async def logout_get(request: Request): + access_token = request.cookies.get("access_token", None) + template = RedirectResponse(url="/") + if access_token: + template.delete_cookie(key="access_token") + return template diff --git a/app/database/__init__.py b/app/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/database/crud.py 
b/app/database/crud.py new file mode 100644 index 0000000..0f66ee6 --- /dev/null +++ b/app/database/crud.py @@ -0,0 +1,236 @@ +import smtplib +from datetime import date, datetime, timedelta +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +from database import models +from database.schemas import ( + Complete, + FrameCreate, + UploadCreate, + UserBase, + UserCreate, + VideoCreate, +) +from sqlalchemy import func +from sqlalchemy.orm import Session, aliased +from utils.config import settings +from utils.security import get_password_hash, verify_password + +# from email.mime.image import MIMEImage +# from passlib.context import CryptContext + + + +## User +def create_user(db: Session, user: UserCreate): + hashed_password = get_password_hash(user.password) + db_user = models.User(email=user.email, password=hashed_password) + db.add(db_user) + db.commit() + db.refresh(db_user) + return db_user + + +def get_user(db: Session, user_id: int): + return db.query(models.User).filter(models.User.user_id == user_id).first() + + +def get_user_by_email(db: Session, email: str): + return db.query(models.User).filter(models.User.email == email).first() + + +def get_existing_user(db: Session, user_create: UserCreate): + return ( + db.query(models.User).filter((models.User.email == user_create.email)).first() + ) + + +def authenticate(db: Session, *, email: str, password: str): + user = get_user_by_email(db, email=email) + if not user: + return None + if not verify_password(password, user.password): + return None + return user + + +def is_active(user: UserBase) -> bool: + return user.is_active + + +## Upload +def create_upload(db: Session, upload: UploadCreate): + db_upload = models.Upload(**upload.dict()) + db.add(db_upload) + db.commit() + db.refresh(db_upload) + return db_upload + + +def delete_upload(db: Session, upload_id: int): + db_upload = ( + db.query(models.Upload).filter(models.Upload.upload_id == upload_id).first() + ) + + if db_upload: + db.delete(db_upload) + db.commit() + return True + return False + + +def get_upload(db: Session, upload_id: int): + return db.query(models.Upload).filter(models.Upload.upload_id == upload_id).first() + + +def get_upload_id( + db: Session, + user_id: int, + name: str, + date: datetime, +): + return ( + db.query(models.Upload) + .filter( + (models.Upload.user_id == user_id) + & (models.Upload.name == name) + & (models.Upload.date == date) + ) + .all() + ) + + +def get_uploads(db: Session, user_id: int): + return ( + db.query(models.Upload) + .filter(models.Upload.user_id == user_id) + .order_by(models.Upload.upload_id.desc()) + .all() + ) + + +def get_upload_by_name(db: Session, name: str): + return db.query(models.Upload).filter(models.Upload.name == name).first() + + +## Video +def create_video(db: Session, video: VideoCreate): + db_video = models.Video(**video.dict()) + db.add(db_video) + db.commit() + db.refresh(db_video) + return db_video + + +def get_video(db: Session, upload_id: int): + return db.query(models.Video).filter(models.Video.upload_id == upload_id).first() + + +## Frame +def create_frame(db: Session, frame: FrameCreate): + db_frame = models.Frame(**frame.dict()) + db.add(db_frame) + db.commit() + db.refresh(db_frame) + return db_frame + + +def get_frame(db: Session, frame_id: int): + return db.query(models.Frame).filter(models.Frame.frame_id == frame_id).first() + + +def get_frames(db: Session, video_id: int): + return db.query(models.Frame).filter(models.Frame.video_id == video_id).all() + + +def 
get_frames_with_highest_score(db: Session, video_id: int): + + subquery = ( + db.query( + models.Frame.video_id, + models.Frame.time_stamp, + func.max(models.Frame.score).label("max_score"), + ) + .group_by(models.Frame.video_id, models.Frame.time_stamp) + .subquery() + ) + + subq_alias = aliased(subquery) + + frames = ( + db.query(models.Frame) + .join( + subq_alias, + (models.Frame.video_id == subq_alias.c.video_id) + & (models.Frame.time_stamp == subq_alias.c.time_stamp) + & (models.Frame.score == subq_alias.c.max_score), + ) + .filter(models.Frame.video_id == video_id) + .all() + ) + + return frames + + +def create_complete(db: Session, complete: Complete): + db_complete = models.Complete(**complete.dict()) + db.add(db_complete) + db.commit() + db.refresh(db_complete) + return db_complete + + +def get_complete(db: Session, upload_id: int): + return ( + db.query(models.Complete).filter(models.Complete.upload_id == upload_id).first() + ) + + +def update_complete_status(db: Session, upload_id: int): + + complete_record = ( + db.query(models.Complete).filter(models.Complete.upload_id == upload_id).first() + ) + + if complete_record and not complete_record.completed: + complete_record.completed = True + db.commit() + + +# email feat +async def create_smtp_server(): + + smtp = smtplib.SMTP_SSL( + settings.SMTP_ADDRESS, settings.SMTP_PORT + ) # smtp 서버와 연결 + smtp.login( + settings.MAIL_ACCOUNT, settings.MAIL_PASSWORD + ) # 프로젝트 계정으로 로그인 + + return smtp + + +def send_email(db, check, last, user_id, smtp): + user = get_user(db, user_id) + + # 메일 기본 정보 설정 + msg = MIMEMultipart() + msg["subject"] = f"[IVT] 이상행동 분석 중 결과 전달 메일입니다." + msg["from"] = settings.MAIL_ACCOUNT + msg["To"] = user.email + + # 메일 본문 내용 + content = f"""안녕하세요. Naver AI Tech 6기 '혁신비전테크' 팀 입니다. + +실시간 이상행동 탐지 중 이상행동이 발견되어 해당 시간대를 전달 드립니다. + +{check} ~ {last} 시간을 확인해주세요. 
+""" + + content_part = MIMEText(content, "plain") + msg.attach(content_part) + + smtp.sendmail(settings.MAIL_ACCOUNT, user.email, msg.as_string()) + + # 탐지 이미지 첨부 (향후 업데이트 예정) diff --git a/app/database/database.py b/app/database/database.py new file mode 100644 index 0000000..e34c55e --- /dev/null +++ b/app/database/database.py @@ -0,0 +1,26 @@ +from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +from utils.config import settings + +SQLALCHEMY_DATABASE_URL = "mysql+pymysql://{}:{}@{}:{}/{}".format( + settings.MYSQL_SERVER_USER, + settings.MYSQL_SERVER_PASSWORD, + settings.MYSQL_SERVER_IP, + settings.MYSQL_SERVER_PORT, + settings.MYSQL_DATABASE, +) + +engine = create_engine(SQLALCHEMY_DATABASE_URL, echo=True) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +Base = declarative_base() + + +# Dependency +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() diff --git a/app/database/models.py b/app/database/models.py new file mode 100644 index 0000000..b201bb8 --- /dev/null +++ b/app/database/models.py @@ -0,0 +1,77 @@ +from database.database import Base +from sqlalchemy import ( + JSON, + Boolean, + Column, + DateTime, + Float, + ForeignKey, + Integer, + String, + Time, +) +from sqlalchemy.orm import relationship + + +class User(Base): + __tablename__ = "user" + + user_id = Column(Integer, primary_key=True, index=True, autoincrement=True) + email = Column(String(50), unique=True, nullable=False) + password = Column(String(200), nullable=False) + is_active = Column(Boolean, default=True) + + uploads = relationship("Upload", back_populates="user") + + +class Upload(Base): + __tablename__ = "upload" + + upload_id = Column(Integer, primary_key=True, index=True, autoincrement=True) + name = Column(String(50), nullable=False) + date = Column(DateTime, nullable=False) + is_realtime = Column(Boolean, default=False) + thr = Column(Float, nullable=False) + user_id = Column(Integer, ForeignKey("user.user_id"), nullable=False) + + user = relationship("User", back_populates="uploads") + videos = relationship( + "Video", back_populates="upload", cascade="all, delete-orphan" + ) + completes = relationship( + "Complete", back_populates="upload", cascade="all, delete-orphan" + ) + + +class Video(Base): + __tablename__ = "video" + + video_id = Column(Integer, primary_key=True, index=True, autoincrement=True) + video_url = Column(String(255), nullable=False) + upload_id = Column(Integer, ForeignKey("upload.upload_id"), nullable=False) + + upload = relationship("Upload", back_populates="videos") + frames = relationship("Frame", back_populates="video", cascade="all, delete-orphan") + + +class Frame(Base): + __tablename__ = "frame" + + frame_id = Column(Integer, primary_key=True, index=True, autoincrement=True) + frame_url = Column(String(255), nullable=False) + time_stamp = Column(Time, nullable=False) + box_kp_json = Column(JSON, nullable=False) + score = Column(Float, nullable=False) + video_id = Column(Integer, ForeignKey("video.video_id"), nullable=False) + + video = relationship("Video", back_populates="frames") + + +class Complete(Base): + __tablename__ = "complete" + + complete_id = Column(Integer, primary_key=True, index=True, autoincrement=True) + completed = Column(Boolean, default=False) + upload_id = Column(Integer, ForeignKey("upload.upload_id"), nullable=False) + + upload = relationship("Upload", back_populates="completes") diff --git a/app/database/schemas.py 
b/app/database/schemas.py new file mode 100644 index 0000000..62ea29d --- /dev/null +++ b/app/database/schemas.py @@ -0,0 +1,58 @@ +from datetime import datetime, time +from typing import Dict, List, Optional + +from pydantic import BaseModel, EmailStr, field_validator +from pydantic_core.core_schema import FieldValidationInfo + + +class UserBase(BaseModel): + email: EmailStr + is_active: Optional[bool] = True + + +class UserCreate(UserBase): + password: str + + +# Upload post 스키마 +class UploadCreate(BaseModel): + name: str + date: datetime + is_realtime: Optional[bool] = None + thr: float + user_id: int + + +# Video post 스키마 +class VideoCreate(BaseModel): + video_url: str + upload_id: int + + +class Video(VideoCreate): + video_id: int + frames: List["Frame"] = [] + + class Config: + orm_mode = True + + +# Frame Post 스키마 +class FrameCreate(BaseModel): + frame_url: str + time_stamp: time + box_kp_json: Dict + score: float + video_id: int + + +class Frame(FrameCreate): + frame_id: int + + class Config: + orm_mode = True + + +class Complete(BaseModel): + completed: bool + upload_id: int diff --git a/app/inference/__init__.py b/app/inference/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/inference/anomaly_detector.py b/app/inference/anomaly_detector.py new file mode 100644 index 0000000..3c979cb --- /dev/null +++ b/app/inference/anomaly_detector.py @@ -0,0 +1,458 @@ +import json +import os +import sys +import uuid +from collections import defaultdict +from datetime import datetime, time +from io import BytesIO + +import albumentations as A +import cv2 +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import torch +import torch.nn as nn +from database import crud, schemas +from fastapi import HTTPException +from sklearn.preprocessing import MinMaxScaler +from starlette import status +from ultralytics import YOLO + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(os.path.dirname(current_dir)) + +sys.path.append(os.path.join(parent_dir, "model")) +from copy import deepcopy + +import vmae + +# @@ timm은 0.4.12 버전 사용 필수 +from timm.models import create_model + + +class AnomalyDetector: + def __init__(self, video_file, info, s3_client, settings, db): + self.video = s3_client.generate_presigned_url( + ClientMethod="get_object", + Params={"Bucket": settings.BUCKET, "Key": video_file}, + ExpiresIn=3600, + ) + # print(self.video) + self.info = info + self.s3 = s3_client + self.settings = settings + self.thr = info["threshold"] + self.video_url = f"video/{info['user_id']}/{info['upload_id']}/{info['video_uuid_name']}{info['video_ext']}" + self.frame_url_base = f"frame/{info['user_id']}/{info['upload_id']}/" + self.db = db + + def display_text(self, frame, text, position): + font = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 1 + font_color = (0, 255, 0) # Green color + font_thickness = 2 + cv2.putText( + frame, + text, + position, + font, + font_scale, + font_color, + font_thickness, + cv2.LINE_AA, + ) + + def upload_frame_db(self, db, temp_for_db, frame_url): + + temp_json_path = "./temp.json" + + with open(temp_json_path, "w") as f: + json.dump(temp_for_db, f) + + with open(temp_json_path, "r") as f: + box_kp_json = json.load(f) + + _frame_create = schemas.FrameCreate( + frame_url=frame_url, + time_stamp=temp_for_db["timestamp"], + box_kp_json=box_kp_json, + score=temp_for_db["score"], + video_id=self.info["video_id"], + ) + + crud.create_frame(db=db, frame=_frame_create) + + os.remove(temp_json_path) + + def 
upload_frame_s3(self, s3, frame): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Frame 을 s3 저장소 업로드에 실패했습니다.", + ) + frame_name = uuid.uuid1() + frame_url = self.frame_url_base + f"{frame_name}" + ".png" + # print(frame_url) + + try: + s3.upload_fileobj( + BytesIO(cv2.imencode(".png", frame)[1].tobytes()), + self.settings.BUCKET, + frame_url, + ExtraArgs={"ContentType": "image/png"}, + ) + except Exception as e: + # print(e) + raise s3_upload_exception + + return frame_url + + def upload_video_s3(self, s3, video): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="video 를 s3 저장소 업로드에 실패했습니다.", + ) + video_url = self.video_url + + video_change_codec = "./temp_video_path_change_codec.mp4" + + os.system('ffmpeg -i "%s" -vcodec libx264 "%s"' % (video, video_change_codec)) + + try: + with open(video_change_codec, "rb") as video_file: + s3.upload_fileobj( + video_file, + self.settings.BUCKET, + video_url, + ExtraArgs={"ContentType": "video/mp4"}, + ) + except Exception as e: + # print(e) + raise s3_upload_exception + + os.remove(video_change_codec) + + def upload_score_graph_s3(self, s3, scores): + plt.plot(scores, color="red") + plt.title("Anomaly Scores Over Time") + plt.xlabel(" ") + plt.ylabel(" ") + + plt.xticks([]) + plt.yticks([]) + + save_path = "./model_scores_plot.png" + plt.savefig(save_path) + + with open(save_path, "rb") as image_file: + graph = image_file.read() + + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="score Graph 를 s3 저장소 업로드에 실패했습니다.", + ) + score_graph_name = "score_graph.png" + score_graph_url = self.frame_url_base + score_graph_name + + try: + s3.upload_fileobj( + BytesIO(graph), + self.settings.BUCKET, + score_graph_url, + ExtraArgs={"ContentType": "image/png"}, + ) + except: + raise s3_upload_exception + + os.remove(save_path) + + def run(self): + # YOLO + tracker_model = YOLO("yolov8n-pose.pt") + + # VMAE v2 + backbone = model = create_model( + "vit_small_patch16_224", + img_size=224, + pretrained=False, + num_classes=710, + all_frames=16, + ) + + load_dict = torch.load( + "/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/vit_s_k710_dl_from_giant.pth" + ) + + backbone.load_state_dict(load_dict["module"]) + + tf = A.Resize(224, 224) + + # Define sequence_length, prediction_time, and n_features + # sequence_length = 20 + # prediction_time = 1 + # n_features = 38 + + # LSTM autoencoder + checkpoint = torch.load( + "/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/MIL_20240325_202019_best_auc.pth" + ) + classifier = vmae.MILClassifier(input_dim=710, drop_p=0.3) + classifier.load_state_dict(checkpoint["model_state_dict"]) + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + tracker_model.to(device) + backbone.to(device) + backbone.eval() + classifier.to(device) + classifier.eval() + + # Define the standard frame size + standard_width = 640 + standard_height = 480 + + # Open the video file + cap = cv2.VideoCapture(self.video) + temp_name = None + if not cap.isOpened(): + temp_name = f"{uuid.uuid4()}.mp4" + self.s3.download_file(self.settings.BUCKET, self.video_url, temp_name) + cap = cv2.VideoCapture(temp_name) + fps = cap.get(cv2.CAP_PROP_FPS) + + frame_inteval = fps // 3 + + # Store the track history + track_history = defaultdict(lambda: []) + + # Initialize a dictionary to store separate buffers for each ID + # id_buffers = defaultdict(lambda: []) + + # HTML -> H.264 codec + fourcc = 
cv2.VideoWriter_fourcc(*"mp4v") + + # video writer -> ID 별 score, bbox 가 나온 영상을 s3 에 업로드 + output_video_path = "./temp_video_path.mp4" + output_video = cv2.VideoWriter( + output_video_path, fourcc, fps, (standard_width, standard_height) + ) + + # Define a function to calculate MSE between two sequences + def calculate_mse(seq1, seq2): + return np.mean(np.power(seq1 - seq2, 2)) + + # anomaly threshold (default 0.02) + # threshold = self.thr + threshold = 0.3 + + # Loop through the video frames + frame_count = 0 + # net_mse = 0 + # avg_mse = 0 + # score graph 를 위한 score list + scores = [] + # vmae에 입력할 frame들을 저장할 list + v_frames = [] + + # 영상 frame 임시 저장 list + frame_list = [] + # yolo results 임시 저장 list + results_list = [] + # temp_for_db 임시 저장 list + tfdb_list = [] + + while cap.isOpened(): + # Read a frame from the video + success, frame = cap.read() + frame_count += 1 # Increment frame count + + if success: + frame = cv2.resize(frame, (standard_width, standard_height)) + + temp_for_db = { + "timestamp": None, + "bbox": {}, + "keypoints": {}, + "score": None, + } + + # track id (사람) 별로 mse 점수가 나오기 때문에 한 frame 에 여러 mse 점수가 나옴. 이를 frame 별 점수로 구하기 위해서 변수 설정 + # mse_unit = 0 + + s_timestamp = round(cap.get(cv2.CAP_PROP_POS_MSEC) / 1000, 2) + datetime_object = datetime.utcfromtimestamp(s_timestamp) + timestamp = datetime_object.strftime("%H:%M:%S") + temp_for_db["timestamp"] = timestamp + + # Run YOLOv8 tracking on the frame, persisting tracks between frames + results = tracker_model.track(frame, persist=True) + + frame_list.append(frame.copy()) + results_list.append(deepcopy(results)) + tfdb_list.append(deepcopy(temp_for_db)) + + # 1초에 3 frame만 저장해서 vmae+MIL에 사용 + if (frame_count - 1) % frame_inteval == 0: + v_frame = tf(image=frame)["image"] + # (224, 224, 3) + v_frame = np.expand_dims(v_frame, axis=0) + # (1, 224, 224, 3) + v_frames.append(v_frame.copy()) + + # 16 frame이 모이면 vmae+MIL 계산 + if len(v_frames) == 16: + in_frames = np.concatenate(v_frames) + # (16, 224, 224, 3) + in_frames = in_frames.transpose(3, 0, 1, 2) + # (RGB 3, frame T=16, H=224, W=224) + in_frames = np.expand_dims(in_frames, axis=0) + # (1, 3, 16 * segments_num, 224, 224) + in_frames = torch.from_numpy(in_frames).float() + # torch.Size([1, 3, 16, 224, 224]) + + in_frames = in_frames.to(device) + + with torch.no_grad(): + v_output = backbone(in_frames) + # torch.Size([1, 710]) + v_score = classifier(v_output) + # torch.Size([1, 1]) + scores.append(v_score.cpu().item()) + + v_frames = [] + + if len(frame_list) == 16 * frame_inteval: + for f_step, (frame_i, results_i, temp_for_db_i) in enumerate( + zip(frame_list, results_list, tfdb_list) + ): + if scores[-1] > threshold: + anomaly_text = f"Anomaly detected, score: {scores[-1]}" + if ( + results_i[0].boxes is not None + ): # Check if there are results and boxes + + # Get the boxes + boxes = results_i[0].boxes.xywh.cpu() + + if results_i[0].boxes.id is not None: + # If 'int' attribute exists (there are detections), get the track IDs + + track_ids = ( + results_i[0].boxes.id.int().cpu().tolist() + ) + + # Loop through the detections and add data to the DataFrame + # anomaly_text = "" # Initialize the anomaly text + + # vmae에서 보는 frame들만 db에 저장 + if f_step % frame_inteval == 0: + # 한 프레임에서 검출된 사람만큼 돌아가는 반복문. 2명이면 각 id 별로 아래 연산들이 진행됨. 
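+                                        # Each person detected in this frame is handled once per
+                                        # iteration: `box` is the normalized xywh box, and
+                                        # `keypoints.xyn[i]` gives 17 normalized (x, y) keypoint
+                                        # pairs, so each id contributes 4 + 34 = 38 values that
+                                        # are rounded and serialized into the per-frame JSON.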
+ for i, box in zip( + range(0, len(track_ids)), + results_i[0].boxes.xywhn.cpu(), + ): + + x, y, w, h = box + keypoints = ( + results_i[0] + .keypoints.xyn[i] + .cpu() + .numpy() + .flatten() + .tolist() + ) + + xywhk = np.array( + [float(x), float(y), float(w), float(h)] + + keypoints + ) + + xywhk = list( + map(lambda x: str(round(x, 4)), xywhk) + ) + + temp_for_db_i["bbox"][f"id {i}"] = " ".join( + xywhk[:4] + ) + + temp_for_db_i["keypoints"][f"id {i}"] = ( + " ".join(xywhk[4:]) + ) + + else: + # If 'int' attribute doesn't exist (no detections), set track_ids to an empty list + track_ids = [] + + # Visualize the results on the frame + annotated_frame = results_i[0].plot() + self.display_text( + annotated_frame, anomaly_text, (10, 30) + ) # Display the anomaly text + + # Plot the tracks + # for box, track_id in zip(boxes, track_ids): + # x, y, w, h = box + # track = track_history[track_id] + # track.append((float(x), float(y))) # x, y center point + # if len(track) > 30: # retain 90 tracks for 90 frames + # track.pop(0) + + # # Draw the tracking lines + # points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2)) + # cv2.polylines( + # annotated_frame, + # [points], + # isClosed=False, + # color=(230, 230, 230), + # thickness=10, + # ) + + # Display the annotated frame + output_video.write(annotated_frame) + # cv2.imshow("YOLOv8 Tracking", annotated_frame) + else: + self.display_text( + frame_i, anomaly_text, (10, 30) + ) # Display the anomaly text + output_video.write(frame_i) + + # vmae에서 보는 frame들만 db에 저장 + if f_step % frame_inteval == 0: + temp_for_db_i["score"] = scores[-1] + + # upload frame to s3 + frame_url = self.upload_frame_s3(self.s3, frame_i) + + # upload frame, ts, bbox, kp to db + self.upload_frame_db(self.db, temp_for_db_i, frame_url) + + else: + anomaly_text = "" + output_video.write(frame_i) + # cv2.imshow("YOLOv8 Tracking", frame) + + # 16 * frame_interval개 frame 표시 후 초기화 + frame_list = [] + results_list = [] + tfdb_list = [] + + else: + if len(frame_list) != 0: + for f in frame_list: + output_video.write(f) + + # Break the loop if the end of the video is reached + break + + # Release the video capture, video writer object + cap.release() + output_video.release() + # cv2.destroyAllWindows() + + # upload video to s3 + self.upload_video_s3(self.s3, output_video_path) + + # upload score graph to s3 + self.upload_score_graph_s3(self.s3, scores) + if temp_name: + os.remove(temp_name) + os.remove(output_video_path) diff --git a/app/inference/anomaly_detector_lstmae.py b/app/inference/anomaly_detector_lstmae.py new file mode 100644 index 0000000..e38e316 --- /dev/null +++ b/app/inference/anomaly_detector_lstmae.py @@ -0,0 +1,421 @@ +import json +import os +import sys +import uuid +from collections import defaultdict +from datetime import datetime, time +from io import BytesIO + +import cv2 +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import torch +from database import crud, schemas +from fastapi import HTTPException +from sklearn.preprocessing import MinMaxScaler +from starlette import status +from ultralytics import YOLO + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(os.path.dirname(current_dir)) + +sys.path.append(os.path.join(parent_dir, "model")) +from lstmae.lstm_ae import LSTMAutoEncoder + + +class AnomalyDetector: + def __init__(self, video_file, info, s3_client, settings, db): + self.video = s3_client.generate_presigned_url( + ClientMethod="get_object", + Params={"Bucket": settings.BUCKET, 
"Key": video_file}, + ExpiresIn=3600, + ) + # print(self.video) + self.info = info + self.s3 = s3_client + self.settings = settings + self.thr = info["threshold"] + self.video_url = f"video/{info['user_id']}/{info['upload_id']}/{info['video_uuid_name']}{info['video_ext']}" + self.frame_url_base = f"frame/{info['user_id']}/{info['upload_id']}/" + self.db = db + + def display_text(self, frame, text, position): + font = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 1 + font_color = (0, 255, 0) # Green color + font_thickness = 2 + cv2.putText( + frame, + text, + position, + font, + font_scale, + font_color, + font_thickness, + cv2.LINE_AA, + ) + + def upload_frame_db(self, db, temp_for_db, frame_url): + + temp_json_path = "./temp.json" + + with open(temp_json_path, "w") as f: + json.dump(temp_for_db, f) + + with open(temp_json_path, "r") as f: + box_kp_json = json.load(f) + + _frame_create = schemas.FrameCreate( + frame_url=frame_url, + time_stamp=temp_for_db["timestamp"], + box_kp_json=box_kp_json, + score=temp_for_db["score"], + video_id=self.info["video_id"], + ) + + crud.create_frame(db=db, frame=_frame_create) + + os.remove(temp_json_path) + + def upload_frame_s3(self, s3, frame): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Frame 을 s3 저장소 업로드에 실패했습니다.", + ) + frame_name = uuid.uuid1() + frame_url = self.frame_url_base + f"{frame_name}" + ".png" + # print(frame_url) + + try: + s3.upload_fileobj( + BytesIO(cv2.imencode(".png", frame)[1].tobytes()), + self.settings.BUCKET, + frame_url, + ExtraArgs={"ContentType": "image/png"}, + ) + except Exception as e: + # print(e) + raise s3_upload_exception + + return frame_url + + def upload_video_s3(self, s3, video): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="video 를 s3 저장소 업로드에 실패했습니다.", + ) + video_url = self.video_url + + video_change_codec = "./temp_video_path_change_codec.mp4" + + os.system('ffmpeg -i "%s" -vcodec libx264 "%s"' % (video, video_change_codec)) + + try: + with open(video_change_codec, "rb") as video_file: + s3.upload_fileobj( + video_file, + self.settings.BUCKET, + video_url, + ExtraArgs={"ContentType": "video/mp4"}, + ) + except Exception as e: + # print(e) + raise s3_upload_exception + + os.remove(video_change_codec) + + def upload_score_graph_s3(self, s3, scores): + plt.plot(scores, color="red") + plt.title("Anomaly Scores Over Time") + plt.xlabel(" ") + plt.ylabel(" ") + + plt.xticks([]) + plt.yticks([]) + + save_path = "./model_scores_plot.png" + plt.savefig(save_path) + + with open(save_path, "rb") as image_file: + graph = image_file.read() + + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="score Graph 를 s3 저장소 업로드에 실패했습니다.", + ) + score_graph_name = "score_graph.png" + score_graph_url = self.frame_url_base + score_graph_name + + try: + s3.upload_fileobj( + BytesIO(graph), + self.settings.BUCKET, + score_graph_url, + ExtraArgs={"ContentType": "image/png"}, + ) + except: + raise s3_upload_exception + + os.remove(save_path) + + def run(self): + # YOLO + tracker_model = YOLO("yolov8n-pose.pt") + + # Define sequence_length, prediction_time, and n_features + sequence_length = 20 + prediction_time = 1 + n_features = 38 + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + # LSTM autoencoder + checkpoint = torch.load( + "/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/LSTM_20240324_222238_best.pth" + ) + autoencoder_model = LSTMAutoEncoder( + num_layers=2, 
hidden_size=50, n_features=n_features, device=device + ) + autoencoder_model.load_state_dict(checkpoint["model_state_dict"]) + tracker_model.to(device) + autoencoder_model.to(device) + + # Define the standard frame size + standard_width = 640 + standard_height = 480 + + # Open the video file + cap = cv2.VideoCapture(self.video) + if not cap.isOpened(): + temp_name = f"{uuid.uuid4()}.mp4" + self.s3.download_file(self.settings.BUCKET, self.video_url, temp_name) + cap = cv2.VideoCapture(temp_name) + fps = cap.get(cv2.CAP_PROP_FPS) + + # Store the track history + track_history = defaultdict(lambda: []) + + # Initialize a dictionary to store separate buffers for each ID + id_buffers = defaultdict(lambda: []) + + # HTML -> H.264 codec + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + + # video writer -> ID 별 score, bbox 가 나온 영상을 s3 에 업로드 + output_video_path = "./temp_video_path.mp4" + output_video = cv2.VideoWriter( + output_video_path, fourcc, fps, (standard_width, standard_height) + ) + + # Define a function to calculate MSE between two sequences + def calculate_mse(seq1, seq2): + return np.mean(np.power(seq1 - seq2, 2)) + + # anomaly threshold (default 0.02) + threshold = self.thr + + # Loop through the video frames + frame_count = 0 + net_mse = 0 + avg_mse = 0 + # score graph 를 위한 score list + scores = [] + + while cap.isOpened(): + # Read a frame from the video + success, frame = cap.read() + frame_count += 1 # Increment frame count + + if success: + frame = cv2.resize(frame, (standard_width, standard_height)) + + temp_for_db = { + "timestamp": None, + "bbox": {}, + "keypoints": {}, + "score": None, + } + + # track id (사람) 별로 mse 점수가 나오기 때문에 한 frame 에 여러 mse 점수가 나옴. 이를 frame 별 점수로 구하기 위해서 변수 설정 + mse_unit = 0 + + s_timestamp = round(cap.get(cv2.CAP_PROP_POS_MSEC) / 1000, 2) + datetime_object = datetime.utcfromtimestamp(s_timestamp) + timestamp = datetime_object.strftime("%H:%M:%S") + temp_for_db["timestamp"] = timestamp + + # Run YOLOv8 tracking on the frame, persisting tracks between frames + results = tracker_model.track(frame, persist=True) + + if results[0].boxes is not None: # Check if there are results and boxes + + # Get the boxes + boxes = results[0].boxes.xywh.cpu() + + if results[0].boxes.id is not None: + # If 'int' attribute exists (there are detections), get the track IDs + + track_ids = results[0].boxes.id.int().cpu().tolist() + + # Loop through the detections and add data to the DataFrame + anomaly_text = "" # Initialize the anomaly text + + # 한 프레임에서 검출된 사람만큼 돌아가는 반복문. 2명이면 각 id 별로 아래 연산들이 진행됨. 
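+                        # Per-track sliding window: each id accumulates rows of
+                        # 38 features (bbox 4 + keypoints 34); once 20 rows exist,
+                        # the window is min-max scaled, reconstructed by the LSTM
+                        # autoencoder, and scored with MSE against the latest row.
+                        # Per-id MSEs are summed into `mse_unit`, the frame score.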
+ for i, box in zip( + range(0, len(track_ids)), results[0].boxes.xywhn.cpu() + ): + + x, y, w, h = box + keypoints = ( + results[0] + .keypoints.xyn[i] + .cpu() + .numpy() + .flatten() + .tolist() + ) + + # Append the keypoints to the corresponding ID's buffer + # bbox(4), keypoints per id(34) + id_buffers[track_ids[i]].append( + [float(x), float(y), float(w), float(h)] + keypoints + ) + + # If the buffer size reaches the threshold (e.g., 20 data points), perform anomaly detection + # track_id 별 20프레임이 쌓이면 아래 연산 진행 + if len(id_buffers[track_ids[i]]) >= 20: + # Convert the buffer to a NumPy array + buffer_array = np.array(id_buffers[track_ids[i]]) + + # Scale the data (you can use the same scaler you used during training) + scaler = MinMaxScaler() + buffer_scaled = scaler.fit_transform(buffer_array) + + # Create sequences for prediction + # x_pred: [1,20,38] + x_pred = buffer_scaled[-sequence_length:].reshape( + 1, sequence_length, n_features + ) + + # Predict the next values using the autoencoder model + x_pred = torch.tensor(x_pred, dtype=torch.float32).to( + device + ) + x_pred = autoencoder_model.forward(x_pred) + + # Inverse transform the predicted data to the original scale + x_pred_original = scaler.inverse_transform( + x_pred.cpu() + .detach() + .numpy() + .reshape(-1, n_features) + ) + + # Calculate the MSE between the predicted and actual values + mse = calculate_mse( + buffer_array[-prediction_time:], x_pred_original + ) + + # print(mse) + + net_mse = mse + net_mse + avg_mse = net_mse / frame_count + + mse_unit += mse + + # Check if the MSE exceeds the threshold to detect an anomaly + if mse > 1.5 * avg_mse * 0.25 + 0.75 * threshold: + if anomaly_text == "": + anomaly_text = f"Focus ID {track_ids[i]}" + else: + anomaly_text = f"{anomaly_text}, {track_ids[i]}" + + # print(anomaly_text) + + temp_for_db["bbox"][f"id {i}"] = " ".join( + map( + lambda x: str(round(x, 4)), + buffer_array[-prediction_time:][0, :4], + ) + ) + + temp_for_db["keypoints"][f"id {i}"] = " ".join( + map( + lambda x: str(round(x, 4)), + buffer_array[-prediction_time:][0, 4:], + ) + ) + + # Remove the oldest data point from the buffer to maintain its size + id_buffers[track_ids[i]].pop(0) + + if temp_for_db["bbox"] != {}: + + temp_for_db["score"] = mse_unit + + # upload frame to s3 + frame_url = self.upload_frame_s3(self.s3, frame) + + # upload frame, ts, bbox, kp to db + self.upload_frame_db(self.db, temp_for_db, frame_url) + + else: + anomaly_text = "" + # If 'int' attribute doesn't exist (no detections), set track_ids to an empty list + track_ids = [] + + # Visualize the results on the frame + annotated_frame = results[0].plot() + self.display_text( + annotated_frame, anomaly_text, (10, 30) + ) # Display the anomaly text + + # Plot the tracks + for box, track_id in zip(boxes, track_ids): + x, y, w, h = box + track = track_history[track_id] + track.append((float(x), float(y))) # x, y center point + if len(track) > 30: # retain 90 tracks for 90 frames + track.pop(0) + + # Draw the tracking lines + points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2)) + cv2.polylines( + annotated_frame, + [points], + isClosed=False, + color=(230, 230, 230), + thickness=10, + ) + + scores.append(mse_unit) + + # Display the annotated frame + output_video.write(annotated_frame) + # cv2.imshow("YOLOv8 Tracking", annotated_frame) + + else: + # If no detections, display the original frame without annotations + scores.append(mse_unit) + output_video.write(frame) + # cv2.imshow("YOLOv8 Tracking", frame) + + else: + # Break the 
loop if the end of the video is reached + break + + # Release the video capture, video writer object + cap.release() + output_video.release() + # cv2.destroyAllWindows() + + # upload video to s3 + self.upload_video_s3(self.s3, output_video_path) + + # upload score graph to s3 + self.upload_score_graph_s3(self.s3, scores) + + try: + os.remove(temp_name) + except: + pass + os.remove(output_video_path) diff --git a/app/inference/rt_anomaly_detector.py b/app/inference/rt_anomaly_detector.py new file mode 100644 index 0000000..9b70305 --- /dev/null +++ b/app/inference/rt_anomaly_detector.py @@ -0,0 +1,362 @@ +import json +import os +import sys +import uuid +from collections import defaultdict +from datetime import datetime, time, timedelta +from io import BytesIO + +import albumentations as A +import cv2 +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import torch +import torch.nn as nn +from database import crud, schemas +from fastapi import HTTPException +from sklearn.preprocessing import MinMaxScaler +from starlette import status +from ultralytics import YOLO + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(os.path.dirname(current_dir)) + +sys.path.append(os.path.join(parent_dir, "model")) +from copy import deepcopy + +import vmae + +# @@ timm은 0.4.12 버전 사용 필수 +from timm.models import create_model + + +class RT_AnomalyDetector: + def __init__(self, info, s3_client, settings, db, websocket): + self.info = info + self.s3 = s3_client + self.settings = settings + self.frame_url_base = f"frame/{info['user_id']}/{info['upload_id']}/" + self.db = db + self.websocket = websocket + + async def upload_frame_db(self, db, temp_for_db, frame_url): + + temp_json_path = "./temp.json" + + with open(temp_json_path, "w") as f: + json.dump(temp_for_db, f) + + with open(temp_json_path, "r") as f: + box_kp_json = json.load(f) + + _frame_create = schemas.FrameCreate( + frame_url=frame_url, + time_stamp=temp_for_db["timestamp"], + box_kp_json=box_kp_json, + score=temp_for_db["score"], + video_id=self.info["video_id"], + ) + + crud.create_frame(db=db, frame=_frame_create) + + os.remove(temp_json_path) + + async def upload_frame_s3(self, s3, frame): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Frame 을 s3 저장소 업로드에 실패했습니다.", + ) + frame_name = uuid.uuid1() + frame_url = self.frame_url_base + f"{frame_name}" + ".png" + # print(frame_url) + + try: + s3.upload_fileobj( + BytesIO(cv2.imencode(".png", frame)[1].tobytes()), + self.settings.BUCKET, + frame_url, + ExtraArgs={"ContentType": "image/png"}, + ) + except Exception as e: + # print(e) + raise s3_upload_exception + + return frame_url + + def upload_score_graph_s3(self): + plt.plot(self.scores, color="red") + plt.title("Anomaly Scores Over Time") + plt.xlabel(" ") + plt.ylabel(" ") + + plt.xticks([]) + plt.yticks([]) + + save_path = "./model_scores_plot.png" + plt.savefig(save_path) + + with open(save_path, "rb") as image_file: + graph = image_file.read() + + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="score Graph 를 s3 저장소 업로드에 실패했습니다.", + ) + score_graph_name = "score_graph.png" + score_graph_url = self.frame_url_base + score_graph_name + + try: + self.s3.upload_fileobj( + BytesIO(graph), + self.settings.BUCKET, + score_graph_url, + ExtraArgs={"ContentType": "image/png"}, + ) + except: + raise s3_upload_exception + + os.remove(save_path) + + def ready(self): + # YOLO + self.tracker_model = 
YOLO("yolov8n-pose.pt") + + # VMAE v2 + self.backbone = model = create_model( + "vit_small_patch16_224", + img_size=224, + pretrained=False, + num_classes=710, + all_frames=16, + ) + + load_dict = torch.load( + "/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/vit_s_k710_dl_from_giant.pth" + ) + + self.backbone.load_state_dict(load_dict["module"]) + + self.tf = A.Resize(224, 224) + + # Define sequence_length, prediction_time, and n_features + # sequence_length = 20 + # prediction_time = 1 + # n_features = 38 + + # classifier + checkpoint = torch.load( + "/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/MIL_20240325_202019_best_auc.pth" + ) + self.classifier = vmae.MILClassifier(input_dim=710, drop_p=0.3) + self.classifier.load_state_dict(checkpoint["model_state_dict"]) + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + self.tracker_model.to(device) + self.backbone.to(device) + self.backbone.eval() + self.classifier.to(device) + self.classifier.eval() + + # Store the track history + self.track_history = defaultdict(lambda: []) + + # Initialize a dictionary to store separate buffers for each ID + self.id_buffers = defaultdict(lambda: []) + + # Loop through the video frames + self.frame_count = 0 + # self.net_mse = 0 + # self.avg_mse = 0 + + # score graph 를 위한 score list + self.scores = [] + + # vmae에 입력할 frame들을 저장할 list + self.v_frames = [] + + # 영상 frame 임시 저장 list + self.frame_list = [] + # yolo results 임시 저장 list + self.results_list = [] + # temp_for_db 임시 저장 list + self.tfdb_list = [] + + # timestamp 저장 + self.prv_timestamp = 0 + self.fps3_delta = timedelta(seconds=1 / 3) + + async def run(self, frame, timestamp): + + # Define the standard frame size + standard_width = 640 + standard_height = 480 + + # Define sequence_length, prediction_time, and n_features + # sequence_length = 20 + # prediction_time = 1 + # n_features = 38 + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + # Define a function to calculate MSE between two sequences + def calculate_mse(seq1, seq2): + return np.mean(np.power(seq1 - seq2, 2)) + + # anomaly threshold (default 0.02) + # threshold = self.info["thr"] + threshold = 0.1 + + self.frame_count += 1 # Increment frame count + + frame = cv2.resize(frame, (standard_width, standard_height)) + + temp_for_db = {"timestamp": None, "bbox": {}, "keypoints": {}, "score": None} + + # track id (사람) 별로 mse 점수가 나오기 때문에 한 frame 에 여러 mse 점수가 나옴. 이를 frame 별 점수로 구하기 위해서 변수 설정 + # mse_unit = 0 + + frame_checker = False + + # 이전에 frame을 저장한 시점에서 0.3333.. 
초 이상 경과했는지 확인 + if self.prv_timestamp == 0: + frame_checker = True + self.prv_timestamp = timestamp + else: + time_delta = timestamp - self.prv_timestamp + if time_delta > self.fps3_delta: + frame_checker = True + self.prv_timestamp = timestamp + + timestamp = timestamp.strftime("%H:%M:%S") + temp_for_db["timestamp"] = timestamp + + # Run YOLOv8 tracking on the frame, persisting tracks between frames + + # print(f"==>> frame_checker: {frame_checker}") + # frame_checker = True + # 1초에 3 frame만 저장해서 vmae+MIL에 사용 + if frame_checker: + results = self.tracker_model.track(frame, persist=True, verbose=False) + # print("yolo 1frame inference") + self.frame_list.append(frame.copy()) + self.results_list.append(deepcopy(results)) + self.tfdb_list.append(deepcopy(temp_for_db)) + + v_frame = self.tf(image=frame)["image"] + # (224, 224, 3) + v_frame = np.expand_dims(v_frame, axis=0) + # (1, 224, 224, 3) + self.v_frames.append(v_frame.copy()) + print(f"==>> len(self.v_frames): {len(self.v_frames)}") + + # 16 frame이 모이면 vmae+MIL 계산 + if len(self.v_frames) == 176: + print("VMAE 176frame inference") + in_frames = np.concatenate(self.v_frames) + # (176, 224, 224, 3) + in_frames = in_frames.reshape(11, 16, 224, 224, 3) + in_frames = in_frames.transpose(0, 4, 1, 2, 3) + # (11, RGB 3, frame T=16, H=224, W=224) + in_frames = torch.from_numpy(in_frames).float() + # torch.Size([11, 3, 16, 224, 224]) + + in_frames = in_frames.to(device) + + with torch.no_grad(): + v_output = self.backbone(in_frames) + # torch.Size([11, 710]) + v_output = v_output.view(1, 11, -1) + v_score = self.classifier(v_output) + v_score = v_score.view(1, 11) + print(f"==>> v_score: {v_score}") + print(f"==>> v_score.shape: {v_score.shape}") + # torch.Size([1, 11]) + s_list = [v_score[0, i].cpu().item() for i in range(11)] + + self.v_frames = [] + for f_step, (frame_i, results_i, temp_for_db_i) in enumerate( + zip(self.frame_list, self.results_list, self.tfdb_list) + ): + if s_list[f_step // 16] > threshold: + # if True: + anomaly_text = ( + f"Anomaly detected, score: {s_list[f_step // 16]}" + ) + + if ( + results_i[0].boxes is not None + ): # Check if there are results and boxes + + # Get the boxes + boxes = results_i[0].boxes.xywh.cpu() + + if results_i[0].boxes.id is not None: + # If 'int' attribute exists (there are detections), get the track IDs + + track_ids = results_i[0].boxes.id.int().cpu().tolist() + + # Loop through the detections and add data to the DataFrame + # anomaly_text = "" # Initialize the anomaly text + + # 한 프레임에서 검출된 사람만큼 돌아가는 반복문. 2명이면 각 id 별로 아래 연산들이 진행됨. 
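+                                # The 176-frame buffer above is 11 clips x 16 frames, and
+                                # the MIL head returns one score per clip, so this frame's
+                                # score is s_list[f_step // 16]. The loop below serializes
+                                # each tracked person's normalized bbox and keypoints.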
+ for i, box in zip( + range(0, len(track_ids)), + results_i[0].boxes.xywhn.cpu(), + ): + + x, y, w, h = box + keypoints = ( + results_i[0] + .keypoints.xyn[i] + .cpu() + .numpy() + .flatten() + .tolist() + ) + + xywhk = np.array( + [float(x), float(y), float(w), float(h)] + + keypoints + ) + + xywhk = list(map(lambda x: str(round(x, 4)), xywhk)) + + temp_for_db_i["bbox"][f"id {i}"] = " ".join( + xywhk[:4] + ) + + temp_for_db_i["keypoints"][f"id {i}"] = " ".join( + xywhk[4:] + ) + + else: + # If 'int' attribute doesn't exist (no detections), set track_ids to an empty list + track_ids = [] + + # self.scores.append(mse_unit) + + # Display the annotated frame + # cv2.imshow("YOLOv8 Tracking", annotated_frame) + + # else: + # If no detections, display the original frame without annotations + # self.scores.append(mse_unit) + # cv2.imshow("YOLOv8 Tracking", frame) + + temp_for_db_i["score"] = s_list[f_step // 16] + + # upload frame to s3 + frame_url = await self.upload_frame_s3(self.s3, frame_i) + + # upload frame, ts, bbox, kp to db + await self.upload_frame_db(self.db, temp_for_db_i, frame_url) + + await self.websocket.send_text(f"{timestamp}: {anomaly_text}") + + # 초기화 + self.scores.extend(deepcopy(s_list)) + s_list = [] + self.frame_list = [] + self.results_list = [] + self.tfdb_list = [] diff --git a/app/inference/rt_anomaly_detector_lstmae.py b/app/inference/rt_anomaly_detector_lstmae.py new file mode 100644 index 0000000..6035a43 --- /dev/null +++ b/app/inference/rt_anomaly_detector_lstmae.py @@ -0,0 +1,304 @@ +import json +import os +import sys +import uuid +from collections import defaultdict +from datetime import datetime, time +from io import BytesIO + +import cv2 +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import torch +import torch.nn as nn +from database import crud, schemas +from fastapi import HTTPException +from sklearn.preprocessing import MinMaxScaler +from starlette import status +from ultralytics import YOLO + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(os.path.dirname(current_dir)) + +sys.path.append(os.path.join(parent_dir, "model")) + +from lstmae.lstm_ae import LSTMAutoEncoder + + +class RT_AnomalyDetector: + def __init__(self, info, s3_client, settings, db, websocket): + self.info = info + self.s3 = s3_client + self.settings = settings + self.frame_url_base = f"frame/{info['user_id']}/{info['upload_id']}/" + self.db = db + self.websocket = websocket + + async def upload_frame_db(self, db, temp_for_db, frame_url): + + temp_json_path = "./temp.json" + + with open(temp_json_path, "w") as f: + json.dump(temp_for_db, f) + + with open(temp_json_path, "r") as f: + box_kp_json = json.load(f) + + _frame_create = schemas.FrameCreate( + frame_url=frame_url, + time_stamp=temp_for_db["timestamp"], + box_kp_json=box_kp_json, + score=temp_for_db["score"], + video_id=self.info["video_id"], + ) + + crud.create_frame(db=db, frame=_frame_create) + + os.remove(temp_json_path) + + async def upload_frame_s3(self, s3, frame): + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Frame 을 s3 저장소 업로드에 실패했습니다.", + ) + frame_name = uuid.uuid1() + frame_url = self.frame_url_base + f"{frame_name}" + ".png" + # print(frame_url) + + try: + s3.upload_fileobj( + BytesIO(cv2.imencode(".png", frame)[1].tobytes()), + self.settings.BUCKET, + frame_url, + ExtraArgs={"ContentType": "image/png"}, + ) + except Exception as e: + # print(e) + raise s3_upload_exception + + return frame_url + + def 
upload_score_graph_s3(self): + plt.plot(self.scores, color="red") + plt.title("Anomaly Scores Over Time") + plt.xlabel(" ") + plt.ylabel(" ") + + plt.xticks([]) + plt.yticks([]) + + save_path = "./model_scores_plot.png" + plt.savefig(save_path) + + with open(save_path, "rb") as image_file: + graph = image_file.read() + + s3_upload_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="score Graph 를 s3 저장소 업로드에 실패했습니다.", + ) + score_graph_name = "score_graph.png" + score_graph_url = self.frame_url_base + score_graph_name + + try: + self.s3.upload_fileobj( + BytesIO(graph), + self.settings.BUCKET, + score_graph_url, + ExtraArgs={"ContentType": "image/png"}, + ) + except: + raise s3_upload_exception + + os.remove(save_path) + + def ready(self): + # YOLO + self.tracker_model = YOLO("yolov8n-pose.pt") + + # Define sequence_length, prediction_time, and n_features + sequence_length = 20 + prediction_time = 1 + n_features = 38 + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + # LSTM autoencoder + # LSTM autoencoder + checkpoint = torch.load( + "/data/ephemeral/home/level2-3-cv-finalproject-cv-06/model/pts/LSTM_20240324_222238_best.pth" + ) + self.autoencoder_model = LSTMAutoEncoder( + num_layers=2, hidden_size=50, n_features=n_features, device=device + ) + self.autoencoder_model.load_state_dict(checkpoint["model_state_dict"]) + + self.tracker_model.to(device) + self.autoencoder_model.to(device) + + # Store the track history + self.track_history = defaultdict(lambda: []) + + # Initialize a dictionary to store separate buffers for each ID + self.id_buffers = defaultdict(lambda: []) + + # Loop through the video frames + self.frame_count = 0 + self.net_mse = 0 + self.avg_mse = 0 + # score graph 를 위한 score list + self.scores = [] + + async def run(self, frame, timestamp): + + # Define the standard frame size + standard_width = 640 + standard_height = 480 + + # Define sequence_length, prediction_time, and n_features + sequence_length = 20 + prediction_time = 1 + n_features = 38 + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + # Define a function to calculate MSE between two sequences + def calculate_mse(seq1, seq2): + return np.mean(np.power(seq1 - seq2, 2)) + + # anomaly threshold (default 0.02) + threshold = self.info["thr"] + + self.frame_count += 1 # Increment frame count + + frame = cv2.resize(frame, (standard_width, standard_height)) + + temp_for_db = {"timestamp": None, "bbox": {}, "keypoints": {}, "score": None} + + # track id (사람) 별로 mse 점수가 나오기 때문에 한 frame 에 여러 mse 점수가 나옴. 이를 frame 별 점수로 구하기 위해서 변수 설정 + mse_unit = 0 + + timestamp = timestamp.strftime("%H:%M:%S") + temp_for_db["timestamp"] = timestamp + + # Run YOLOv8 tracking on the frame, persisting tracks between frames + results = self.tracker_model.track(frame, persist=True) + + if results[0].boxes is not None: # Check if there are results and boxes + + # Get the boxes + boxes = results[0].boxes.xywh.cpu() + + if results[0].boxes.id is not None: + # If 'int' attribute exists (there are detections), get the track IDs + + track_ids = results[0].boxes.id.int().cpu().tolist() + + # Loop through the detections and add data to the DataFrame + anomaly_text = "" # Initialize the anomaly text + + # 한 프레임에서 검출된 사람만큼 돌아가는 반복문. 2명이면 각 id 별로 아래 연산들이 진행됨. 
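+                # Same 20-frame-per-id window as the offline LSTM-AE detector.
+                # Note the MinMaxScaler is re-fit on the live buffer instead of
+                # reusing the training-time scaler, and an anomaly fires when
+                # mse > 1.5 * avg_mse * 0.25 + 0.75 * threshold, blending the
+                # running mean error with the user-supplied threshold.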
+ for i, box in zip( + range(0, len(track_ids)), results[0].boxes.xywhn.cpu() + ): + + x, y, w, h = box + keypoints = ( + results[0].keypoints.xyn[i].cpu().numpy().flatten().tolist() + ) + + # Append the keypoints to the corresponding ID's buffer + # bbox(4), keypoints per id(34) + self.id_buffers[track_ids[i]].append( + [float(x), float(y), float(w), float(h)] + keypoints + ) + + # If the buffer size reaches the threshold (e.g., 20 data points), perform anomaly detection + # track_id 별 20프레임이 쌓이면 아래 연산 진행 + if len(self.id_buffers[track_ids[i]]) >= 20: + # Convert the buffer to a NumPy array + buffer_array = np.array(self.id_buffers[track_ids[i]]) + + # Scale the data (you can use the same scaler you used during training) + scaler = MinMaxScaler() + buffer_scaled = scaler.fit_transform(buffer_array) + + # Create sequences for prediction + x_pred = buffer_scaled[-sequence_length:].reshape( + 1, sequence_length, n_features + ) + + # Predict the next values using the autoencoder model + x_pred = torch.tensor(x_pred, dtype=torch.float32).to(device) + x_pred = self.autoencoder_model.forward(x_pred) + + # Inverse transform the predicted data to the original scale + x_pred_original = scaler.inverse_transform( + x_pred.cpu().detach().numpy().reshape(-1, n_features) + ) + + # Calculate the MSE between the predicted and actual values + mse = calculate_mse( + buffer_array[-prediction_time:], x_pred_original + ) + + # print(mse) + + self.net_mse = mse + self.net_mse + self.avg_mse = self.net_mse / self.frame_count + + mse_unit += mse + + # Check if the MSE exceeds the threshold to detect an anomaly + if mse > 1.5 * (self.avg_mse) * 0.25 + 0.75 * threshold: + + if anomaly_text == "": + anomaly_text = f"이상행동이 감지되었습니다." + else: + anomaly_text = f"이상행동이 감지되었습니다." + + # print(anomaly_text) + + temp_for_db["bbox"][f"id {i}"] = " ".join( + map( + lambda x: str(round(x, 4)), + buffer_array[-prediction_time:][0, :4], + ) + ) + + temp_for_db["keypoints"][f"id {i}"] = " ".join( + map( + lambda x: str(round(x, 4)), + buffer_array[-prediction_time:][0, 4:], + ) + ) + + # Remove the oldest data point from the buffer to maintain its size + self.id_buffers[track_ids[i]].pop(0) + + if temp_for_db["bbox"] != {}: + + temp_for_db["score"] = mse_unit + + # upload frame to s3 + frame_url = await self.upload_frame_s3(self.s3, frame) + + # upload frame, ts, bbox, kp to db + await self.upload_frame_db(self.db, temp_for_db, frame_url) + + await self.websocket.send_text(f"{timestamp}: {anomaly_text}") + + else: + anomaly_text = "" + # If 'int' attribute doesn't exist (no detections), set track_ids to an empty list + track_ids = [] + + self.scores.append(mse_unit) + + # Display the annotated frame + # cv2.imshow("YOLOv8 Tracking", annotated_frame) + + else: + # If no detections, display the original frame without annotations + self.scores.append(mse_unit) + # cv2.imshow("YOLOv8 Tracking", frame) diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..7152aa0 --- /dev/null +++ b/app/main.py @@ -0,0 +1,74 @@ +import os +from datetime import datetime, timedelta + +import uvicorn +from api import album_router, real_time_router, upload_router, user_router +from fastapi import FastAPI, Form, Request, Response +from fastapi.middleware.cors import CORSMiddleware +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates +from jose import jwt +from utils.config import settings +from utils.security import get_current_user + +templates = Jinja2Templates(directory="templates") + +app = 
FastAPI()
+static_dir = os.path.join(os.path.dirname(__file__), "templates", "src")
+app.mount("/src", StaticFiles(directory=static_dir), name="src")
+
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+@app.get("/")
+async def main_get(request: Request):
+    user = get_current_user(request)
+    if user:
+        return templates.TemplateResponse(
+            "main.html", {"request": request, "token": user.email}
+        )
+    else:
+        return templates.TemplateResponse(
+            "main.html", {"request": request, "token": None}
+        )
+
+
+@app.post("/")
+async def main_post(request: Request):
+    body = await request.form()
+    email = body["email"]
+    data = {
+        "sub": email,
+        "exp": datetime.utcnow()
+        + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
+    }
+    token = jwt.encode(data, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
+
+    template_response = templates.TemplateResponse(
+        "main.html", {"request": request, "token": email}
+    )
+
+    # Store the access token in an HTTP-only cookie; max_age expects seconds
+    template_response.set_cookie(
+        key="access_token",
+        value=token,
+        max_age=int(
+            timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES).total_seconds()
+        ),
+        httponly=True,
+    )
+
+    return template_response
+
+
+app.include_router(user_router.router)
+app.include_router(upload_router.router)
+app.include_router(album_router.router)
+app.include_router(real_time_router.router)
+
+if __name__ == "__main__":
+    uvicorn.run("main:app", host="0.0.0.0", port=30011, reload=True) diff --git a/app/templates/album_detail.html b/app/templates/album_detail.html new file mode 100644 index 0000000..03f06d5 --- /dev/null +++ b/app/templates/album_detail.html @@ -0,0 +1,100 @@
+{% extends 'base.html' %}
+
+{% block content %}
+<p>
+  User ID: {{ video_info.user_id }}<br>
+  Upload ID: {{ video_info.upload_id }}<br>
+  Upload name: {{ video_info.upload_name }}<br>
+  Date: {{ video_info.date }}<br>
+  {% if video_info.is_realtime %}
+  Type: real-time<br>
+  The real-time service does not record footage, so the full video is not available in the album.<br>
+  {% else %}
+  Type: recorded video<br>
+  {% endif %}
+</p>
+<table>
+  <thead>
+    <tr>
+      <th>#</th>
+      <th>Name</th>
+      <th>Date &amp; Time</th>
+      <th>Anomaly</th>
+      <th>Analyzed</th>
+      <th>Button</th>
+    </tr>
+  </thead>
+  <tbody>
+    {% for upload in uploads %}
+    <tr>
+      <td>{{ loop.index }}</td>
+      <td>{{ upload.name }}</td>
+      <td>{{ upload.date }}</td>
+      <td>
+        {% if upload.is_realtime %}
+        Real-time
+        {% else %}
+        Recorded video
+        {% endif %}
+      </td>
+      <td>
+        {% if upload.completes[0].completed %}
+        Analysis complete
+        {% else %}
+        Analyzing
+        {% endif %}
+      </td>
+      <td></td>
+    </tr>
+    {% endfor %}
+  </tbody>
+</table>
+{% endblock %}
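The two VMAE-based detectors share the same frame-batching arithmetic, which is easy to get wrong. The sketch below reproduces just that bookkeeping; `BackboneStub` and `ClassifierStub` are stand-ins for the real VideoMAEv2 backbone and `vmae.MILClassifier` (which additionally consumes a `(1, 11, 710)` segment tensor in the real-time path, simplified here to independent per-clip scoring), so only the shapes are meaningful.

```python
import numpy as np
import torch
import torch.nn as nn


class BackboneStub(nn.Module):
    # Stands in for the VideoMAEv2 ViT-S: (B, 3, 16, 224, 224) -> (B, 710)
    def forward(self, x):
        return x.new_zeros(x.shape[0], 710)


class ClassifierStub(nn.Module):
    # Stands in for the MIL head: (B, 710) -> (B, 1) scores in [0, 1]
    def forward(self, feats):
        return torch.sigmoid(feats.sum(dim=1, keepdim=True))


backbone, classifier = BackboneStub(), ClassifierStub()

# Offline detector: every fps // 3-th frame is resized to 224x224 and
# buffered; 16 buffered frames form one clip.
frames = [np.zeros((1, 224, 224, 3), dtype=np.float32) for _ in range(16)]
clip = np.concatenate(frames)        # (16, 224, 224, 3)
clip = clip.transpose(3, 0, 1, 2)    # (3, 16, 224, 224)
clip = np.expand_dims(clip, 0)       # (1, 3, 16, 224, 224)
with torch.no_grad():
    score = classifier(backbone(torch.from_numpy(clip)))
print(score.shape)   # torch.Size([1, 1]): one anomaly score for the clip

# Real-time detector: 176 buffered frames = 11 clips of 16 frames; frame
# f_step is then judged by scores[f_step // 16].
buffer = np.zeros((176, 224, 224, 3), dtype=np.float32)
clips = buffer.reshape(11, 16, 224, 224, 3).transpose(0, 4, 1, 2, 3)
with torch.no_grad():
    scores = classifier(backbone(torch.from_numpy(clips))).squeeze(1)
print(scores.shape)  # torch.Size([11]): one score per 16-frame clip
```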