"""Authentication router — multi-user SaaS with local + Google OAuth."""

import json
import logging
import re
import secrets
import uuid
from datetime import datetime, timedelta
from typing import Optional
from urllib.parse import quote, urlencode

import httpx
from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session

from ..auth import (
    create_access_token,
    get_current_user,
    get_user_by_email,
    hash_password,
    verify_password,
)
from ..config import settings
from ..database import get_db
from ..models import SubscriptionCode, User

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/auth", tags=["auth"])


# === Schemas ===

class RegisterRequest(BaseModel):
    """Payload for local (email + password) registration."""

    email: str
    password: str
    display_name: Optional[str] = None


class LoginRequest(BaseModel):
    """Payload for login; either ``email`` or the legacy ``username`` is used."""

    email: Optional[str] = None
    username: Optional[str] = None  # backward compat
    password: str


class RedeemCodeRequest(BaseModel):
    """Payload carrying a subscription code to redeem."""

    code: str


class ChangePasswordRequest(BaseModel):
    """Payload for changing the password of a local account."""

    current_password: str
    new_password: str


# === Helpers ===

def _user_response(user: User) -> dict:
    """Serialize the user fields that are returned to the client.

    Missing plan / counters fall back to ``"freemium"`` and ``0``; the
    expiry is rendered as an ISO-8601 string or ``None``.
    """
    expires_at = user.subscription_expires_at
    return {
        "id": user.id,
        "email": user.email,
        "username": user.username,
        "display_name": user.display_name,
        "avatar_url": user.avatar_url,
        "subscription_plan": user.subscription_plan or "freemium",
        "subscription_expires_at": expires_at.isoformat() if expires_at else None,
        "is_admin": bool(user.is_admin),
        "posts_generated_this_month": user.posts_generated_this_month or 0,
    }


def _token_response(user: User) -> dict:
    """Issue an access token for *user* and wrap it with the user payload."""
    token = create_access_token({"sub": user.username, "user_id": user.id})
    return {
        "access_token": token,
        "token_type": "bearer",
        "user": _user_response(user),
    }


def _unique_username(db: Session, base: str) -> str:
    """Return *base*, or *base* plus the first numeric suffix not yet taken."""
    candidate = base
    suffix = 1
    while db.query(User).filter(User.username == candidate).first():
        candidate = f"{base}{suffix}"
        suffix += 1
    return candidate


def _google_redirect_uri() -> str:
    """Callback URL registered with Google; must match in both OAuth steps."""
    return f"{settings.app_url}/api/auth/oauth/google/callback"


def _login_redirect(error_code: str) -> RedirectResponse:
    """Redirect to the frontend login page with an ``oauth_error`` code.

    The code is percent-encoded because it may come straight from the
    provider's ``error`` query parameter.
    """
    return RedirectResponse(
        url=f"{settings.app_url}/login?oauth_error={quote(error_code, safe='')}"
    )


# === Endpoints ===

@router.post("/register")
def register(request: RegisterRequest, db: Session = Depends(get_db)):
    """Register a new user with email + password.

    Raises:
        HTTPException: 400 if the email is already registered.
    """
    if get_user_by_email(db, request.email):
        raise HTTPException(status_code=400, detail="Email già registrata.")

    # Build username from the email local part for backward compat.
    username = _unique_username(db, request.email.split("@")[0])

    user = User(
        username=username,
        hashed_password=hash_password(request.password),
        email=request.email,
        display_name=request.display_name or username,
        auth_provider="local",
        subscription_plan="freemium",
        is_admin=False,
    )
    db.add(user)
    db.commit()
    db.refresh(user)

    return _token_response(user)


