Files
back-end/src/crud/authentication.py
2023-04-06 14:10:31 -06:00

77 lines
2.5 KiB
Python

from datetime import datetime, timedelta
import jwt
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from src.crud.users import get_user_by_username, pwd_context
from src.models import User
from src.schemas.users import UserCreate
DEFAULT_NR_HIGH_SCORES = 10
# JWT authentication setup
jwt_secret = "secret_key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 44640 # One month
bearer_scheme = HTTPBearer()
def get_current_user_name(
token: HTTPAuthorizationCredentials = Depends(bearer_scheme),
):
try:
payload = jwt.decode(
token.credentials,
jwt_secret,
algorithms=[ALGORITHM],
)
username = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid JWT token")
return username
except jwt.exceptions.DecodeError:
raise HTTPException(status_code=401, detail="Invalid JWT token")
def authenticate_user(db: Session, username: str, password: str):
"""Checks whether the provided credentials match with an existing User"""
db_user = get_user_by_username(db, username)
if not db_user:
return False
hashed_password = db_user.hashed_password
if not hashed_password or not pwd_context.verify(password, hashed_password):
return False
return db_user
def register(db: Session, username: str, password: str, avatar: str):
"""Register a new user"""
if avatar == "":
raise HTTPException(status_code=400, detail="No avatar was provided")
db_user = get_user_by_username(db, username)
if db_user:
raise HTTPException(status_code=400, detail="Username already registered")
db_user = User(
username=username, hashed_password=pwd_context.hash(password), avatar=avatar
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return login(db, username, password)
def login(db: Session, username: str, password: str):
user = authenticate_user(db, username, password)
if not user:
raise HTTPException(status_code=401, detail="Invalid username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token_payload = {
"sub": user.username,
"exp": datetime.utcnow() + access_token_expires,
}
access_token = jwt.encode(access_token_payload, jwt_secret, algorithm=ALGORITHM)
return {"access_token": access_token}