Files
leopost-full/backend/app/routers/auth.py
Michele 8b77f1b86b feat(fase0): fix title, add change-password endpoint
- index.html: title → "Leopost — Studio Editoriale AI"
- auth router: add POST /api/auth/change-password (local accounts only)
  validates current password, enforces min 8 chars, bcrypt update

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 17:36:27 +02:00

303 lines
9.6 KiB
Python

"""Authentication router — multi-user SaaS with local + Google OAuth."""
import secrets
import uuid
from datetime import datetime, timedelta
from typing import Optional
import httpx
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from ..auth import (
create_access_token,
get_current_user,
get_user_by_email,
hash_password,
verify_password,
)
from ..config import settings
from ..database import get_db
from ..models import SubscriptionCode, User
# Router collecting every authentication endpoint in this module; all routes
# registered on it are served under the /api/auth prefix and tagged "auth".
router = APIRouter(prefix="/api/auth", tags=["auth"])
# === Schemas ===
class RegisterRequest(BaseModel):
    """Request body accepted by the registration endpoint."""

    # Address the new account is created under; checked for uniqueness.
    email: str
    # Plain-text password as sent by the client; it is hashed before storage.
    password: str
    # Optional name to show for the user; the generated username is used
    # when this is omitted.
    display_name: Optional[str] = None
class LoginRequest(BaseModel):
    """Request body accepted by the login endpoint.

    Either ``email`` or the legacy ``username`` identifies the account;
    ``email`` takes precedence when both are supplied.
    """

    # Preferred identifier.
    email: Optional[str] = None
    # Legacy identifier, kept for backward compatibility with older clients.
    username: Optional[str] = None
    # Plain-text password to verify against the stored hash.
    password: str
class RedeemCodeRequest(BaseModel):
    """Request body accepted by the subscription-code redemption endpoint."""

    # Code as typed by the user; it is upper-cased and stripped before lookup.
    code: str
class ChangePasswordRequest(BaseModel):
    """Request body accepted by the change-password endpoint."""

    # Password the account currently has; must verify before any change.
    current_password: str
    # Replacement password; the endpoint requires at least 8 characters.
    new_password: str
def _user_response(user: User) -> dict:
    """Build the JSON-serializable user payload returned by the auth endpoints.

    Args:
        user: The account to serialize.

    Returns:
        A dict with the public account fields. A missing subscription plan is
        reported as ``"freemium"``, a missing monthly post counter as ``0``,
        and the expiry as an ISO-8601 string or ``None`` when unset.
    """
    expires_at = user.subscription_expires_at
    plan = user.subscription_plan or "freemium"
    generated = user.posts_generated_this_month or 0

    return {
        "id": user.id,
        "email": user.email,
        "username": user.username,
        "display_name": user.display_name,
        "avatar_url": user.avatar_url,
        "subscription_plan": plan,
        "subscription_expires_at": expires_at.isoformat() if expires_at else None,
        "is_admin": bool(user.is_admin),
        "posts_generated_this_month": generated,
    }
# === Endpoints ===
@router.post("/register")
def register(request: RegisterRequest, db: Session = Depends(get_db)):
    """Register a new local account and return a bearer token for it.

    The username is derived from the local part of the email address; when it
    is already taken, an increasing numeric suffix is appended until a free
    one is found.

    Args:
        request: Email, password and optional display name.
        db: Database session.

    Returns:
        A dict with ``access_token``, ``token_type`` and the serialized user.

    Raises:
        HTTPException: 400 when the password is shorter than 8 characters or
            the email is already registered.
    """
    # Apply the same minimum length that the change-password endpoint
    # enforces, so an account can never be created with a password that the
    # user would later be unable to set again.
    if len(request.password) < 8:
        raise HTTPException(
            status_code=400,
            detail="La password deve essere di almeno 8 caratteri.",
        )

    # Check email uniqueness
    if get_user_by_email(db, request.email):
        raise HTTPException(status_code=400, detail="Email già registrata.")

    # Build username from email for backward compat
    username_base = request.email.split("@")[0]
    username = username_base
    counter = 1
    while db.query(User).filter(User.username == username).first():
        username = f"{username_base}{counter}"
        counter += 1

    user = User(
        username=username,
        hashed_password=hash_password(request.password),
        email=request.email,
        display_name=request.display_name or username,
        auth_provider="local",
        subscription_plan="freemium",
        is_admin=False,
    )
    db.add(user)
    db.commit()
    # Reload so database-generated fields (such as the id) are populated.
    db.refresh(user)

    token = create_access_token({"sub": user.username, "user_id": user.id})
    return {
        "access_token": token,
        "token_type": "bearer",
        "user": _user_response(user),
    }
