Files
leopost-full/backend/app/routers/content.py
Michele 16c7c4404c feat: Phase B learning + hashtag profiles Pro-only lock
- Approve action saves post as reference example in character's content_rules
- Keep last 5 approved examples per character (auto-rotating)
- Inject last 3 approved examples as few-shot in LLM system prompt
- Lock YouTube/TikTok hashtag profile tabs for Freemium users (Pro only)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 19:48:04 +02:00

343 lines
12 KiB
Python

"""Content generation router.
Handles post generation via LLM, image generation, and CRUD operations on posts.
"""
import uuid
from datetime import date, datetime, timezone

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from ..auth import get_current_user
from ..database import get_db
from ..models import AffiliateLink, Character, Post, SystemSetting, User
from ..plan_limits import check_limit
from ..schemas import (
    GenerateContentRequest,
    GenerateImageRequest,
    PostResponse,
    PostUpdate,
)
from ..services.content import generate_hashtags, generate_post_text, inject_affiliate_links
from ..services.images import get_image_provider
from ..services.llm import get_llm_provider
# All endpoints declared in this module are registered on this router,
# under the /api/content prefix and the "content" tag.
router = APIRouter(prefix="/api/content", tags=["content"])
def _get_setting(db: Session, key: str, user_id: int | None = None) -> str | None:
    """Look up a setting value by key, trying the user's own row before the global one.

    Args:
        db: Open database session.
        key: Setting key to look up.
        user_id: When given, a row with this ``user_id`` is tried first.

    Returns:
        The ``value`` of the user-specific row if one exists, otherwise the
        ``value`` of the global row (``user_id`` IS NULL), otherwise ``None``.
    """
    if user_id is not None:
        own_row = db.query(SystemSetting).filter(
            SystemSetting.key == key,
            SystemSetting.user_id == user_id,
        ).first()
        if own_row is not None:
            # A user-specific row wins even if its value is empty.
            return own_row.value

    # `== None` is intentional: the comparison is on a column expression and
    # has to be translated to SQL, so a Python `is None` cannot be used here.
    global_row = db.query(SystemSetting).filter(
        SystemSetting.key == key,
        SystemSetting.user_id == None,  # noqa: E711
    ).first()
    return global_row.value if global_row is not None else None
def _enforce_monthly_post_quota(db: Session, current_user: User, requested: int) -> int:
    """Check that the user may generate ``requested`` more posts this month.

    Resets the user's monthly counter (and commits) when the stored reset date
    is not the first day of the current month.

    Returns:
        The user's post count for the current month, before this request.

    Raises:
        HTTPException: 403 when the limit is already reached, or when it would
            not cover all ``requested`` posts.
    """
    first_of_month = date.today().replace(day=1)
    if current_user.posts_reset_date != first_of_month:
        current_user.posts_generated_this_month = 0
        current_user.posts_reset_date = first_of_month
        db.commit()

    current_count = current_user.posts_generated_this_month or 0

    allowed, msg = check_limit(current_user, "posts_per_month", current_count)
    if not allowed:
        raise HTTPException(status_code=403, detail={"message": msg, "upgrade_required": True})

    # Second check: is there room for the last post of the batch as well?
    allowed_after, msg_after = check_limit(
        current_user, "posts_per_month", current_count + requested - 1
    )
    if not allowed_after:
        raise HTTPException(status_code=403, detail={
            "message": f"Non hai abbastanza post rimanenti per generare su {requested} piattaforme. {msg_after}",
            "upgrade_required": True,
        })
    return current_count


def _load_affiliate_links(db: Session, current_user: User, character: Character) -> list[dict]:
    """Return the user's active affiliate links usable for ``character`` as plain dicts.

    A link qualifies when it is bound to this character or to no character.
    """
    links = (
        db.query(AffiliateLink)
        .filter(
            # `== True` / `== None` are SQL column comparisons, not Python checks.
            AffiliateLink.is_active == True,  # noqa: E712
            AffiliateLink.user_id == current_user.id,
            (AffiliateLink.character_id == character.id)
            | (AffiliateLink.character_id == None),  # noqa: E711
        )
        .all()
    )
    return [
        {"url": link.url, "label": link.name, "keywords": link.topics or []}
        for link in links
    ]


def _merge_hashtags(always_tags: list[str] | None, generated_tags: list[str] | None) -> list[str]:
    """Join both tag lists, always-tags first, dropping case-insensitive duplicates.

    The first spelling of a tag is the one kept. ``None`` is treated as empty.
    """
    seen: set[str] = set()
    merged: list[str] = []
    for tag in [*(always_tags or []), *(generated_tags or [])]:
        normalized = tag.lower()
        if normalized not in seen:
            seen.add(normalized)
            merged.append(tag)
    return merged


