First authentication prototype

This commit is contained in:
lvrossem
2023-03-30 07:22:30 -06:00
parent dcff7e4f35
commit 3e12125c09
3 changed files with 69 additions and 4 deletions

View File

@@ -7,4 +7,8 @@ fastapi_utils
flake8
black
isort
interrogate
interrogate
python-jose[cryptography]
passlib
jwt
PyJWT

View File

@@ -22,8 +22,8 @@ def get_users(db: Session):
return db.query(User).all()
def create_user(db: Session, user: UserCreate):
db_user = User(username=user.username, hashed_password=user.password)
def create_user(db: Session, username: str, hashed_password: str):
db_user = User(username=username, hashed_password=hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)

View File

@@ -1,6 +1,10 @@
from datetime import datetime, timedelta
from typing import List, Optional
import jwt
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from passlib.context import CryptContext
from sqlalchemy.orm import Session
import crud
@@ -14,6 +18,16 @@ app = FastAPI()
Base.metadata.create_all(bind=engine)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT authentication setup
jwt_secret = "secret_key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
bearer_scheme = HTTPBearer()
def get_db():
db = SessionLocal()
try:
@@ -38,7 +52,9 @@ async def create_user(user: users.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_username(db, username=user.username)
if db_user:
raise HTTPException(status_code=400, detail="Username already registered")
return crud.create_user(db=db, user=user)
return crud.create_user(
db=db, username=user.username, hashed_password=pwd_context.hash(user.password)
)
@app.get("/highscores", response_model=List[users.UserHighScore])
@@ -56,3 +72,48 @@ async def create_high_score(
high_score: highscores.HighScoreCreate, db: Session = Depends(get_db)
):
return crud.create_high_score(db=db, high_score=high_score)
#### TESTING!! DELETE LATER
async def get_current_user(
token: HTTPAuthorizationCredentials = Depends(bearer_scheme),
):
try:
payload = jwt.decode(token.credentials, jwt_secret, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid JWT token")
return username
except jwt.exceptions.DecodeError:
raise HTTPException(status_code=401, detail="Invalid JWT token")
@app.get("/protected")
async def protected_route(current_user=Depends(get_current_user)):
return {"message": f"Hello, {current_user}!"}
def authenticate_user(user: users.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_username(db=db, username=user.username)
if not db_user:
return False
hashed_password = db_user.hashed_password
if not hashed_password or not pwd_context.verify(user.password, hashed_password):
return False
return db_user
@app.post("/login")
async def login(user: users.UserCreate, db: Session = Depends(get_db)):
user = authenticate_user(user, db)
if not user:
raise HTTPException(status_code=401, detail="Invalid username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token_payload = {
"sub": user.username,
"exp": datetime.utcnow() + access_token_expires,
}
access_token = jwt.encode(access_token_payload, jwt_secret, algorithm=ALGORITHM)
return {"access_token": access_token}