@router.post("/login")
def login(request: LoginRequest, db: Session = Depends(get_db)):
    """Authenticate with email (or legacy username) and password.

    Args:
        request: Credentials; ``email`` wins over ``username`` when both are set.
        db: Database session.

    Returns:
        A dict with ``access_token``, ``token_type`` and the serialized user.

    Raises:
        HTTPException: 401 when no account matches or the password is wrong.
    """
    account = None
    if request.email:
        account = get_user_by_email(db, request.email)
    elif request.username:
        # Legacy clients send "username", which may hold either a real
        # username or an email address: try the username column first.
        account = (
            db.query(User).filter(User.username == request.username).first()
            or get_user_by_email(db, request.username)
        )

    # The same error covers "unknown account" and "wrong password".
    if not (account and verify_password(request.password, account.hashed_password)):
        raise HTTPException(status_code=401, detail="Credenziali non valide.")

    claims = {"sub": account.username, "user_id": account.id}
    return {
        "access_token": create_access_token(claims),
        "token_type": "bearer",
        "user": _user_response(account),
    }
@router.get("/me")
def me(user: User = Depends(get_current_user)):
    """Return the serialized profile of the authenticated caller."""
    profile = _user_response(user)
    return profile
@router.post("/logout")
def logout():
    """Acknowledge a logout request.

    Nothing is changed on the server here; the client is expected to discard
    its token.
    """
    acknowledgement = {"message": "ok"}
    return acknowledgement
# === Google OAuth ===
@router.get("/oauth/google")
def oauth_google_start():
    """Redirect the browser to the Google OAuth consent screen.

    Returns:
        A redirect response pointing at Google's authorization endpoint.

    Raises:
        HTTPException: 501 when no Google client id is configured.
    """
    # Imported locally so this block does not depend on a module-level import.
    from urllib.parse import urlencode

    if not settings.google_client_id:
        raise HTTPException(status_code=501, detail="Google OAuth non configurato.")

    # NOTE(review): this value is sent to Google but is not stored anywhere in
    # this function, and the callback in this file does not compare it, so it
    # currently provides no CSRF protection. Persisting and verifying it needs
    # server-side or cookie state that is not available here.
    state_token = str(uuid.uuid4())

    params = {
        "client_id": settings.google_client_id,
        "redirect_uri": f"{settings.app_url}/api/auth/oauth/google/callback",
        "response_type": "code",
        "scope": "openid email profile",
        "state": state_token,
        "access_type": "offline",
    }
    # urlencode percent-encodes every value; the scope contains spaces and the
    # redirect URI contains reserved characters, so they must not be
    # concatenated into the query string verbatim.
    auth_url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(params)}"
    return RedirectResponse(url=auth_url)
@router.get("/oauth/google/callback")
async def oauth_google_callback(code: str, state: Optional[str] = None, db: Session = Depends(get_db)):
    """Finish the Google OAuth flow and redirect to the frontend with a JWT.

    Exchanges the authorization code for an access token, fetches the Google
    profile, then finds or creates the matching local user.

    Args:
        code: Authorization code sent back by Google.
        state: State value echoed by Google. NOTE(review): it is accepted but
            not verified against anything, so it gives no CSRF protection.
        db: Database session.

    Returns:
        A redirect to ``{app_url}/auth/callback`` carrying the JWT in the
        ``token`` query parameter.

    Raises:
        HTTPException: 501 when Google OAuth is not configured; 400 when the
            token exchange or profile lookup fails, when the profile has no
            subject id, or when an existing account would be linked through
            an email address Google has not verified.
    """
    if not settings.google_client_id or not settings.google_client_secret:
        raise HTTPException(status_code=501, detail="Google OAuth non configurato.")

    async with httpx.AsyncClient() as client:
        # Exchange code for tokens
        token_resp = await client.post(
            "https://oauth2.googleapis.com/token",
            data={
                "code": code,
                "client_id": settings.google_client_id,
                "client_secret": settings.google_client_secret,
                "redirect_uri": f"{settings.app_url}/api/auth/oauth/google/callback",
                "grant_type": "authorization_code",
            },
        )
        if token_resp.status_code != 200:
            raise HTTPException(status_code=400, detail="Errore scambio token Google.")
        # A 200 response without an access token is reported as a failed
        # exchange instead of surfacing as an unhandled KeyError.
        access_token = token_resp.json().get("access_token")
        if not access_token:
            raise HTTPException(status_code=400, detail="Errore scambio token Google.")

        # Get user info
        userinfo_resp = await client.get(
            "https://www.googleapis.com/oauth2/v3/userinfo",
            headers={"Authorization": f"Bearer {access_token}"},
        )
        if userinfo_resp.status_code != 200:
            raise HTTPException(status_code=400, detail="Errore recupero profilo Google.")
        google_user = userinfo_resp.json()

    google_id = google_user.get("sub")
    if not google_id:
        # Without a subject id the lookup below would compare google_id to
        # None, which SQLAlchemy renders as IS NULL and which could therefore
        # match an unrelated account that has no Google id.
        raise HTTPException(status_code=400, detail="Errore recupero profilo Google.")
    email = google_user.get("email")
    name = google_user.get("name")
    picture = google_user.get("picture")
    # Accept both the boolean and the string form of the claim.
    email_verified = google_user.get("email_verified") in (True, "true")

    # Find existing user by google_id or email
    user = db.query(User).filter(User.google_id == google_id).first()
    if not user and email:
        user = get_user_by_email(db, email)
        if user and not email_verified:
            # Linking an existing account through an address Google has not
            # verified would let anyone controlling that Google profile take
            # the account over.
            raise HTTPException(status_code=400, detail="Email Google non verificata.")

    if user:
        # Update google_id and avatar if missing
        if not user.google_id:
            user.google_id = google_id
        if not user.avatar_url and picture:
            user.avatar_url = picture
        db.commit()
    else:
        # Create new user; the username comes from the email local part, or
        # from the Google id when no email was returned.
        username_base = (email or google_id).split("@")[0]
        username = username_base
        counter = 1
        while db.query(User).filter(User.username == username).first():
            username = f"{username_base}{counter}"
            counter += 1
        user = User(
            username=username,
            # Random password hash: the account has no usable local password.
            hashed_password=hash_password(secrets.token_urlsafe(32)),
            email=email,
            display_name=name or username,
            avatar_url=picture,
            auth_provider="google",
            google_id=google_id,
            subscription_plan="freemium",
            is_admin=False,
        )
        db.add(user)
        db.commit()
        db.refresh(user)

    jwt_token = create_access_token({"sub": user.username, "user_id": user.id})
    # NOTE(review): the token travels in the URL query string, where it can
    # end up in browser history and logs; changing that requires a matching
    # frontend change.
    redirect_url = f"{settings.app_url}/auth/callback?token={jwt_token}"
    return RedirectResponse(url=redirect_url)
