"""Core authentication utilities used throughout the application.""" from datetime import datetime, timedelta import bcrypt from fastapi import Depends, HTTPException from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy.orm import Session from .config import settings from .database import get_db from .models import User oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login") def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) def create_access_token(data: dict) -> str: to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes) to_encode.update({"exp": expire}) return jwt.encode(to_encode, settings.secret_key, algorithm="HS256") def get_user_by_email(db: Session, email: str) -> User | None: return db.query(User).filter(User.email == email).first() def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db) ) -> User: try: payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"]) # Support both sub (user_id) and legacy username user_id = payload.get("user_id") username = payload.get("sub") if user_id is None and username is None: raise HTTPException(status_code=401, detail="Invalid token") except JWTError: raise HTTPException(status_code=401, detail="Invalid token") if user_id is not None: user = db.query(User).filter(User.id == user_id).first() else: # Legacy: username-based token user = db.query(User).filter(User.username == username).first() if user is None: raise HTTPException(status_code=401, detail="User not found") return user