from datetime import datetime, timedelta import bcrypt from fastapi import APIRouter, Depends, HTTPException from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy.orm import Session from .config import settings from .database import get_db from .models import User from .schemas import LoginRequest, Token router = APIRouter(prefix="/api/auth", tags=["auth"]) oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login") def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) def create_access_token(data: dict) -> str: to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes) to_encode.update({"exp": expire}) return jwt.encode(to_encode, settings.secret_key, algorithm="HS256") def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db) ) -> User: try: payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"]) username: str = payload.get("sub") if username is None: raise HTTPException(status_code=401, detail="Invalid token") except JWTError: raise HTTPException(status_code=401, detail="Invalid token") user = db.query(User).filter(User.username == username).first() if user is None: raise HTTPException(status_code=401, detail="User not found") return user @router.post("/login", response_model=Token) def login(request: LoginRequest, db: Session = Depends(get_db)): user = db.query(User).filter(User.username == request.username).first() if not user or not verify_password(request.password, user.hashed_password): raise HTTPException(status_code=401, detail="Invalid credentials") token = create_access_token({"sub": user.username}) return Token(access_token=token) @router.get("/me") def me(user: User = Depends(get_current_user)): return {"username": user.username}