-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'admin' of https://github.com/cf15-t5/seticket-backend
- Loading branch information
Showing
9 changed files
with
292 additions
and
189 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,20 @@ | ||
from dotenv import dotenv_values | ||
|
||
env = dotenv_values(".env") | ||
BASE_URL = env["BASE_URL"] or "http://localhost" | ||
PORT= env['PORT'] or 5000 | ||
DEBUG= env['DEBUG'] or True | ||
PORT = env["PORT"] or 5000 | ||
DEBUG = env["DEBUG"] or True | ||
|
||
DATABASE_URL = env['DATABASE_URL'] or "mysq;://root:root@localhost:3306/se_ticket" | ||
DATABASE_URL = env["DATABASE_URL"] or "mysq;://root:root@localhost:3306/se_ticket" | ||
|
||
JWT_ACCESS_TOKEN_EXPIRES = env['JWT_ACCESS_TOKEN_EXPIRES'] or 60 * 60 * 24 * 7 | ||
JWT_ACCESS_TOKEN_SECRET = env['JWT_ACCESS_TOKEN_SECRET'] or "secret" | ||
JWT_ACCESS_TOKEN_ALGORITHM = env['JWT_ACCESS_TOKEN_ALGORITHM'] or "HS256" | ||
JWT_ACCESS_TOKEN_EXPIRES = env["JWT_ACCESS_TOKEN_EXPIRES"] or 60 * 60 * 24 * 7 | ||
JWT_ACCESS_TOKEN_SECRET = env["JWT_ACCESS_TOKEN_SECRET"] or "secret" | ||
JWT_ACCESS_TOKEN_ALGORITHM = env["JWT_ACCESS_TOKEN_ALGORITHM"] or "HS256" | ||
|
||
|
||
MAIL_SERVER = env['MAIL_SERVER'] or 'sandbox.smtp.mailtrap.io' | ||
MAIL_PORT = env['MAIL_PORT'] or 2525 | ||
MAIL_USE_TLS = env['MAIL_USE_TLS'] or True | ||
MAIL_USE_SSL = env['MAIL_USE_SSL'] or False | ||
MAIL_USERNAME = env['MAIL_USERNAME'] or '1190969a29319c' | ||
MAIL_PASSWORD = env['MAIL_PASSWORD'] or '39c02575b88bd3' | ||
MAIL_SERVER = env["MAIL_SERVER"] or "sandbox.smtp.mailtrap.io" | ||
MAIL_PORT = env["MAIL_PORT"] or 2525 | ||
MAIL_USE_TLS = env["MAIL_USE_TLS"] or True | ||
MAIL_USE_SSL = env["MAIL_USE_SSL"] or False | ||
MAIL_USERNAME = env["MAIL_USERNAME"] or "1190969a29319c" | ||
MAIL_PASSWORD = env["MAIL_PASSWORD"] or "39c02575b88bd3" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,45 +1,48 @@ | ||
from flask import Blueprint,g, request | ||
from flask import Blueprint, g, request | ||
from src.middlewares.AuthMiddleware import isAuthenticated | ||
from src.services.AuthService import AuthService as AuthService | ||
import src.utils.getResponse as Response | ||
import src.utils.getResponse as Response | ||
|
||
AuthApp = Blueprint('AuthApp', __name__) | ||
authService = AuthService() | ||
AuthApp = Blueprint("AuthApp", __name__) | ||
authService = AuthService() | ||
|
||
@AuthApp.route('/login', methods=['GET']) | ||
|
||
@AuthApp.route("/login", methods=["GET"]) | ||
def index(): | ||
users = [] | ||
return Response.success(users,"success get all user") | ||
|
||
@AuthApp.route('/register', methods=['POST']) | ||
users = [] | ||
return Response.success(users, "success get all user") | ||
|
||
|
||
@AuthApp.route("/register", methods=["POST"]) | ||
def register(): | ||
req = request.json | ||
result = authService.registerUser(req) | ||
if(result['status'] == 'failed'): | ||
return Response.error(result['data'],result['code']) | ||
return Response.success(result['data'],"success create new user") | ||
req = request.json | ||
result = authService.registerUser(req) | ||
if result["status"] == "failed": | ||
return Response.error(result["data"], result["code"]) | ||
return Response.success(result["data"], "success create new user") | ||
|
||
@AuthApp.route('/login', methods=['POST']) | ||
|
||
@AuthApp.route("/login", methods=["POST"]) | ||
def login(): | ||
req = request.json | ||
result = authService.login(req) | ||
if(result['status'] == 'failed'): | ||
return Response.error(result['data'],result['code']) | ||
return Response.success(result['data'],"success login") | ||
req = request.json | ||
result = authService.login(req) | ||
if result["status"] == "failed": | ||
return Response.error(result["data"], result["code"]) | ||
return Response.success(result["data"], "success login") | ||
|
||
|
||
@AuthApp.route('/me', methods=['GET']) | ||
@AuthApp.route("/me", methods=["GET"]) | ||
@isAuthenticated | ||
def me(): | ||
|
||
return Response.success(g.user,"success get user data") | ||
return Response.success(g.user, "success get user data") | ||
|
||
|
||
@AuthApp.route('/verify', methods=['post']) | ||
@AuthApp.route("/verify", methods=["post"]) | ||
@isAuthenticated | ||
def verify(): | ||
req = request.json | ||
result = authService.verify(req) | ||
if(result['status'] == 'failed'): | ||
return Response.error(result['data'],result['code']) | ||
return Response.success(result['data'],"success verify user") | ||
def verify(): | ||
req = request.json | ||
|
||
result = authService.verify(req) | ||
if result["status"] == "failed": | ||
return Response.error(result["data"], result["code"]) | ||
return Response.success(result["data"], "success verify user") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,40 +1,66 @@ | ||
from flask import Blueprint,request,g | ||
from flask import Blueprint, request, g | ||
from src.services.UserService import UserService as UserService | ||
from src.middlewares.AuthMiddleware import isAuthenticated | ||
import src.utils.getResponse as Response | ||
from src.middlewares.AuthMiddleware import isAuthenticated, isAdmin | ||
import src.utils.getResponse as Response | ||
|
||
UserApp = Blueprint('UserApp', __name__,) | ||
userService = UserService() | ||
UserApp = Blueprint( | ||
"UserApp", | ||
__name__, | ||
) | ||
userService = UserService() | ||
|
||
@UserApp.route('/', methods=['GET']) | ||
|
||
@UserApp.route("/", methods=["GET"]) | ||
@isAuthenticated | ||
def index(): | ||
result = userService.getAllUser() | ||
return Response.success(result['data'],"success get all user") | ||
result = userService.getAllUser() | ||
return Response.success(result["data"], "success get all user") | ||
|
||
|
||
@UserApp.route('/update-profile', methods=['post']) | ||
@UserApp.route("/update-profile", methods=["post"]) | ||
@isAuthenticated | ||
def updateProfile(): | ||
req = request.json | ||
result = userService.updateProfile(data=req,id=g.user['user_id']) | ||
if(result['status'] == 'failed'): | ||
return Response.error(result['data'],result['code']) | ||
return Response.success(result['data'],"success update profile user") | ||
|
||
@UserApp.route('/topup', methods=['post']) | ||
def updateProfile(): | ||
req = request.json | ||
result = userService.updateProfile(data=req, id=g.user["user_id"]) | ||
if result["status"] == "failed": | ||
return Response.error(result["data"], result["code"]) | ||
return Response.success(result["data"], "success update profile user") | ||
|
||
|
||
@UserApp.route("/topup", methods=["post"]) | ||
@isAuthenticated | ||
def topup(): | ||
req = request.json | ||
result = userService.topup(data=req,id=g.user['user_id']) | ||
if(result['status'] == 'failed'): | ||
return Response.error(result['data'],result['code']) | ||
return Response.success(result['data'],"success topup user") | ||
|
||
@UserApp.route('/withdraw', methods=['post']) | ||
def topup(): | ||
req = request.json | ||
result = userService.topup(data=req, id=g.user["user_id"]) | ||
if result["status"] == "failed": | ||
return Response.error(result["data"], result["code"]) | ||
return Response.success(result["data"], "success topup user") | ||
|
||
|
||
@UserApp.route("/withdraw", methods=["post"]) | ||
@isAuthenticated | ||
def withdraw(): | ||
req = request.json | ||
result = userService.withdraw(data=req,id=g.user['user_id']) | ||
if(result['status'] == 'failed'): | ||
return Response.error(result['data'],result['code']) | ||
return Response.success(result['data'],"success withdraw user") | ||
def withdraw(): | ||
req = request.json | ||
result = userService.withdraw(data=req, id=g.user["user_id"]) | ||
if result["status"] == "failed": | ||
return Response.error(result["data"], result["code"]) | ||
return Response.success(result["data"], "success withdraw user") | ||
|
||
|
||
@UserApp.route("/admin/view-users", methods=["GET"]) | ||
@isAdmin | ||
def viewUsers(): | ||
result = userService.getAllUser() | ||
return Response.success(result["data"], "success get all user data") | ||
|
||
|
||
@UserApp.route("/admin/search-users", methods=["POST"]) | ||
@isAdmin | ||
def searchUsers(): | ||
req = request.json | ||
if req is not None: | ||
user_id = req.get("id") | ||
result = userService.searchUser(id=user_id) | ||
return Response.success(result["data"], "success get user data") | ||
else: | ||
return Response.error("Not Found", "Request body is missing") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,50 @@ | ||
from functools import wraps # | ||
import src.utils.jwt as jwt | ||
import src.utils.getResponse as response | ||
from flask import request, g | ||
from flask import request, g, jsonify | ||
from src.repositories.UserRepository import UserRepository | ||
from src.utils.permission import check_role_is_have_access | ||
from src.utils.convert import queryResultToDict | ||
|
||
user_repository = UserRepository() | ||
|
||
|
||
def isAuthenticated(func): | ||
@wraps(func) # Apply the wraps decorator | ||
def wrapper(*args, **kwargs): | ||
if request.headers.get('Authorization') is None: | ||
if request.headers.get("Authorization") is None: | ||
return response.error(message="Unauthorized", errors=None, status_code=401) | ||
else: | ||
token = request.headers.get('Authorization').split(" ")[1] | ||
auth_header = request.headers.get("Authorization") | ||
if auth_header is not None: | ||
token = auth_header.split(" ")[1] | ||
else: | ||
return response.error( | ||
message="Missing Authorization Header", errors=None, status_code=401 | ||
) | ||
try: | ||
decode = jwt.decode(token) | ||
user = user_repository.getUserById(decode['user_id']) | ||
user = user_repository.getUserById(decode["user_id"]) | ||
is_have_access = check_role_is_have_access(user.role, request.path) | ||
if not is_have_access: | ||
return response.error(message="Forbidden", errors=None, status_code=403) | ||
return response.error( | ||
message="Forbidden", errors=None, status_code=403 | ||
) | ||
g.user = queryResultToDict([user])[0] | ||
return func(*args, **kwargs) | ||
except jwt.jwt.InvalidKeyError as e: | ||
return response.error(message="Unauthorized", errors=None, status_code=401) | ||
return wrapper | ||
return response.error( | ||
message="Unauthorized", errors=None, status_code=401 | ||
) | ||
|
||
return wrapper | ||
|
||
|
||
def isAdmin(f): | ||
@wraps(f) | ||
def decorated_function(*args, **kwargs): | ||
if g.user["role"] != "admin": | ||
return jsonify({"message": "Unauthorized access"}), 403 | ||
return f(*args, **kwargs) | ||
|
||
return decorated_function |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,48 +1,62 @@ | ||
from src.models.User import User,db | ||
from src.models.User import User, db | ||
import bcrypt | ||
import sys | ||
|
||
|
||
class UserRepository: | ||
def getAllUser(self): | ||
return User.query.all() | ||
|
||
def getUserByEmail(self,email): | ||
return User.query.filter_by(email=email).first() | ||
def createNewUser(self,data): | ||
password = bcrypt.hashpw(data['password'].encode('utf-8'), bcrypt.gensalt()) | ||
newUser = User( | ||
name=data['name'], | ||
email=data['email'], | ||
password=password, | ||
status='INACTIVE' if data['role'] == 'EVENT_ORGANIZER' else 'ACTIVE', | ||
role=data['role'], | ||
balance=0 | ||
) | ||
db.session.add(newUser) | ||
db.session.commit() | ||
return newUser | ||
def getUserById(self,user_id): | ||
return User.query.filter_by(user_id=user_id).first() | ||
def verifyUser(self,user_id,status): | ||
user = User.query.filter_by(user_id=user_id).first() | ||
if(not user) :return False | ||
user.status = status | ||
db.session.commit() | ||
return user | ||
def updateProfile(self,id,data): | ||
user = User.query.filter_by(user_id=id).first() | ||
if(not user) :return False | ||
user.name = data['name'] or user.name | ||
user.email = data['email'] or user.email | ||
user.password = bcrypt.hashpw(data['password'].encode('utf-8'), bcrypt.gensalt()) if data['password'] else user.password | ||
db.session.commit() | ||
return user | ||
def updateBalance(self,id,nominal,operator): | ||
user = User.query.filter_by(user_id=id).first() | ||
if(not user) :return False | ||
if(operator == 'plus'): | ||
user.balance += nominal | ||
if(operator == 'minus'): | ||
user.balance -= nominal | ||
|
||
db.session.commit() | ||
return user | ||
def getAllUser(self): | ||
return User.query.all() | ||
|
||
def getUserByEmail(self, email): | ||
return User.query.filter_by(email=email).first() | ||
|
||
def createNewUser(self, data): | ||
password = bcrypt.hashpw(data["password"].encode("utf-8"), bcrypt.gensalt()) | ||
newUser = User( | ||
name=data["name"], | ||
email=data["email"], | ||
password=password, | ||
status="INACTIVE" if data["role"] == "EVENT_ORGANIZER" else "ACTIVE", | ||
role=data["role"], | ||
balance=0, | ||
) | ||
db.session.add(newUser) | ||
db.session.commit() | ||
return newUser | ||
|
||
def getUserById(self, user_id): | ||
return User.query.filter_by(user_id=user_id).first() | ||
|
||
def verifyUser(self, user_id, status): | ||
user = User.query.filter_by(user_id=user_id).first() | ||
if not user: | ||
return False | ||
user.status = status | ||
db.session.commit() | ||
return user | ||
|
||
def updateProfile(self, id, data): | ||
user = User.query.filter_by(user_id=id).first() | ||
if not user: | ||
return False | ||
user.name = data["name"] or user.name | ||
user.email = data["email"] or user.email | ||
user.password = ( | ||
bcrypt.hashpw(data["password"].encode("utf-8"), bcrypt.gensalt()) | ||
if data["password"] | ||
else user.password | ||
) | ||
db.session.commit() | ||
return user | ||
|
||
def updateBalance(self, id, nominal, operator): | ||
user = User.query.filter_by(user_id=id).first() | ||
if not user: | ||
return False | ||
if operator == "plus": | ||
user.balance += nominal | ||
if operator == "minus": | ||
user.balance -= nominal | ||
|
||
db.session.commit() | ||
return user |
Oops, something went wrong.