Files
leopost-full/backend/app/routers/content.py
Michele 743a6c1324 feat: group posts by batch in archive + fullscreen confirm modal
Backend:
- Add batch_id column to Post model (UUID, groups posts from same generation)
- Set batch_id in /generate endpoint for all posts in same request

Frontend:
- ContentArchive: group posts by batch_id into single cards with platform tabs
- Character name at top, platform tabs below, status badge, text preview
- Click platform tab to switch between variants of same content
- ConfirmModal: render via React portal to document.body for true fullscreen overlay
- Add box-shadow and higher z-index for better visual separation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 12:52:01 +02:00

309 lines
10 KiB
Python

"""Content generation router.
Handles post generation via LLM, image generation, and CRUD operations on posts.
"""
import uuid
from datetime import date, datetime, timezone

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from ..auth import get_current_user
from ..database import get_db
from ..models import AffiliateLink, Character, Post, SystemSetting, User
from ..plan_limits import check_limit
from ..schemas import (
    GenerateContentRequest,
    GenerateImageRequest,
    PostResponse,
    PostUpdate,
)
from ..services.content import generate_hashtags, generate_post_text, inject_affiliate_links
from ..services.images import get_image_provider
from ..services.llm import get_llm_provider

# Router for every endpoint in this module; all routes live under /api/content.
router = APIRouter(prefix="/api/content", tags=["content"])
def _get_setting(db: Session, key: str, user_id: int | None = None) -> str | None:
    """Return the value of a system setting, preferring a per-user row.

    Args:
        db: SQLAlchemy session used for the lookups.
        key: Name of the setting to look up.
        user_id: When given, a row matching both ``key`` and this user id is
            tried first.

    Returns:
        The ``value`` of the user-specific row if one exists; otherwise the
        ``value`` of the global row (the one whose ``user_id`` is NULL); or
        ``None`` when neither row exists.
    """
    if user_id is not None:
        user_setting = (
            db.query(SystemSetting)
            .filter(SystemSetting.key == key, SystemSetting.user_id == user_id)
            .first()
        )
        if user_setting is not None:
            return user_setting.value

    # Fall back to the global row, i.e. the one not bound to any user.
    global_setting = (
        db.query(SystemSetting)
        .filter(SystemSetting.key == key, SystemSetting.user_id.is_(None))
        .first()
    )
    return None if global_setting is None else global_setting.value
def _enforce_monthly_post_limit(db: Session, user: User, requested: int) -> int:
    """Roll the monthly counter over if needed and check the plan quota.

    Args:
        db: SQLAlchemy session; committed when the counter is reset.
        user: The user whose quota is checked (and possibly reset).
        requested: Number of posts the current request wants to create.

    Returns:
        The user's post count for the current month before this request.

    Raises:
        HTTPException: 403 when ``check_limit`` refuses either the current
            count or the count the batch would reach.
    """
    first_of_month = date.today().replace(day=1)
    if user.posts_reset_date != first_of_month:
        # The stored reset date is not this month's first day: restart the
        # counter and persist it before any limit is evaluated.
        user.posts_generated_this_month = 0
        user.posts_reset_date = first_of_month
        db.commit()

    current_count = user.posts_generated_this_month or 0
    allowed, msg = check_limit(user, "posts_per_month", current_count)
    if not allowed:
        raise HTTPException(status_code=403, detail={"message": msg, "upgrade_required": True})

    # Second check: is there room for the whole batch, not just one post?
    allowed_after, msg_after = check_limit(user, "posts_per_month", current_count + requested - 1)
    if not allowed_after:
        raise HTTPException(
            status_code=403,
            detail={
                "message": f"Non hai abbastanza post rimanenti per generare su {requested} piattaforme. {msg_after}",
                "upgrade_required": True,
            },
        )
    return current_count


