Files
leopost-full/backend/app/routers/auth.py
Michele Borraccia 2ca8b957e9 feat: sync all BRAIN mobile changes - onboarding, cookies, legal, mobile UX, settings
- Add OnboardingWizard, BetaBanner, CookieBanner components
- Add legal pages (Privacy, Terms, Cookies)
- Update Layout with mobile topbar, sidebar drawer, plan banner
- Update SettingsPage with profile, API config, security
- Update CharacterForm with topic suggestions, niche chips
- Update EditorialCalendar with shared strategy card
- Update ContentPage with narrative technique + brief
- Update SocialAccounts with 4 platforms and token guides
- Fix CSS button color inheritance, mobile responsive
- Add backup script
- Update .gitignore for pgdata and backups

Co-Authored-By: Claude (BRAIN/StackOS) <noreply@anthropic.com>
2026-04-03 14:59:14 +00:00

392 lines
13 KiB
Python

"""Authentication router — multi-user SaaS with local + Google OAuth."""
import secrets
import uuid
from datetime import datetime, timedelta
from typing import Optional
import httpx
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from ..auth import (
create_access_token,
get_current_user,
get_user_by_email,
hash_password,
verify_password,
)
from ..config import settings
from ..database import get_db
from ..models import SubscriptionCode, User
router = APIRouter(prefix="/api/auth", tags=["auth"])
# === Schemas ===
class RegisterRequest(BaseModel):
email: str
password: str
display_name: Optional[str] = None
class LoginRequest(BaseModel):
email: Optional[str] = None
username: Optional[str] = None # backward compat
password: str
class RedeemCodeRequest(BaseModel):
code: str
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str
def _user_response(user: User) -> dict:
return {
"id": user.id,
"email": user.email,
"username": user.username,
"display_name": user.display_name,
"avatar_url": user.avatar_url,
"subscription_plan": user.subscription_plan or "freemium",
"subscription_expires_at": user.subscription_expires_at.isoformat() if user.subscription_expires_at else None,
"is_admin": bool(user.is_admin),
"posts_generated_this_month": user.posts_generated_this_month or 0,
}
# === Endpoints ===
@router.post("/register")
def register(request: RegisterRequest, db: Session = Depends(get_db)):
"""Register a new user with email + password."""
# Check email uniqueness
existing = get_user_by_email(db, request.email)
if existing:
raise HTTPException(status_code=400, detail="Email già registrata.")
# Build username from email for backward compat
username_base = request.email.split("@")[0]
username = username_base
counter = 1
while db.query(User).filter(User.username == username).first():
username = f"{username_base}{counter}"
counter += 1
user = User(
username=username,
hashed_password=hash_password(request.password),
email=request.email,
display_name=request.display_name or username,
auth_provider="local",
subscription_plan="freemium",
is_admin=False,
)
db.add(user)
db.commit()
db.refresh(user)
token = create_access_token({"sub": user.username, "user_id": user.id})
return {
"access_token": token,
"token_type": "bearer",
"user": _user_response(user),
}
@router.post("/login")
def login(request: LoginRequest, db: Session = Depends(get_db)):
"""Login with email (or legacy username) + password."""
user = None
if request.email:
user = get_user_by_email(db, request.email)
elif request.username:
# Backward compat: check username OR email
user = db.query(User).filter(User.username == request.username).first()
if not user:
user = get_user_by_email(db, request.username)
if not user or not verify_password(request.password, user.hashed_password):
raise HTTPException(status_code=401, detail="Credenziali non valide.")
token = create_access_token({"sub": user.username, "user_id": user.id})
return {
"access_token": token,
"token_type": "bearer",
"user": _user_response(user),
}
@router.get("/me")
def me(user: User = Depends(get_current_user)):
"""Get current authenticated user info."""
return _user_response(user)
class UpdateProfileRequest(BaseModel):
display_name: str
@router.put("/me")
def update_profile(
request: UpdateProfileRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Update user display name."""
name = request.display_name.strip()
if not name:
raise HTTPException(status_code=400, detail="Il nome non può essere vuoto.")
current_user.display_name = name
db.commit()
db.refresh(current_user)
return _user_response(current_user)
@router.post("/logout")
def logout():
"""Logout — client should remove the token."""
return {"message": "ok"}
# === Google OAuth ===
@router.get("/oauth/google")
def oauth_google_start():
"""Redirect to Google OAuth consent screen."""
if not settings.google_client_id:
raise HTTPException(status_code=501, detail="Google OAuth non configurato.")
state_token = str(uuid.uuid4())
params = {
"client_id": settings.google_client_id,
"redirect_uri": f"{settings.app_url}/api/auth/oauth/google/callback",
"response_type": "code",
"scope": "openid email profile",
"state": state_token,
"access_type": "offline",
}
query = "&".join(f"{k}={v}" for k, v in params.items())
auth_url = f"https://accounts.google.com/o/oauth2/v2/auth?{query}"
return RedirectResponse(url=auth_url)
@router.get("/oauth/google/callback")
async def oauth_google_callback(code: str, state: Optional[str] = None, error: Optional[str] = None, db: Session = Depends(get_db)):
"""Exchange Google OAuth code for token, create/find user, redirect with JWT."""
# Handle Google OAuth user denial or access errors
if error:
return RedirectResponse(url=f"{settings.app_url}/login?oauth_error={error}")
if not settings.google_client_id or not settings.google_client_secret:
return RedirectResponse(url=f"{settings.app_url}/login?oauth_error=non_configurato")
try:
# Exchange code for tokens
async with httpx.AsyncClient() as client:
token_resp = await client.post(
"https://oauth2.googleapis.com/token",
data={
"code": code,
"client_id": settings.google_client_id,
"client_secret": settings.google_client_secret,
"redirect_uri": f"{settings.app_url}/api/auth/oauth/google/callback",
"grant_type": "authorization_code",
},
)
if token_resp.status_code != 200:
return RedirectResponse(url=f"{settings.app_url}/login?oauth_error=token_exchange")
token_data = token_resp.json()
# Get user info
userinfo_resp = await client.get(
"https://www.googleapis.com/oauth2/v3/userinfo",
headers={"Authorization": f"Bearer {token_data['access_token']}"},
)
if userinfo_resp.status_code != 200:
return RedirectResponse(url=f"{settings.app_url}/login?oauth_error=userinfo")
google_user = userinfo_resp.json()
google_id = google_user.get("sub")
email = google_user.get("email")
name = google_user.get("name")
picture = google_user.get("picture")
if not google_id or not email:
return RedirectResponse(url=f"{settings.app_url}/login?oauth_error=missing_data")
# Find existing user by google_id or email
user = db.query(User).filter(User.google_id == google_id).first()
if not user and email:
user = get_user_by_email(db, email)
if user:
# Update google_id and avatar if missing
if not user.google_id:
user.google_id = google_id
if not user.avatar_url and picture:
user.avatar_url = picture
db.commit()
else:
# Create new user
username_base = (email or google_id).split("@")[0]
username = username_base
counter = 1
while db.query(User).filter(User.username == username).first():
username = f"{username_base}{counter}"
counter += 1
user = User(
username=username,
hashed_password=hash_password(secrets.token_urlsafe(32)),
email=email,
display_name=name or username,
avatar_url=picture,
auth_provider="google",
google_id=google_id,
subscription_plan="freemium",
is_admin=False,
)
db.add(user)
db.commit()
db.refresh(user)
jwt_token = create_access_token({"sub": user.username, "user_id": user.id})
redirect_url = f"{settings.app_url}/auth/callback?token={jwt_token}"
return RedirectResponse(url=redirect_url)
except Exception as e:
import logging
logging.getLogger(__name__).error(f"Google OAuth callback error: {e}", exc_info=True)
return RedirectResponse(url=f"{settings.app_url}/login?oauth_error=server_error")
# === Change password ===
@router.post("/change-password")
def change_password(
request: ChangePasswordRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Change password for the current user (local accounts only)."""
if current_user.auth_provider != "local":
raise HTTPException(status_code=400, detail="Usa il provider di accesso originale per cambiare la password.")
if not verify_password(request.current_password, current_user.hashed_password):
raise HTTPException(status_code=400, detail="Password attuale non corretta.")
if len(request.new_password) < 8:
raise HTTPException(status_code=400, detail="La nuova password deve essere di almeno 8 caratteri.")
current_user.hashed_password = hash_password(request.new_password)
db.commit()
return {"message": "Password aggiornata con successo."}
# === Subscription code redemption ===
@router.post("/redeem")
def redeem_code(
request: RedeemCodeRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Redeem a Pro subscription code."""
code = db.query(SubscriptionCode).filter(
SubscriptionCode.code == request.code.upper().strip()
).first()
if not code:
raise HTTPException(status_code=404, detail="Codice non trovato.")
if code.used_by_user_id is not None:
raise HTTPException(status_code=400, detail="Codice già utilizzato.")
# Calculate new expiry
now = datetime.utcnow()
current_expiry = current_user.subscription_expires_at
if current_user.subscription_plan == "pro" and current_expiry and current_expiry > now:
# Extend existing pro subscription
base_date = current_expiry
else:
base_date = now
new_expiry = base_date + timedelta(days=30 * code.duration_months)
# Update user
current_user.subscription_plan = "pro"
current_user.subscription_expires_at = new_expiry
# Mark code as used
code.used_by_user_id = current_user.id
code.used_at = now
db.commit()
db.refresh(current_user)
return {
"subscription_plan": current_user.subscription_plan,
"subscription_expires_at": current_user.subscription_expires_at.isoformat(),
}
# === Export user data (GDPR) ===
@router.get("/export-data")
def export_user_data(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Export all personal data for the current user (GDPR compliance)."""
from app.models import Post, Character, AffiliateLinkModel, PublishingPlan, SocialAccount
posts = db.query(Post).filter(Post.user_id == current_user.id).all() if hasattr(Post, 'user_id') else []
data = {
"exported_at": datetime.utcnow().isoformat(),
"user": {
"id": current_user.id,
"username": current_user.username,
"email": current_user.email,
"display_name": current_user.display_name,
"auth_provider": current_user.auth_provider,
"subscription_plan": current_user.subscription_plan,
"subscription_expires_at": current_user.subscription_expires_at.isoformat() if current_user.subscription_expires_at else None,
"created_at": current_user.created_at.isoformat() if current_user.created_at else None,
},
}
from fastapi.responses import JSONResponse
from fastapi import Response
import json
content = json.dumps(data, ensure_ascii=False, indent=2)
return Response(
content=content,
media_type="application/json",
headers={"Content-Disposition": f'attachment; filename="leopost-data-{current_user.username}.json"'}
)
# === Delete account ===
class DeleteAccountRequest(BaseModel):
confirmation: str # must equal "ELIMINA"
@router.delete("/account")
def delete_account(
request: DeleteAccountRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Permanently delete the user account and all associated data."""
if request.confirmation != "ELIMINA":
raise HTTPException(status_code=400, detail="Conferma non valida. Digita ELIMINA per procedere.")
# Delete user (cascade should handle related records if FK set, else manual)
db.delete(current_user)
db.commit()
return {"message": "Account eliminato con successo."}