Files
leopost-full/backend/app/routers/social.py
Michele 77ca70cd48 feat: multi-user SaaS, piani Freemium/Pro, Google OAuth, admin panel
BLOCCO 1 - Multi-user data model:
- User: email, display_name, avatar_url, auth_provider, google_id
- User: subscription_plan, subscription_expires_at, is_admin, post counters
- SubscriptionCode table per redeem codes
- user_id FK su Character, Post, AffiliateLink, EditorialPlan, SocialAccount, SystemSetting
- Migrazione SQLite-safe (ALTER TABLE) + preserva dati esistenti

BLOCCO 2 - Auth completo:
- Registrazione email/password + login multi-user
- Google OAuth 2.0 (httpx, no deps esterne)
- Callback flow: Google -> /auth/callback?token=JWT -> frontend
- Backward compat login admin con username

BLOCCO 3 - Piani e abbonamenti:
- Freemium: 1 character, 15 post/mese, FB+IG only, no auto-plans
- Pro: illimitato, tutte le piattaforme, tutte le feature
- Enforcement automatico in tutti i router
- Redeem codes con durate 1/3/6/12 mesi
- Admin panel: genera codici, lista utenti

BLOCCO 4 - Frontend completo:
- Login page design Leopost (split coral/cream, Google, social coming soon)
- AuthCallback per OAuth redirect
- PlanBanner, UpgradeModal con pricing
- AdminSettings per generazione codici
- CharacterForm con tab Account Social + guide setup

Deploy:
- Dockerfile con ARG VITE_BASE_PATH/VITE_API_BASE
- docker-compose.prod.yml per leopost.it (no subpath)
- docker-compose.yml aggiornato per lab

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 20:01:07 +02:00

237 lines
7.6 KiB
Python

"""Social account management and publishing router.
Handles CRUD for social media accounts and manual publishing of scheduled posts.
"""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from ..auth import get_current_user
from ..database import get_db
from ..models import Post, ScheduledPost, SocialAccount, User
from ..schemas import (
ScheduledPostResponse,
SocialAccountCreate,
SocialAccountResponse,
SocialAccountUpdate,
)
from ..services.social import get_publisher
router = APIRouter(
prefix="/api/social",
tags=["social"],
)
# === Social Accounts ===
@router.get("/accounts", response_model=list[SocialAccountResponse])
def list_social_accounts(
character_id: int | None = Query(None),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List all social accounts, optionally filtered by character."""
query = db.query(SocialAccount).filter(SocialAccount.user_id == current_user.id)
if character_id is not None:
query = query.filter(SocialAccount.character_id == character_id)
return query.order_by(SocialAccount.created_at.desc()).all()
@router.get("/accounts/{account_id}", response_model=SocialAccountResponse)
def get_social_account(
account_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get a single social account by ID."""
account = (
db.query(SocialAccount)
.filter(SocialAccount.id == account_id, SocialAccount.user_id == current_user.id)
.first()
)
if not account:
raise HTTPException(status_code=404, detail="Social account not found")
return account
@router.post("/accounts", response_model=SocialAccountResponse, status_code=201)
def create_social_account(
data: SocialAccountCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Create/connect a new social account."""
account = SocialAccount(**data.model_dump())
account.user_id = current_user.id
db.add(account)
db.commit()
db.refresh(account)
return account
@router.put("/accounts/{account_id}", response_model=SocialAccountResponse)
def update_social_account(
account_id: int,
data: SocialAccountUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Update a social account."""
account = (
db.query(SocialAccount)
.filter(SocialAccount.id == account_id, SocialAccount.user_id == current_user.id)
.first()
)
if not account:
raise HTTPException(status_code=404, detail="Social account not found")
update_data = data.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(account, key, value)
db.commit()
db.refresh(account)
return account
@router.delete("/accounts/{account_id}", status_code=204)
def delete_social_account(
account_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Delete a social account."""
account = (
db.query(SocialAccount)
.filter(SocialAccount.id == account_id, SocialAccount.user_id == current_user.id)
.first()
)
if not account:
raise HTTPException(status_code=404, detail="Social account not found")
db.delete(account)
db.commit()
@router.post("/accounts/{account_id}/test")
def test_social_account(
account_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Test connection to a social account by making a simple API call."""
account = (
db.query(SocialAccount)
.filter(SocialAccount.id == account_id, SocialAccount.user_id == current_user.id)
.first()
)
if not account:
raise HTTPException(status_code=404, detail="Social account not found")
if not account.access_token:
raise HTTPException(status_code=400, detail="No access token configured for this account")
try:
kwargs: dict = {}
if account.platform == "facebook":
if not account.page_id:
raise HTTPException(status_code=400, detail="Facebook account requires page_id")
kwargs["page_id"] = account.page_id
elif account.platform == "instagram":
ig_user_id = account.account_id or (account.extra_data or {}).get("ig_user_id")
if not ig_user_id:
raise HTTPException(status_code=400, detail="Instagram account requires ig_user_id")
kwargs["ig_user_id"] = ig_user_id
get_publisher(account.platform, account.access_token, **kwargs)
return {"status": "ok", "message": f"Connection to {account.platform} account is configured correctly"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except RuntimeError as e:
raise HTTPException(status_code=502, detail=f"Connection test failed: {e}")
# === Publishing ===
@router.post("/publish/{scheduled_post_id}", response_model=ScheduledPostResponse)
def publish_scheduled_post(
scheduled_post_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Manually trigger publishing of a scheduled post."""
scheduled = (
db.query(ScheduledPost)
.filter(ScheduledPost.id == scheduled_post_id)
.first()
)
if not scheduled:
raise HTTPException(status_code=404, detail="Scheduled post not found")
post = db.query(Post).filter(Post.id == scheduled.post_id, Post.user_id == current_user.id).first()
if not post:
raise HTTPException(status_code=404, detail="Associated post not found")
account = (
db.query(SocialAccount)
.filter(
SocialAccount.character_id == post.character_id,
SocialAccount.platform == scheduled.platform,
SocialAccount.is_active == True,
SocialAccount.user_id == current_user.id,
)
.first()
)
if not account:
raise HTTPException(
status_code=400,
detail=f"No active {scheduled.platform} account found for character {post.character_id}",
)
if not account.access_token:
raise HTTPException(status_code=400, detail="Social account has no access token configured")
kwargs: dict = {}
if account.platform == "facebook":
kwargs["page_id"] = account.page_id
elif account.platform == "instagram":
kwargs["ig_user_id"] = account.account_id or (account.extra_data or {}).get("ig_user_id")
try:
scheduled.status = "publishing"
db.commit()
publisher = get_publisher(account.platform, account.access_token, **kwargs)
text = post.text_content or ""
if post.hashtags:
text = f"{text}\n\n{' '.join(post.hashtags)}"
if post.video_url:
external_id = publisher.publish_video(text, post.video_url)
elif post.image_url:
external_id = publisher.publish_image(text, post.image_url)
else:
external_id = publisher.publish_text(text)
scheduled.status = "published"
scheduled.published_at = datetime.utcnow()
scheduled.external_post_id = external_id
post.status = "published"
post.updated_at = datetime.utcnow()
db.commit()
db.refresh(scheduled)
return scheduled
except (RuntimeError, ValueError) as e:
scheduled.status = "failed"
scheduled.error_message = str(e)
db.commit()
db.refresh(scheduled)
raise HTTPException(status_code=502, detail=f"Publishing failed: {e}")