@router.post("/login")
def login(request: LoginRequest, db: Session = Depends(get_db)):
    """Login with email (or legacy username) + password.

    Raises:
        HTTPException: 401 if no user matches or the password is wrong.
    """
    user = None
    if request.email:
        user = get_user_by_email(db, request.email)
    elif request.username:
        # Backward compat: the legacy field may hold a username OR an email.
        user = db.query(User).filter(User.username == request.username).first()
        if not user:
            user = get_user_by_email(db, request.username)

    if not user or not verify_password(request.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Credenziali non valide.")

    return _token_response(user)


@router.get("/me")
def me(user: User = Depends(get_current_user)):
    """Get current authenticated user info."""
    return _user_response(user)


class UpdateProfileRequest(BaseModel):
    """Payload for updating the profile display name."""

    display_name: str


@router.put("/me")
def update_profile(
    request: UpdateProfileRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Update user display name.

    Raises:
        HTTPException: 400 if the name is empty after stripping whitespace.
    """
    name = request.display_name.strip()
    if not name:
        raise HTTPException(status_code=400, detail="Il nome non può essere vuoto.")

    current_user.display_name = name
    db.commit()
    db.refresh(current_user)
    return _user_response(current_user)


@router.post("/logout")
def logout():
    """Logout — client should remove the token."""
    return {"message": "ok"}


# === Google OAuth ===

@router.get("/oauth/google")
def oauth_google_start():
    """Redirect to Google OAuth consent screen.

    Raises:
        HTTPException: 501 if no Google client id is configured.
    """
    if not settings.google_client_id:
        raise HTTPException(status_code=501, detail="Google OAuth non configurato.")

    # NOTE(review): this state value is never stored, and the callback does
    # not verify it, so it currently gives no CSRF protection.
    state_token = str(uuid.uuid4())
    params = {
        "client_id": settings.google_client_id,
        "redirect_uri": _google_redirect_uri(),
        "response_type": "code",
        "scope": "openid email profile",
        "state": state_token,
        "access_type": "offline",
    }
    # urlencode escapes the spaces in `scope` and the reserved characters in
    # `redirect_uri`; joining raw values produced a malformed URL.
    auth_url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(params)}"
    return RedirectResponse(url=auth_url)


def _find_or_create_google_user(
    db: Session,
    google_id: str,
    email: str,
    name: Optional[str],
    picture: Optional[str],
) -> User:
    """Return the user for a Google identity, linking or creating as needed.

    Lookup is by ``google_id`` first, then by email. An existing user gets
    its missing ``google_id`` / ``avatar_url`` filled in; otherwise a new
    ``auth_provider="google"`` user is created with a random password hash.
    """
    user = db.query(User).filter(User.google_id == google_id).first()
    if not user:
        # NOTE(review): linking by email does not check Google's
        # `email_verified` claim — confirm whether that is acceptable.
        user = get_user_by_email(db, email)

    if user:
        if not user.google_id:
            user.google_id = google_id
        if not user.avatar_url and picture:
            user.avatar_url = picture
        db.commit()
        return user

    username = _unique_username(db, email.split("@")[0])
    user = User(
        username=username,
        # Random secret so the account has no usable local password.
        hashed_password=hash_password(secrets.token_urlsafe(32)),
        email=email,
        display_name=name or username,
        avatar_url=picture,
        auth_provider="google",
        google_id=google_id,
        subscription_plan="freemium",
        is_admin=False,
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


@router.get("/oauth/google/callback")
async def oauth_google_callback(
    code: Optional[str] = None,
    state: Optional[str] = None,
    error: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """Exchange Google OAuth code for token, create/find user, redirect with JWT.

    Every failure path redirects to the frontend login page with an
    ``oauth_error`` query parameter instead of raising.

    ``code`` is optional because a denied or failed consent calls back with
    ``error`` and no ``code``; as a required parameter it caused a 422
    before the error branch could run.
    """
    # Handle Google OAuth user denial or access errors.
    if error:
        return _login_redirect(error)

    if not settings.google_client_id or not settings.google_client_secret:
        return _login_redirect("non_configurato")

    if not code:
        return _login_redirect("missing_code")

    try:
        async with httpx.AsyncClient() as client:
            # Exchange the authorization code for tokens.
            token_resp = await client.post(
                "https://oauth2.googleapis.com/token",
                data={
                    "code": code,
                    "client_id": settings.google_client_id,
                    "client_secret": settings.google_client_secret,
                    "redirect_uri": _google_redirect_uri(),
                    "grant_type": "authorization_code",
                },
            )
            if token_resp.status_code != 200:
                return _login_redirect("token_exchange")
            token_data = token_resp.json()

            # Fetch the Google profile with the access token.
            userinfo_resp = await client.get(
                "https://www.googleapis.com/oauth2/v3/userinfo",
                headers={"Authorization": f"Bearer {token_data['access_token']}"},
            )
            if userinfo_resp.status_code != 200:
                return _login_redirect("userinfo")
            google_user = userinfo_resp.json()

        google_id = google_user.get("sub")
        email = google_user.get("email")
        if not google_id or not email:
            return _login_redirect("missing_data")

        user = _find_or_create_google_user(
            db,
            google_id,
            email,
            google_user.get("name"),
            google_user.get("picture"),
        )

        jwt_token = create_access_token({"sub": user.username, "user_id": user.id})
        return RedirectResponse(
            url=f"{settings.app_url}/auth/callback?token={jwt_token}"
        )

    except Exception as e:
        # Top-level boundary: log with traceback and send the user back to
        # the login page rather than surfacing a raw 500.
        logger.error("Google OAuth callback error: %s", e, exc_info=True)
        return _login_redirect("server_error")


# === Change password ===

@router.post("/change-password")
def change_password(
    request: ChangePasswordRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Change password for the current user (local accounts only).

    Raises:
        HTTPException: 400 if the account is not local, the current password
            does not verify, or the new password is shorter than 8 characters.
    """
    if current_user.auth_provider != "local":
        raise HTTPException(
            status_code=400,
            detail="Usa il provider di accesso originale per cambiare la password.",
        )

    if not verify_password(request.current_password, current_user.hashed_password):
        raise HTTPException(status_code=400, detail="Password attuale non corretta.")

    if len(request.new_password) < 8:
        raise HTTPException(
            status_code=400,
            detail="La nuova password deve essere di almeno 8 caratteri.",
        )

    current_user.hashed_password = hash_password(request.new_password)
    db.commit()
    return {"message": "Password aggiornata con successo."}


# === Subscription code redemption ===

@router.post("/redeem")
def redeem_code(
    request: RedeemCodeRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Redeem a Pro subscription code.

    The submitted code is upper-cased and stripped before lookup. An active
    Pro subscription is extended from its current expiry; otherwise the new
    period starts now. A month is counted as 30 days.

    Raises:
        HTTPException: 404 if the code does not exist, 400 if already used.
    """
    code = db.query(SubscriptionCode).filter(
        SubscriptionCode.code == request.code.upper().strip()
    ).first()

    if not code:
        raise HTTPException(status_code=404, detail="Codice non trovato.")

    if code.used_by_user_id is not None:
        raise HTTPException(status_code=400, detail="Codice già utilizzato.")

    # NOTE(review): naive UTC — presumably matches how
    # subscription_expires_at is stored; confirm before making it tz-aware.
    now = datetime.utcnow()
    current_expiry = current_user.subscription_expires_at
    has_active_pro = (
        current_user.subscription_plan == "pro"
        and current_expiry
        and current_expiry > now
    )
    base_date = current_expiry if has_active_pro else now
    new_expiry = base_date + timedelta(days=30 * code.duration_months)

    # Update user.
    current_user.subscription_plan = "pro"
    current_user.subscription_expires_at = new_expiry

    # Mark code as used.
    code.used_by_user_id = current_user.id
    code.used_at = now

    db.commit()
    db.refresh(current_user)

    return {
        "subscription_plan": current_user.subscription_plan,
        "subscription_expires_at": current_user.subscription_expires_at.isoformat(),
    }


# === Export user data (GDPR) ===

@router.get("/export-data")
def export_user_data(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Export personal data for the current user as a JSON download.

    Only the account/profile fields below are included.
    """
    # TODO: related records (posts, social accounts, ...) are not exported;
    # the previous code queried posts but never added them to the payload.
    expires_at = current_user.subscription_expires_at
    created_at = current_user.created_at
    data = {
        "exported_at": datetime.utcnow().isoformat(),
        "user": {
            "id": current_user.id,
            "username": current_user.username,
            "email": current_user.email,
            "display_name": current_user.display_name,
            "auth_provider": current_user.auth_provider,
            "subscription_plan": current_user.subscription_plan,
            "subscription_expires_at": expires_at.isoformat() if expires_at else None,
            "created_at": created_at.isoformat() if created_at else None,
        },
    }
    content = json.dumps(data, ensure_ascii=False, indent=2)

    # The username comes from an email local part, so it may contain quotes
    # or non-latin-1 characters that are invalid inside a header value.
    safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", str(current_user.username))
    return Response(
        content=content,
        media_type="application/json",
        headers={
            "Content-Disposition": f'attachment; filename="leopost-data-{safe_name}.json"'
        },
    )


# === Delete account ===

class DeleteAccountRequest(BaseModel):
    """Payload confirming account deletion."""

    confirmation: str  # must equal "ELIMINA"


@router.delete("/account")
def delete_account(
    request: DeleteAccountRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Permanently delete the user account.

    Raises:
        HTTPException: 400 if ``confirmation`` is not exactly ``"ELIMINA"``.
    """
    if request.confirmation != "ELIMINA":
        raise HTTPException(
            status_code=400,
            detail="Conferma non valida. Digita ELIMINA per procedere.",
        )

    # Delete user (cascade should handle related records if FK set, else manual).
    db.delete(current_user)
    db.commit()
    return {"message": "Account eliminato con successo."}