From 0e6700d9aa97df6d54e2677b779de6e8ea8fde0b Mon Sep 17 00:00:00 2001 From: victorleaoo Date: Wed, 4 Sep 2024 19:04:56 -0300 Subject: [PATCH] adicao de seguranca para criar admin --- src/controller/authController.py | 5 +++-- src/utils/security.py | 11 +++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/controller/authController.py b/src/controller/authController.py index eb796b1..2ce6a6f 100644 --- a/src/controller/authController.py +++ b/src/controller/authController.py @@ -8,6 +8,7 @@ from datetime import datetime, timedelta from constants import errorMessages from starlette.responses import JSONResponse +from utils import security, enumeration from domain import userSchema, authSchema from repository import userRepository @@ -130,7 +131,7 @@ async def validate_account(data: authSchema.AccountValidation, db: Session = Dep # cadastro da senha de admin / role do admin @auth.post('/admin-setup') -async def admin_setup(data: authSchema.AdminSetup, db: Session = Depends(get_db)): +async def admin_setup(data: authSchema.AdminSetup, db: Session = Depends(get_db), token: dict = Depends(security.verify_token_admin)): user = userRepository.get_user_by_email(db, data.email) if not user: raise HTTPException(status_code=404, detail=errorMessages.USER_NOT_FOUND) @@ -146,7 +147,7 @@ async def admin_setup(data: authSchema.AdminSetup, db: Session = Depends(get_db) return JSONResponse(status_code=200, content={"status": "success"}) @auth.post('/super-admin-setup') -async def super_admin_setup(data: authSchema.AdminSetup, db: Session = Depends(get_db)): +async def super_admin_setup(data: authSchema.AdminSetup, db: Session = Depends(get_db), token: dict = Depends(security.verify_token_admin)): user = userRepository.get_user_by_email(db, data.email) if not user: raise HTTPException(status_code=404, detail=errorMessages.USER_NOT_FOUND) diff --git a/src/utils/security.py b/src/utils/security.py index 8b26736..f398960 100644 --- a/src/utils/security.py +++ b/src/utils/security.py @@ -37,9 +37,20 @@ def create_access_token(data: dict): def verify_token(token: str = Depends(oauth2_scheme)): try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + # print(payload["role"]) return payload except JWTError: raise HTTPException(status_code=401, detail=errorMessages.INVALID_TOKEN) + +def verify_token_admin(token: str = Depends(oauth2_scheme)): + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + if payload["role"] == "ADMIN": + return payload + else: + raise HTTPException(status_code=401, detail=errorMessages.INVALID_TOKEN) + except JWTError: + raise HTTPException(status_code=401, detail=errorMessages.INVALID_TOKEN) def generate_six_digit_number_code(): return secrets.randbelow(900000) + 100000