def _load_affiliate_link_dicts(db: Session, user_id: int, character_id: int) -> list[dict]:
    """Load the user's active affiliate links as plain dicts.

    Links bound to ``character_id`` and links bound to no character at all
    are both included. Each dict has the keys ``url``, ``label`` and
    ``keywords``.
    """
    links = (
        db.query(AffiliateLink)
        .filter(
            AffiliateLink.is_active == True,  # noqa: E712 - SQL expression, not a Python bool test
            AffiliateLink.user_id == user_id,
            (AffiliateLink.character_id == character_id) | AffiliateLink.character_id.is_(None),
        )
        .all()
    )
    return [
        {"url": link.url, "label": link.name, "keywords": link.topics or []}
        for link in links
    ]


@router.post("/generate", response_model=list[PostResponse])
def generate_content(
    request: GenerateContentRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Generate content for a character using an LLM, one draft post per platform.

    All posts created by a single call share the same ``batch_id``.

    Returns:
        The list of newly created ``Post`` rows, refreshed after commit.

    Raises:
        HTTPException: 404 if the character does not exist for this user;
            403 if the monthly post quota does not allow the request;
            400 if no LLM provider or API key is configured.
    """
    character = (
        db.query(Character)
        .filter(Character.id == request.character_id, Character.user_id == current_user.id)
        .first()
    )
    if not character:
        raise HTTPException(status_code=404, detail="Character not found")

    # Determine platforms to generate for; fall back to the single `platform` field.
    platforms = request.platforms if request.platforms else [request.platform]

    current_count = _enforce_monthly_post_limit(db, current_user, len(platforms))

    # LLM settings: request-level overrides win over stored settings.
    provider_name = request.provider or _get_setting(db, "llm_provider", current_user.id)
    api_key = _get_setting(db, "llm_api_key", current_user.id)
    model = request.model or _get_setting(db, "llm_model", current_user.id)
    if not provider_name:
        raise HTTPException(
            status_code=400,
            detail={
                "message": "Provider AI non configurato. Vai in Impostazioni → Provider AI per scegliere il provider e inserire la API key.",
                "missing_settings": True,
            },
        )
    if not api_key:
        raise HTTPException(
            status_code=400,
            detail={
                "message": "API key non configurata. Vai in Impostazioni → Provider AI per inserire la tua API key.",
                "missing_settings": True,
            },
        )

    character_topics = character.topics or []
    char_dict = {
        "name": character.name,
        "niche": character.niche,
        "topics": character_topics,
        "tone": character.tone or "professional",
    }
    base_url = _get_setting(db, "llm_base_url", current_user.id)
    llm = get_llm_provider(provider_name, api_key, model, base_url=base_url)

    # Preload affiliate links once; they are reused for every platform.
    affiliate_link_dicts: list[dict] = []
    if request.include_affiliates:
        affiliate_link_dicts = _load_affiliate_link_dicts(db, current_user.id, character.id)

    # Generate one post per platform, all sharing the same batch_id.
    batch_id = str(uuid.uuid4())
    posts_created: list[Post] = []
    for platform in platforms:
        text = generate_post_text(
            character=char_dict,
            llm_provider=llm,
            platform=platform,
            topic_hint=request.topic_hint,
            brief=request.brief,
        )
        # Hashtags are derived from the text before any affiliate links are injected.
        hashtags = generate_hashtags(text, llm, platform)

        affiliate_links_used: list[dict] = []
        if affiliate_link_dicts:
            text, affiliate_links_used = inject_affiliate_links(
                text, affiliate_link_dicts, character_topics
            )

        post = Post(
            batch_id=batch_id,
            character_id=character.id,
            user_id=current_user.id,
            content_type=request.content_type,
            text_content=text,
            hashtags=hashtags,
            affiliate_links_used=affiliate_links_used,
            llm_provider=provider_name,
            llm_model=model,
            platform_hint=platform,
            status="draft",
        )
        db.add(post)
        posts_created.append(post)

    # Increment the monthly counter by the number of posts generated, then
    # persist the counter and the new posts in a single commit.
    current_user.posts_generated_this_month = current_count + len(platforms)
    db.commit()
    for post in posts_created:
        db.refresh(post)
    return posts_created
@router.post("/generate-image", response_model=PostResponse)
def generate_image(
    request: GenerateImageRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Generate an image for one of the caller's characters and store it as a draft post.

    When the request carries no prompt, one is built from the character's
    name, niche and visual style plus the optional style hint.

    Raises:
        HTTPException: 404 if the character does not exist for this user;
            400 if no image provider or image API key is configured.
    """
    character_query = db.query(Character).filter(
        Character.id == request.character_id,
        Character.user_id == current_user.id,
    )
    character = character_query.first()
    if not character:
        raise HTTPException(status_code=404, detail="Character not found")

    # A provider named in the request wins over the stored setting.
    provider_name = request.provider or _get_setting(db, "image_provider", current_user.id)
    api_key = _get_setting(db, "image_api_key", current_user.id)
    if not provider_name:
        raise HTTPException(
            status_code=400,
            detail="Image provider not configured. Set 'image_provider' in settings.",
        )
    if not api_key:
        raise HTTPException(
            status_code=400,
            detail="Image API key not configured. Set 'image_api_key' in settings.",
        )

    prompt = request.prompt
    if not prompt:
        # No explicit prompt: derive one from the character profile.
        style_desc = (character.visual_style or {}).get("description", "")
        style_hint = request.style_hint or ""
        prompt = (
            f"Create a social media image for {character.name}, "
            f"a content creator in the {character.niche} niche. "
            f"Style: {style_desc} {style_hint}"
        ).strip()

    image_url = get_image_provider(provider_name, api_key).generate(prompt, size=request.size)

    new_post = Post(
        character_id=character.id,
        user_id=current_user.id,
        content_type="image",
        image_url=image_url,
        platform_hint="instagram",
        status="draft",
    )
    db.add(new_post)
    db.commit()
    db.refresh(new_post)
    return new_post
@router.get("/posts", response_model=list[PostResponse])
def list_posts(
    character_id: int | None = Query(None),
    status: str | None = Query(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the caller's posts, newest first, optionally narrowed by character and status."""
    # Always scope to the authenticated user; the other filters are opt-in.
    criteria = [Post.user_id == current_user.id]
    if character_id is not None:
        criteria.append(Post.character_id == character_id)
    if status is not None:
        criteria.append(Post.status == status)

    return (
        db.query(Post)
        .filter(*criteria)
        .order_by(Post.created_at.desc())
        .all()
    )
@router.get("/posts/{post_id}", response_model=PostResponse)
def get_post(
    post_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Fetch one of the caller's posts by ID, or respond 404 if there is none."""
    found = (
        db.query(Post)
        .filter(Post.id == post_id)
        .filter(Post.user_id == current_user.id)
        .first()
    )
    if found is None:
        raise HTTPException(status_code=404, detail="Post not found")
    return found
@router.put("/posts/{post_id}", response_model=PostResponse)
def update_post(
    post_id: int,
    data: PostUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Update a post owned by the caller.

    Only the fields explicitly set in the request body are written to the
    post; ``updated_at`` is refreshed on every call.

    Raises:
        HTTPException: 404 if the post does not exist for this user.
    """
    post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    # exclude_unset keeps fields the client did not send from being overwritten.
    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(post, field, value)

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the stored value stays a naive UTC timestamp.
    post.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
    db.commit()
    db.refresh(post)
    return post
@router.delete("/posts/{post_id}", status_code=204)
def delete_post(
    post_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove one of the caller's posts; respond 404 if it does not exist."""
    target = (
        db.query(Post)
        .filter(Post.id == post_id)
        .filter(Post.user_id == current_user.id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Post not found")

    db.delete(target)
    db.commit()
@router.post("/posts/{post_id}/approve", response_model=PostResponse)
def approve_post(
    post_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Approve a post owned by the caller (set its status to 'approved').

    Raises:
        HTTPException: 404 if the post does not exist for this user.
    """
    post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    post.status = "approved"
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the stored value stays a naive UTC timestamp.
    post.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
    db.commit()
    db.refresh(post)
    return post