-
Notifications
You must be signed in to change notification settings - Fork 0
/
auth.py
147 lines (125 loc) · 5.36 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import streamlit as st
from passlib.context import CryptContext
from jose import JWTError, jwt
import secrets
import psycopg2
from psycopg2.extras import RealDictCursor
from utils.session_manager import clear_user_session, initialize_user_session
# Password hashing configuration
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT Configuration
SECRET_KEY = os.environ.get("SECRET_KEY", secrets.token_urlsafe(32))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def get_db_connection():
"""Get database connection using environment variables"""
return psycopg2.connect(
dbname=os.environ["PGDATABASE"],
user=os.environ["PGUSER"],
password=os.environ["PGPASSWORD"],
host=os.environ["PGHOST"],
port=os.environ["PGPORT"]
)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Generate password hash"""
return pwd_context.hash(password)
def create_access_token(data: Dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def register_user(email: str, password: str, name: str, surname: str,
cellphone: str, purpose: str) -> Dict:
"""Register a new user"""
try:
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# Check if user exists
cur.execute("SELECT id FROM users WHERE email = %s", (email,))
if cur.fetchone():
return {"error": "User already exists"}
# Insert new user
cur.execute("""
INSERT INTO users (email, password_hash, name, surname,
cellphone, purpose)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id, email
""", (email, get_password_hash(password), name, surname,
cellphone, purpose))
user = cur.fetchone()
conn.commit()
# Create access token
access_token = create_access_token(
data={"sub": user["email"]},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
# Initialize user session state
initialize_user_session(str(user["id"]))
return {
"message": "Registration successful",
"access_token": access_token,
"token_type": "bearer",
"user_id": user["id"],
"email": user["email"]
}
except Exception as e:
return {"error": str(e)}
def authenticate_user(email: str, password: str) -> Dict:
"""Authenticate user and create access token"""
try:
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT id, email, password_hash
FROM users WHERE email = %s
""", (email,))
user = cur.fetchone()
if not user:
return {"error": "Invalid credentials"}
if not verify_password(password, user["password_hash"]):
return {"error": "Invalid credentials"}
# Create access token
access_token = create_access_token(
data={"sub": user["email"]},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
# Update last login
cur.execute("""
UPDATE users
SET last_login = CURRENT_TIMESTAMP
WHERE id = %s
""", (user["id"],))
conn.commit()
# Initialize user session state
initialize_user_session(str(user["id"]))
return {
"access_token": access_token,
"token_type": "bearer",
"user_id": user["id"],
"email": user["email"]
}
except Exception as e:
return {"error": str(e)}
def logout_user():
"""Logout current user and clear their session state"""
if "user_id" in st.session_state:
user_id = st.session_state.user_id
clear_user_session(str(user_id))
del st.session_state.user_id
del st.session_state.user_email
if "access_token" in st.session_state:
del st.session_state.access_token
def is_authenticated() -> bool:
"""Check if user is authenticated in current session"""
return bool(st.session_state.get("user_id"))