Files
leopost-full/backend/app/routers/content.py
Michele e77705d33b fix: SQLAlchemy JSON mutation detection for saved ideas and approved examples
SQLAlchemy doesn't detect in-place mutations on JSON columns (same object
reference). Fixed all JSON write operations to create new list/dict objects:
- save_idea: [new_idea] + old_ideas instead of ideas.insert()
- delete_idea: new filtered list
- mark_idea_used: new list with copied dicts
- approve_post: dict(examples) for content_rules

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 02:13:23 +02:00

566 lines
19 KiB
Python

"""Content generation router.
Handles post generation via LLM, image generation, and CRUD operations on posts.
"""
import uuid
from datetime import date, datetime
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from ..auth import get_current_user
from ..database import get_db
from ..models import AffiliateLink, Character, Post, SystemSetting, User
from ..plan_limits import check_limit
from ..schemas import (
GenerateContentRequest,
GenerateImageRequest,
PostResponse,
PostUpdate,
)
from ..services.content import generate_hashtags, generate_post_text, inject_affiliate_links
from ..services.images import get_image_provider
from ..services.llm import get_llm_provider
router = APIRouter(
prefix="/api/content",
tags=["content"],
)
def _get_setting(db: Session, key: str, user_id: int = None) -> str | None:
"""Retrieve a system setting value by key, preferring user-specific over global."""
if user_id is not None:
setting = (
db.query(SystemSetting)
.filter(SystemSetting.key == key, SystemSetting.user_id == user_id)
.first()
)
if setting is not None:
return setting.value
# Fallback to global (no user_id)
setting = db.query(SystemSetting).filter(SystemSetting.key == key, SystemSetting.user_id == None).first()
if setting is None:
return None
return setting.value
@router.post("/generate", response_model=list[PostResponse])
def generate_content(
request: GenerateContentRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Generate content for a character using LLM. One post per platform."""
character = (
db.query(Character)
.filter(Character.id == request.character_id, Character.user_id == current_user.id)
.first()
)
if not character:
raise HTTPException(status_code=404, detail="Character not found")
# Determine platforms to generate for
platforms = request.platforms if request.platforms else [request.platform]
# Check monthly post limit
first_of_month = date.today().replace(day=1)
if current_user.posts_reset_date != first_of_month:
current_user.posts_generated_this_month = 0
current_user.posts_reset_date = first_of_month
db.commit()
current_count = current_user.posts_generated_this_month or 0
allowed, msg = check_limit(current_user, "posts_per_month", current_count)
if not allowed:
raise HTTPException(status_code=403, detail={"message": msg, "upgrade_required": True})
# Also check if we have room for all platforms
allowed_after, msg_after = check_limit(current_user, "posts_per_month", current_count + len(platforms) - 1)
if not allowed_after:
raise HTTPException(status_code=403, detail={
"message": f"Non hai abbastanza post rimanenti per generare su {len(platforms)} piattaforme. {msg_after}",
"upgrade_required": True,
})
# Get LLM settings
provider_name = request.provider or _get_setting(db, "llm_provider", current_user.id)
api_key = _get_setting(db, "llm_api_key", current_user.id)
model = request.model or _get_setting(db, "llm_model", current_user.id)
if not provider_name:
raise HTTPException(
status_code=400,
detail={
"message": "Provider AI non configurato. Vai in Impostazioni → Provider AI per scegliere il provider e inserire la API key.",
"missing_settings": True,
},
)
if not api_key:
raise HTTPException(
status_code=400,
detail={
"message": "API key non configurata. Vai in Impostazioni → Provider AI per inserire la tua API key.",
"missing_settings": True,
},
)
char_dict = {
"name": character.name,
"niche": character.niche,
"topics": character.topics or [],
"tone": character.tone or "professional",
"brand_voice": character.brand_voice,
"target_audience": character.target_audience,
"business_goals": character.business_goals,
"products_services": character.products_services,
"content_rules": character.content_rules or {},
"hashtag_profiles": character.hashtag_profiles or {},
}
base_url = _get_setting(db, "llm_base_url", current_user.id)
llm = get_llm_provider(provider_name, api_key, model, base_url=base_url)
# Preload affiliate links once
affiliate_link_dicts: list[dict] = []
if request.include_affiliates:
links = (
db.query(AffiliateLink)
.filter(
AffiliateLink.is_active == True,
AffiliateLink.user_id == current_user.id,
(AffiliateLink.character_id == character.id) | (AffiliateLink.character_id == None),
)
.all()
)
affiliate_link_dicts = [
{"url": link.url, "label": link.name, "keywords": link.topics or []}
for link in links
]
# Generate one post per platform, all sharing the same batch_id
batch_id = str(uuid.uuid4())
posts_created: list[Post] = []
for platform in platforms:
text = generate_post_text(
character=char_dict,
llm_provider=llm,
platform=platform,
topic_hint=request.topic_hint,
brief=request.brief,
)
# Hashtag generation with profile support
ht_profile = char_dict.get("hashtag_profiles", {}).get(platform, {})
always_tags = ht_profile.get("always", [])
max_generated = ht_profile.get("max_generated", 12)
hashtags_generated = generate_hashtags(text, llm, platform, count=max_generated)
# Merge: always tags first, then generated (no duplicates)
seen = set()
hashtags = []
for tag in always_tags + hashtags_generated:
normalized = tag.lower()
if normalized not in seen:
seen.add(normalized)
hashtags.append(tag)
affiliate_links_used: list[dict] = []
if affiliate_link_dicts:
text, affiliate_links_used = inject_affiliate_links(
text, affiliate_link_dicts, character.topics or []
)
post = Post(
batch_id=batch_id,
character_id=character.id,
user_id=current_user.id,
content_type=request.content_type,
text_content=text,
hashtags=hashtags,
affiliate_links_used=affiliate_links_used,
llm_provider=provider_name,
llm_model=model,
platform_hint=platform,
status="draft",
)
db.add(post)
posts_created.append(post)
# Increment monthly counter by number of posts generated
current_user.posts_generated_this_month = current_count + len(platforms)
db.commit()
for post in posts_created:
db.refresh(post)
return posts_created
@router.post("/generate-image", response_model=PostResponse)
def generate_image(
request: GenerateImageRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Generate an image for a character and attach to a post."""
character = (
db.query(Character)
.filter(Character.id == request.character_id, Character.user_id == current_user.id)
.first()
)
if not character:
raise HTTPException(status_code=404, detail="Character not found")
provider_name = request.provider or _get_setting(db, "image_provider", current_user.id)
api_key = _get_setting(db, "image_api_key", current_user.id)
if not provider_name:
raise HTTPException(status_code=400, detail="Image provider not configured. Set 'image_provider' in settings.")
if not api_key:
raise HTTPException(status_code=400, detail="Image API key not configured. Set 'image_api_key' in settings.")
prompt = request.prompt
if not prompt:
style_hint = request.style_hint or ""
visual_style = character.visual_style or {}
style_desc = visual_style.get("description", "")
prompt = (
f"Create a social media image for {character.name}, "
f"a content creator in the {character.niche} niche. "
f"Style: {style_desc} {style_hint}".strip()
)
image_provider = get_image_provider(provider_name, api_key)
image_url = image_provider.generate(prompt, size=request.size)
post = Post(
character_id=character.id,
user_id=current_user.id,
content_type="image",
image_url=image_url,
platform_hint="instagram",
status="draft",
)
db.add(post)
db.commit()
db.refresh(post)
return post
@router.get("/posts", response_model=list[PostResponse])
def list_posts(
character_id: int | None = Query(None),
status: str | None = Query(None),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List all posts with optional filters."""
query = db.query(Post).filter(Post.user_id == current_user.id)
if character_id is not None:
query = query.filter(Post.character_id == character_id)
if status is not None:
query = query.filter(Post.status == status)
return query.order_by(Post.created_at.desc()).all()
@router.get("/posts/{post_id}", response_model=PostResponse)
def get_post(
post_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get a single post by ID."""
post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
return post
@router.put("/posts/{post_id}", response_model=PostResponse)
def update_post(
post_id: int,
data: PostUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Update a post."""
post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
update_data = data.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(post, key, value)
post.updated_at = datetime.utcnow()
db.commit()
db.refresh(post)
return post
@router.delete("/posts/{post_id}", status_code=204)
def delete_post(
post_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Delete a post."""
post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
db.delete(post)
db.commit()
@router.post("/posts/{post_id}/approve", response_model=PostResponse)
def approve_post(
post_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Approve a post (set status to 'approved'). Also saves it as a reference example for the character."""
post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
post.status = "approved"
post.updated_at = datetime.utcnow()
# Save as reference example for future few-shot learning (keep last 5 per character)
if post.text_content and post.character_id:
character = db.query(Character).filter(Character.id == post.character_id).first()
if character:
examples = character.content_rules or {}
approved_examples = examples.get("approved_examples", [])
approved_examples.append({
"platform": post.platform_hint or "general",
"text": post.text_content[:500], # truncate for prompt efficiency
})
# Keep only last 5
examples["approved_examples"] = approved_examples[-5:]
character.content_rules = dict(examples) # new dict for SQLAlchemy change detection
character.updated_at = datetime.utcnow()
db.commit()
db.refresh(post)
return post
def _generate_suggestions(db: Session, current_user: User, character) -> list[str]:
"""Internal: call LLM to generate topic suggestions."""
provider_name = _get_setting(db, "llm_provider", current_user.id)
api_key = _get_setting(db, "llm_api_key", current_user.id)
model = _get_setting(db, "llm_model", current_user.id)
if not provider_name or not api_key:
return []
recent_posts = (
db.query(Post)
.filter(Post.character_id == character.id)
.order_by(Post.created_at.desc())
.limit(5)
.all()
)
recent_topics = [p.text_content[:100] for p in recent_posts if p.text_content]
recent_str = "\n".join(f"- {t}" for t in recent_topics) if recent_topics else "Nessun post recente."
base_url = _get_setting(db, "llm_base_url", current_user.id)
llm = get_llm_provider(provider_name, api_key, model, base_url=base_url)
topics = character.topics or []
niche = character.niche or "general"
target = character.target_audience or ""
system_prompt = (
"Sei un social media strategist esperto. "
"Suggerisci 3 idee per post social, ciascuna su una riga. "
"Ogni idea deve essere una frase breve (max 15 parole) che descrive il topic. "
"Non numerare, non aggiungere spiegazioni. Solo 3 righe, una per idea."
)
prompt = (
f"Personaggio: {character.name}, nicchia: {niche}\n"
f"Topic abituali: {', '.join(topics) if topics else 'generici'}\n"
f"Target: {target}\n"
f"Post recenti (evita ripetizioni):\n{recent_str}\n\n"
f"Suggerisci 3 idee per post nuovi e diversi dai recenti:"
)
try:
result = llm.generate(prompt, system=system_prompt)
lines = [line.strip() for line in result.strip().splitlines() if line.strip()]
return lines[:3]
except Exception:
return []
@router.get("/suggestions")
def get_topic_suggestions(
force: bool = Query(False),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get cached daily suggestions or generate new ones. Use force=true to regenerate."""
character = (
db.query(Character)
.filter(Character.user_id == current_user.id, Character.is_active == True)
.first()
)
if not character:
return {"suggestions": [], "character_id": None}
provider_name = _get_setting(db, "llm_provider", current_user.id)
api_key = _get_setting(db, "llm_api_key", current_user.id)
if not provider_name or not api_key:
return {"suggestions": [], "character_id": character.id, "needs_setup": True}
today = date.today().isoformat()
# Check cache
if not force:
cached = _get_setting(db, "daily_suggestions", current_user.id)
if cached and isinstance(cached, dict) and cached.get("date") == today:
return {
"suggestions": cached.get("suggestions", []),
"character_id": cached.get("character_id", character.id),
"character_name": cached.get("character_name", character.name),
"cached": True,
}
# Generate new suggestions
suggestions = _generate_suggestions(db, current_user, character)
# Save to cache
cache_data = {
"date": today,
"suggestions": suggestions,
"character_id": character.id,
"character_name": character.name,
}
existing = (
db.query(SystemSetting)
.filter(SystemSetting.key == "daily_suggestions", SystemSetting.user_id == current_user.id)
.first()
)
if existing:
existing.value = cache_data
existing.updated_at = datetime.utcnow()
else:
db.add(SystemSetting(key="daily_suggestions", value=cache_data, user_id=current_user.id))
db.commit()
return {
"suggestions": suggestions,
"character_id": character.id,
"character_name": character.name,
"cached": False,
}
# === Saved Ideas (Swipe File) ===
@router.get("/ideas")
def get_saved_ideas(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get all saved ideas for the user."""
setting = (
db.query(SystemSetting)
.filter(SystemSetting.key == "saved_ideas", SystemSetting.user_id == current_user.id)
.first()
)
ideas = setting.value if setting and isinstance(setting.value, list) else []
return {"ideas": ideas, "total": len(ideas)}
@router.post("/ideas")
def save_idea(
data: dict,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Save an idea for later use."""
text = data.get("text", "").strip()
if not text:
raise HTTPException(status_code=400, detail="Text is required")
setting = (
db.query(SystemSetting)
.filter(SystemSetting.key == "saved_ideas", SystemSetting.user_id == current_user.id)
.first()
)
old_ideas = list(setting.value) if setting and isinstance(setting.value, list) else []
new_idea = {
"id": str(uuid.uuid4())[:8],
"text": text,
"note": data.get("note", ""),
"saved_at": datetime.utcnow().isoformat() + "Z",
"used": False,
}
new_ideas = [new_idea] + old_ideas # new list — forces SQLAlchemy to detect change
if setting:
setting.value = new_ideas
setting.updated_at = datetime.utcnow()
else:
db.add(SystemSetting(key="saved_ideas", value=new_ideas, user_id=current_user.id))
db.commit()
return new_idea
@router.delete("/ideas/{idea_id}")
def delete_idea(
idea_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Delete a saved idea."""
setting = (
db.query(SystemSetting)
.filter(SystemSetting.key == "saved_ideas", SystemSetting.user_id == current_user.id)
.first()
)
if not setting or not isinstance(setting.value, list):
raise HTTPException(status_code=404, detail="Idea not found")
new_ideas = [i for i in setting.value if i.get("id") != idea_id]
if len(new_ideas) == len(setting.value):
raise HTTPException(status_code=404, detail="Idea not found")
setting.value = new_ideas # new list object for SQLAlchemy change detection
setting.updated_at = datetime.utcnow()
db.commit()
return {"ok": True}
@router.post("/ideas/{idea_id}/mark-used")
def mark_idea_used(
idea_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Mark an idea as used."""
setting = (
db.query(SystemSetting)
.filter(SystemSetting.key == "saved_ideas", SystemSetting.user_id == current_user.id)
.first()
)
if not setting or not isinstance(setting.value, list):
raise HTTPException(status_code=404, detail="Idea not found")
updated = []
found = False
for idea in setting.value:
copy = dict(idea)
if copy.get("id") == idea_id:
copy["used"] = True
found = True
updated.append(copy)
if not found:
raise HTTPException(status_code=404, detail="Idea not found")
setting.value = updated # new list for SQLAlchemy change detection
setting.updated_at = datetime.utcnow()
db.commit()
return {"ok": True}