@router.post("/generate", response_model=list[PostResponse])
def generate_content(
    request: GenerateContentRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Generate one draft post per requested platform for one of the user's characters.

    All posts created by a single call share the same ``batch_id``. The user's
    monthly counter is increased by the number of posts generated.

    Raises:
        HTTPException: 404 if the character does not exist for this user;
            403 if the monthly post limit does not allow the request;
            400 if no LLM provider or API key is configured.
    """
    character = (
        db.query(Character)
        .filter(Character.id == request.character_id, Character.user_id == current_user.id)
        .first()
    )
    if not character:
        raise HTTPException(status_code=404, detail="Character not found")

    # Fall back to the single `platform` field when no list was supplied.
    platforms = request.platforms if request.platforms else [request.platform]

    current_count = _enforce_monthly_post_quota(db, current_user, len(platforms))

    # Request-level overrides win over stored settings for provider and model.
    provider_name = request.provider or _get_setting(db, "llm_provider", current_user.id)
    api_key = _get_setting(db, "llm_api_key", current_user.id)
    model = request.model or _get_setting(db, "llm_model", current_user.id)

    if not provider_name:
        raise HTTPException(
            status_code=400,
            detail={
                "message": "Provider AI non configurato. Vai in Impostazioni → Provider AI per scegliere il provider e inserire la API key.",
                "missing_settings": True,
            },
        )
    if not api_key:
        raise HTTPException(
            status_code=400,
            detail={
                "message": "API key non configurata. Vai in Impostazioni → Provider AI per inserire la tua API key.",
                "missing_settings": True,
            },
        )

    char_dict = {
        "name": character.name,
        "niche": character.niche,
        "topics": character.topics or [],
        "tone": character.tone or "professional",
        "brand_voice": character.brand_voice,
        "target_audience": character.target_audience,
        "business_goals": character.business_goals,
        "products_services": character.products_services,
        "content_rules": character.content_rules or {},
        "hashtag_profiles": character.hashtag_profiles or {},
    }

    base_url = _get_setting(db, "llm_base_url", current_user.id)
    llm = get_llm_provider(provider_name, api_key, model, base_url=base_url)

    # Affiliate links are the same for every platform, so load them once.
    affiliate_link_dicts: list[dict] = (
        _load_affiliate_links(db, current_user, character)
        if request.include_affiliates
        else []
    )

    batch_id = str(uuid.uuid4())
    posts_created: list[Post] = []

    for platform in platforms:
        text = generate_post_text(
            character=char_dict,
            llm_provider=llm,
            platform=platform,
            topic_hint=request.topic_hint,
            brief=request.brief,
        )

        # Per-platform hashtag profile; null entries fall back to the defaults
        # instead of raising.
        ht_profile = char_dict["hashtag_profiles"].get(platform) or {}
        always_tags = ht_profile.get("always") or []
        max_generated = ht_profile.get("max_generated")
        if max_generated is None:
            max_generated = 12

        # Hashtags are derived from the text before affiliate links are injected.
        hashtags_generated = generate_hashtags(text, llm, platform, count=max_generated)
        hashtags = _merge_hashtags(always_tags, hashtags_generated)

        affiliate_links_used: list[dict] = []
        if affiliate_link_dicts:
            text, affiliate_links_used = inject_affiliate_links(
                text, affiliate_link_dicts, character.topics or []
            )

        post = Post(
            batch_id=batch_id,
            character_id=character.id,
            user_id=current_user.id,
            content_type=request.content_type,
            text_content=text,
            hashtags=hashtags,
            affiliate_links_used=affiliate_links_used,
            llm_provider=provider_name,
            llm_model=model,
            platform_hint=platform,
            status="draft",
        )
        db.add(post)
        posts_created.append(post)

    # Count every generated post against the monthly limit, then persist
    # posts and counter in a single commit.
    current_user.posts_generated_this_month = current_count + len(platforms)
    db.commit()

    for post in posts_created:
        db.refresh(post)
    return posts_created
@router.post("/generate-image", response_model=PostResponse)
def generate_image(
    request: GenerateImageRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Generate an image for one of the user's characters and save it as a new draft post.

    When the request carries no prompt, one is built from the character's
    name, niche and visual-style description plus the optional style hint.

    Raises:
        HTTPException: 404 if the character does not exist for this user;
            400 if no image provider or image API key is configured.
    """
    character_query = db.query(Character).filter(
        Character.id == request.character_id,
        Character.user_id == current_user.id,
    )
    character = character_query.first()
    if not character:
        raise HTTPException(status_code=404, detail="Character not found")

    # A provider named in the request overrides the stored setting.
    provider_name = request.provider or _get_setting(db, "image_provider", current_user.id)
    api_key = _get_setting(db, "image_api_key", current_user.id)
    if not provider_name:
        raise HTTPException(status_code=400, detail="Image provider not configured. Set 'image_provider' in settings.")
    if not api_key:
        raise HTTPException(status_code=400, detail="Image API key not configured. Set 'image_api_key' in settings.")

    if request.prompt:
        prompt = request.prompt
    else:
        hint = request.style_hint or ""
        description = (character.visual_style or {}).get("description", "")
        fallback = (
            f"Create a social media image for {character.name}, "
            f"a content creator in the {character.niche} niche. "
            f"Style: {description} {hint}"
        )
        # Strip so empty style parts leave no trailing whitespace.
        prompt = fallback.strip()

    generator = get_image_provider(provider_name, api_key)
    generated_url = generator.generate(prompt, size=request.size)

    # The platform hint is always "instagram" for generated images.
    post = Post(
        character_id=character.id,
        user_id=current_user.id,
        content_type="image",
        image_url=generated_url,
        platform_hint="instagram",
        status="draft",
    )
    db.add(post)
    db.commit()
    db.refresh(post)
    return post
@router.get("/posts", response_model=list[PostResponse])
def list_posts(
    character_id: int | None = Query(None),
    status: str | None = Query(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the current user's posts, newest first, optionally filtered.

    Args:
        character_id: If given, keep only posts of this character.
        status: If given, keep only posts with this status.
    """
    # Ownership is always enforced; the optional filters are added on top.
    conditions = [Post.user_id == current_user.id]
    if character_id is not None:
        conditions.append(Post.character_id == character_id)
    if status is not None:
        conditions.append(Post.status == status)

    newest_first = Post.created_at.desc()
    return db.query(Post).filter(*conditions).order_by(newest_first).all()
@router.get("/posts/{post_id}", response_model=PostResponse)
def get_post(
    post_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one of the current user's posts by ID.

    Raises:
        HTTPException: 404 if no post with this ID belongs to the user.
    """
    owned_posts = db.query(Post).filter(
        Post.id == post_id,
        Post.user_id == current_user.id,
    )
    post = owned_posts.first()
    if post:
        return post
    raise HTTPException(status_code=404, detail="Post not found")
@router.put("/posts/{post_id}", response_model=PostResponse)
def update_post(
    post_id: int,
    data: PostUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply a partial update to one of the current user's posts.

    Only fields explicitly set in the request body are written
    (``exclude_unset=True``); ``updated_at`` is always refreshed.

    Raises:
        HTTPException: 404 if no post with this ID belongs to the user.
    """
    post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    changes = data.model_dump(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(post, field_name, new_value)

    # `datetime.utcnow()` is deprecated since Python 3.12. This yields the
    # same naive UTC timestamp, so stored values keep their current format.
    post.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)

    db.commit()
    db.refresh(post)
    return post
@router.delete("/posts/{post_id}", status_code=204)
def delete_post(
    post_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete one of the current user's posts; responds with 204 and no body.

    Raises:
        HTTPException: 404 if no post with this ID belongs to the user.
    """
    owned_posts = db.query(Post).filter(
        Post.id == post_id,
        Post.user_id == current_user.id,
    )
    target = owned_posts.first()
    if not target:
        raise HTTPException(status_code=404, detail="Post not found")

    db.delete(target)
    db.commit()
@router.post("/posts/{post_id}/approve", response_model=PostResponse)
def approve_post(
    post_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mark a post as approved and record it as a reference example for its character.

    The post's status becomes ``"approved"``. If the post has text and a
    character, the first 500 characters of the text are appended to
    ``content_rules["approved_examples"]`` on that character, of which only
    the five most recent entries are kept.

    Raises:
        HTTPException: 404 if no post with this ID belongs to the user.
    """
    post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    # `datetime.utcnow()` is deprecated since Python 3.12; this is the
    # equivalent naive UTC timestamp.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    post.status = "approved"
    post.updated_at = now

    if post.text_content and post.character_id:
        # Scoped to the current user, like every other character lookup here.
        character = (
            db.query(Character)
            .filter(Character.id == post.character_id, Character.user_id == current_user.id)
            .first()
        )
        if character:
            # Build NEW dict and list objects instead of mutating the loaded
            # ones. Unless the column uses mutation tracking (model not visible
            # here), SQLAlchemy compares old and new values at flush time; an
            # in-place edit makes them equal and the change is never written.
            rules = dict(character.content_rules or {})
            approved_examples = list(rules.get("approved_examples") or [])
            approved_examples.append({
                "platform": post.platform_hint or "general",
                "text": post.text_content[:500],  # truncate for prompt efficiency
            })
            # Keep only the last 5.
            rules["approved_examples"] = approved_examples[-5:]
            character.content_rules = rules
            character.updated_at = now

    db.commit()
    db.refresh(post)
    return post