# === Change password ===
@router.post("/change-password")
def change_password(
    request: ChangePasswordRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Replace the password of the authenticated user (local accounts only).

    Args:
        request: Current password and the desired new password.
        db: Database session.
        current_user: The authenticated account.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 400 when the account is not a local one, the current
            password does not verify, or the new password is shorter than
            8 characters (checked in that order).
    """
    # Work out the first failing check, then raise once.
    problem = None
    if current_user.auth_provider != "local":
        problem = "Usa il provider di accesso originale per cambiare la password."
    elif not verify_password(request.current_password, current_user.hashed_password):
        problem = "Password attuale non corretta."
    elif len(request.new_password) < 8:
        problem = "La nuova password deve essere di almeno 8 caratteri."

    if problem is not None:
        raise HTTPException(status_code=400, detail=problem)

    new_hash = hash_password(request.new_password)
    current_user.hashed_password = new_hash
    db.commit()
    return {"message": "Password aggiornata con successo."}
# === Subscription code redemption ===
@router.post("/redeem")
def redeem_code(
    request: RedeemCodeRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Redeem a subscription code and upgrade the caller to the Pro plan.

    The code is matched after upper-casing and stripping the submitted text.
    Each month of the code counts as 30 days. An active Pro subscription is
    extended from its current expiry; otherwise the period starts now
    (naive UTC from ``datetime.utcnow()``).

    Args:
        request: The code to redeem.
        db: Database session.
        current_user: The authenticated account.

    Returns:
        A dict with the resulting ``subscription_plan`` and the ISO-8601
        ``subscription_expires_at``.

    Raises:
        HTTPException: 404 when the code does not exist, 400 when it has
            already been used.
    """
    normalized = request.code.upper().strip()
    voucher = (
        db.query(SubscriptionCode)
        .filter(SubscriptionCode.code == normalized)
        .first()
    )
    if not voucher:
        raise HTTPException(status_code=404, detail="Codice non trovato.")
    if voucher.used_by_user_id is not None:
        raise HTTPException(status_code=400, detail="Codice già utilizzato.")

    now = datetime.utcnow()
    previous_expiry = current_user.subscription_expires_at
    has_active_pro = (
        current_user.subscription_plan == "pro"
        and previous_expiry
        and previous_expiry > now
    )
    # Extend a running Pro subscription; otherwise start counting from now.
    start = previous_expiry if has_active_pro else now
    granted = timedelta(days=30 * voucher.duration_months)

    # Upgrade the user.
    current_user.subscription_plan = "pro"
    current_user.subscription_expires_at = start + granted

    # Mark the code as consumed by this user.
    voucher.used_by_user_id = current_user.id
    voucher.used_at = now

    db.commit()
    db.refresh(current_user)

    return {
        "subscription_plan": current_user.subscription_plan,
        "subscription_expires_at": current_user.subscription_expires_at.isoformat(),
    }