""" Authentication according to the OAuth2 specification. When user is created it's password is stored in form of hashed_password returned from function hash_password Authentication flow Login using username and password, -> returns jwt token if successful When using other apis that needs authentication, get_current_user is called to get to validate and get the logged in user data get_current_user -> takes in bearer token and retrives user using oauth2_scheme """ import jwt from datetime import timedelta, datetime from typing import Annotated from fastapi import Depends, FastAPI, APIRouter, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm, SecurityScopes from pydantic import BaseModel, ConfigDict, ValidationError from sqlalchemy import select from pwdlib import PasswordHash from ..models import Users from .schemas import * from ..db.db import db_dependency from ..exceptions import ExceptionHandlerRoute #TODO: env file SECRET_KEY = "17267606688d2ef63beeb906b8dab8448cb8b1e3d147678ed6d69eb9eb147b26" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 120 router = APIRouter(prefix = '/auth', tags = ['Auth'], route_class = ExceptionHandlerRoute) oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/auth/login', scopes= { "me": "Read information about the current user", "Items": "Read items" } ) password_has = PasswordHash.recommended() """ Helper functions to encode and decode and retriver user """ def verify_password(plain_password, hashed_password): return password_has.verify(plain_password, hashed_password) def get_password_hash(password): return password_has.hash(password) def get_user(db, username: str): query = select(Users).where((Users.username == username) | (Users.email == username)) user = db.execute(query).first() if user: return user[0] return None """ Takes in the jwt token and returns the user value from database Throws exceptions if username doesn't exist or password is incorrect """ def get_current_user(db: db_dependency, token : Annotated[str, Depends(oauth2_scheme)]) -> Users: credentials_exceptions = HTTPException( status_code = status.HTTP_401_UNAUTHORIZED, detail = "Could not validate credientals", headers = {"WWW-Authenticate": "Bearer"} ) try: #decode payload payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username = payload.get("sub") if username is None: raise credentials_exceptions token_data = TokenData(username = username) except jwt.InvalidTokenError: raise credentials_exceptions #get user from userid user = get_user(db, username = token_data.username) if user is None: raise credentials_exceptions return user @router.get('/users/me') def get_curr_user(current_user : Annotated[Users, Depends(get_current_user)]): return current_user def create_access_token(data: dict, expires_delta: timedelta | None = None): to_encode = data.copy() if expires_delta: expire = datetime.now() + expires_delta else: expire = datetime.now() + timedelta(minutes = 15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(payload = to_encode, key = SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def create_refresh_token(data: dict): to_encode = data.copy() expire = datetime.now() + timedelta(days = 7) data.update({"exp": expire}) return jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM) @router.post("/refresh") def refresh_token(refresh_token : str): try: payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM]) except Exception as e: raise HTTPException( status_code = status.HTTP_401_UNAUTHORIZED, detail = "Invlid or expired refresh token" ) if not payload: raise HTTPException( status_code = status.HTTP_401_UNAUTHORIZED, detail = "Invlid or expired refresh token" ) username = payload.get("sub") new_access_token = create_access_token({"sub": username}) new_refresh_token = create_refresh_token({"sub": username}) return { "access_token" : new_access_token, "refresh_token": new_refresh_token, } def authenticate_user(db, username: str, password: str): #get user, hash password, verify password return true or false user = get_user(db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user #authenticate user using autneticate_user and return token created from create_access_token @router.post("/login") def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: db_dependency): #get user from database user = authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code = status.HTTP_401_UNAUTHORIZED, detail = "Username or password is incorrect", headers = {"WWW-Authenticate": "Bearer"} ) #create access tokens access_token_expires = timedelta(minutes = ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token(data = { "sub": user.username }, expires_delta= access_token_expires) refresh_token = create_access_token(data = { "sub": user.username }) return Token( access_token = access_token, refresh_token = refresh_token, token_type = "bearer" ) user_dependency = Annotated[Users, Depends(get_current_user)]