"""Content generation router. Handles post generation via LLM, image generation, and CRUD operations on posts. """ import uuid from datetime import date, datetime from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session from ..auth import get_current_user from ..database import get_db from ..models import AffiliateLink, Character, Post, SystemSetting, User from ..plan_limits import check_limit from ..schemas import ( GenerateContentRequest, GenerateImageRequest, PostResponse, PostUpdate, ) from ..services.content import generate_hashtags, generate_post_text, inject_affiliate_links from ..services.images import get_image_provider from ..services.llm import get_llm_provider router = APIRouter( prefix="/api/content", tags=["content"], ) def _get_setting(db: Session, key: str, user_id: int = None) -> str | None: """Retrieve a system setting value by key, preferring user-specific over global.""" if user_id is not None: setting = ( db.query(SystemSetting) .filter(SystemSetting.key == key, SystemSetting.user_id == user_id) .first() ) if setting is not None: return setting.value # Fallback to global (no user_id) setting = db.query(SystemSetting).filter(SystemSetting.key == key, SystemSetting.user_id == None).first() if setting is None: return None return setting.value @router.post("/generate", response_model=list[PostResponse]) def generate_content( request: GenerateContentRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Generate content for a character using LLM. One post per platform.""" character = ( db.query(Character) .filter(Character.id == request.character_id, Character.user_id == current_user.id) .first() ) if not character: raise HTTPException(status_code=404, detail="Character not found") # Determine platforms to generate for platforms = request.platforms if request.platforms else [request.platform] # Check monthly post limit first_of_month = date.today().replace(day=1) if current_user.posts_reset_date != first_of_month: current_user.posts_generated_this_month = 0 current_user.posts_reset_date = first_of_month db.commit() current_count = current_user.posts_generated_this_month or 0 allowed, msg = check_limit(current_user, "posts_per_month", current_count) if not allowed: raise HTTPException(status_code=403, detail={"message": msg, "upgrade_required": True}) # Also check if we have room for all platforms allowed_after, msg_after = check_limit(current_user, "posts_per_month", current_count + len(platforms) - 1) if not allowed_after: raise HTTPException(status_code=403, detail={ "message": f"Non hai abbastanza post rimanenti per generare su {len(platforms)} piattaforme. {msg_after}", "upgrade_required": True, }) # Get LLM settings provider_name = request.provider or _get_setting(db, "llm_provider", current_user.id) api_key = _get_setting(db, "llm_api_key", current_user.id) model = request.model or _get_setting(db, "llm_model", current_user.id) if not provider_name: raise HTTPException( status_code=400, detail={ "message": "Provider AI non configurato. Vai in Impostazioni → Provider AI per scegliere il provider e inserire la API key.", "missing_settings": True, }, ) if not api_key: raise HTTPException( status_code=400, detail={ "message": "API key non configurata. Vai in Impostazioni → Provider AI per inserire la tua API key.", "missing_settings": True, }, ) char_dict = { "name": character.name, "niche": character.niche, "topics": character.topics or [], "tone": character.tone or "professional", "brand_voice": character.brand_voice, "target_audience": character.target_audience, "business_goals": character.business_goals, "products_services": character.products_services, "content_rules": character.content_rules or {}, "hashtag_profiles": character.hashtag_profiles or {}, } base_url = _get_setting(db, "llm_base_url", current_user.id) llm = get_llm_provider(provider_name, api_key, model, base_url=base_url) # Preload affiliate links once affiliate_link_dicts: list[dict] = [] if request.include_affiliates: links = ( db.query(AffiliateLink) .filter( AffiliateLink.is_active == True, AffiliateLink.user_id == current_user.id, (AffiliateLink.character_id == character.id) | (AffiliateLink.character_id == None), ) .all() ) affiliate_link_dicts = [ {"url": link.url, "label": link.name, "keywords": link.topics or []} for link in links ] # Generate one post per platform, all sharing the same batch_id batch_id = str(uuid.uuid4()) posts_created: list[Post] = [] for platform in platforms: text = generate_post_text( character=char_dict, llm_provider=llm, platform=platform, topic_hint=request.topic_hint, brief=request.brief, ) # Hashtag generation with profile support ht_profile = char_dict.get("hashtag_profiles", {}).get(platform, {}) always_tags = ht_profile.get("always", []) max_generated = ht_profile.get("max_generated", 12) hashtags_generated = generate_hashtags(text, llm, platform, count=max_generated) # Merge: always tags first, then generated (no duplicates) seen = set() hashtags = [] for tag in always_tags + hashtags_generated: normalized = tag.lower() if normalized not in seen: seen.add(normalized) hashtags.append(tag) affiliate_links_used: list[dict] = [] if affiliate_link_dicts: text, affiliate_links_used = inject_affiliate_links( text, affiliate_link_dicts, character.topics or [] ) post = Post( batch_id=batch_id, character_id=character.id, user_id=current_user.id, content_type=request.content_type, text_content=text, hashtags=hashtags, affiliate_links_used=affiliate_links_used, llm_provider=provider_name, llm_model=model, platform_hint=platform, status="draft", ) db.add(post) posts_created.append(post) # Increment monthly counter by number of posts generated current_user.posts_generated_this_month = current_count + len(platforms) db.commit() for post in posts_created: db.refresh(post) return posts_created @router.post("/generate-image", response_model=PostResponse) def generate_image( request: GenerateImageRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Generate an image for a character and attach to a post.""" character = ( db.query(Character) .filter(Character.id == request.character_id, Character.user_id == current_user.id) .first() ) if not character: raise HTTPException(status_code=404, detail="Character not found") provider_name = request.provider or _get_setting(db, "image_provider", current_user.id) api_key = _get_setting(db, "image_api_key", current_user.id) if not provider_name: raise HTTPException(status_code=400, detail="Image provider not configured. Set 'image_provider' in settings.") if not api_key: raise HTTPException(status_code=400, detail="Image API key not configured. Set 'image_api_key' in settings.") prompt = request.prompt if not prompt: style_hint = request.style_hint or "" visual_style = character.visual_style or {} style_desc = visual_style.get("description", "") prompt = ( f"Create a social media image for {character.name}, " f"a content creator in the {character.niche} niche. " f"Style: {style_desc} {style_hint}".strip() ) image_provider = get_image_provider(provider_name, api_key) image_url = image_provider.generate(prompt, size=request.size) post = Post( character_id=character.id, user_id=current_user.id, content_type="image", image_url=image_url, platform_hint="instagram", status="draft", ) db.add(post) db.commit() db.refresh(post) return post @router.get("/posts", response_model=list[PostResponse]) def list_posts( character_id: int | None = Query(None), status: str | None = Query(None), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """List all posts with optional filters.""" query = db.query(Post).filter(Post.user_id == current_user.id) if character_id is not None: query = query.filter(Post.character_id == character_id) if status is not None: query = query.filter(Post.status == status) return query.order_by(Post.created_at.desc()).all() @router.get("/posts/{post_id}", response_model=PostResponse) def get_post( post_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get a single post by ID.""" post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first() if not post: raise HTTPException(status_code=404, detail="Post not found") return post @router.put("/posts/{post_id}", response_model=PostResponse) def update_post( post_id: int, data: PostUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Update a post.""" post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first() if not post: raise HTTPException(status_code=404, detail="Post not found") update_data = data.model_dump(exclude_unset=True) for key, value in update_data.items(): setattr(post, key, value) post.updated_at = datetime.utcnow() db.commit() db.refresh(post) return post @router.delete("/posts/{post_id}", status_code=204) def delete_post( post_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Delete a post.""" post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first() if not post: raise HTTPException(status_code=404, detail="Post not found") db.delete(post) db.commit() @router.post("/posts/{post_id}/approve", response_model=PostResponse) def approve_post( post_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Approve a post (set status to 'approved').""" post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first() if not post: raise HTTPException(status_code=404, detail="Post not found") post.status = "approved" post.updated_at = datetime.utcnow() db.commit() db.refresh(post) return post