"""GenerationPipeline — orchestra calendario -> LLM -> CSV con per-item error isolation. Gestisce: - Generazione async in background (asyncio.create_task) - Progresso real-time tracciato in _jobs dict (completed/total/current_post) - Per-item error isolation: un fallimento NON blocca il batch (Pitfall 5) - Persistenza su disco per ricaricamento dopo restart - Job status per polling dal frontend """ from __future__ import annotations import asyncio import json import logging import uuid from dataclasses import dataclass, field from pathlib import Path from typing import Literal, Optional from backend.config import DATA_PATH from backend.schemas.calendar import CalendarRequest, CalendarResponse, CalendarSlot from backend.schemas.generate import ( GenerateResponse, GeneratedPost, PostResult, TopicResult, ) from backend.services.calendar_service import CalendarService from backend.services.csv_builder import CSVBuilder from backend.services.format_selector import FormatSelector from backend.services.llm_service import LLMService from backend.services.prompt_service import PromptService from backend.services.unsplash_service import UnsplashService logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Mappa formato_narrativo -> nome file prompt # --------------------------------------------------------------------------- _FORMAT_TO_PROMPT: dict[str, str] = { "PAS": "pas_valore", "AIDA": "aida_promozione", "BAB": "bab_storytelling", "Listicle": "listicle_valore", "Storytelling": "bab_storytelling", # fallback su BAB per storytelling "Dato_Implicazione": "dato_news", "Obiezione_Risposta": "pas_valore", # fallback su PAS per obiezione/risposta } _DEFAULT_PROMPT = "pas_valore" # --------------------------------------------------------------------------- # Dataclass per tracciare lo stato di un job # --------------------------------------------------------------------------- @dataclass class JobStatus: """Stato real-time di un job di generazione batch.""" job_id: str status: Literal["running", "completed", "failed"] total: int completed: int current_post: int results: list[PostResult] = field(default_factory=list) calendar: Optional[CalendarResponse] = None error: Optional[str] = None campagna: str = "" image_url_map: Optional[dict[str, str]] = None """Mappa keyword->URL Unsplash risolta dopo la generazione batch. None se Unsplash non configurato.""" # --------------------------------------------------------------------------- # Pipeline principale # --------------------------------------------------------------------------- class GenerationPipeline: """Orchestra il flusso completo: calendario -> topic -> LLM -> CSV.""" def __init__( self, llm_service: LLMService, prompt_service: PromptService, calendar_service: CalendarService, format_selector: FormatSelector, csv_builder: CSVBuilder, outputs_path: Path, ) -> None: """Inizializza la pipeline con i servizi necessari. Args: llm_service: Servizio per chiamate LLM con retry. prompt_service: Servizio per caricare/compilare prompt .txt. calendar_service: Servizio per generare il calendario editoriale. format_selector: Selettore formato narrativo per slot. csv_builder: Builder per CSV Canva-compatibile. outputs_path: Directory dove salvare i file CSV e JSON dei job. """ self._llm = llm_service self._prompts = prompt_service self._calendar = calendar_service self._formats = format_selector self._csv = csv_builder self._outputs_path = outputs_path # Dizionario in-memory per lo stato real-time dei job self._jobs: dict[str, JobStatus] = {} # --------------------------------------------------------------------------- # API pubblica # --------------------------------------------------------------------------- def generate_bulk_async( self, request: CalendarRequest, ) -> str: """Avvia la generazione batch come background task e ritorna subito. Questo metodo è SINCRONO dal punto di vista del caller: genera il job_id, inizializza lo stato, lancia il background task, e ritorna immediatamente. Il frontend può fare polling su get_job_status() per seguire il progresso. Args: request: Parametri del calendario (obiettivo, nicchie, etc.). Returns: job_id (UUID stringa) da usare per polling e download CSV. """ job_id = str(uuid.uuid4()) # Genera il calendario in modo sincrono (è CPU-bound, veloce) calendar = self._calendar.generate_calendar(request) # Inizializza stato job self._jobs[job_id] = JobStatus( job_id=job_id, status="running", total=len(calendar.slots), completed=0, current_post=0, results=[], calendar=calendar, campagna=request.obiettivo_campagna, ) logger.info( "Job avviato | job_id=%s | slot_totali=%d | campagna=%s", job_id, len(calendar.slots), request.obiettivo_campagna[:50], ) # Lancia il background task (non blocca) asyncio.create_task( self._run_generation(job_id, calendar, request) ) return job_id def get_job_status(self, job_id: str) -> Optional[JobStatus]: """Ritorna lo stato corrente del job per polling. Se il job non è in memoria, prova a caricarlo dal file JSON su disco (per supportare restart del server). Args: job_id: Identificatore del job. Returns: JobStatus aggiornato, o None se il job non esiste. """ if job_id in self._jobs: return self._jobs[job_id] # Prova a caricare da disco return self._load_job_from_disk(job_id) def get_job_results(self, job_id: str) -> Optional[GenerateResponse]: """Ritorna i risultati completi di un job completato. Args: job_id: Identificatore del job. Returns: GenerateResponse con tutti i PostResult, o None se non trovato. """ status = self.get_job_status(job_id) if status is None: return None success_count = sum(1 for r in status.results if r.status == "success") failed_count = sum(1 for r in status.results if r.status == "failed") return GenerateResponse( campagna=status.campagna, results=status.results, total=status.total, success_count=success_count, failed_count=failed_count, calendar=status.calendar, ) def generate_single( self, slot: CalendarSlot, obiettivo_campagna: str, brand_name: Optional[str] = None, tono: Optional[str] = None, ) -> PostResult: """Genera un singolo post per uno slot. Utile per rigenerare post falliti o per test. Args: slot: Slot del calendario con metadati strategici. obiettivo_campagna: Obiettivo principale della campagna. brand_name: Nome del brand (opzionale). tono: Tono di voce (opzionale). Returns: PostResult con status="success" o "failed". """ system_prompt = self._prompts.load_prompt("system_prompt") try: # Genera topic se mancante topic = slot.topic if not topic: topic = self._llm.generate_topic( system_prompt=system_prompt, obiettivo_campagna=obiettivo_campagna, tipo_contenuto=slot.tipo_contenuto, target_nicchia=slot.target_nicchia, fase_campagna=slot.fase_campagna, livello_schwartz=slot.livello_schwartz, prompt_service=self._prompts, ) # Genera il post post = self._generate_post_for_slot( slot=slot, topic=topic, obiettivo_campagna=obiettivo_campagna, brand_name=brand_name or "il tuo brand", tono=tono or "diretto e concreto", system_prompt=system_prompt, ) return PostResult( slot_index=slot.indice, status="success", post=post, ) except Exception as e: logger.error( "Errore generazione singola | slot_index=%d | errore=%s", slot.indice, str(e), ) return PostResult( slot_index=slot.indice, status="failed", error=str(e), ) # --------------------------------------------------------------------------- # Background task # --------------------------------------------------------------------------- async def _run_generation( self, job_id: str, calendar: CalendarResponse, request: CalendarRequest, ) -> None: """Task asincrono che genera tutti i post del batch. Esegue il loop su ogni slot del calendario con per-item error isolation: ogni slot è in un try/except INDIVIDUALE — un fallimento NON blocca il loop. Args: job_id: Identificatore del job. calendar: Calendario con gli slot da generare. request: Parametri della richiesta originale. """ job = self._jobs[job_id] system_prompt = self._prompts.load_prompt("system_prompt") brand_name = "il tuo brand" # Verrà preso da settings in fase successiva tono = "diretto e concreto" try: for slot in calendar.slots: # Aggiorna progresso real-time job.current_post = slot.indice # CRITICO Pitfall 5: try/except INDIVIDUALE per slot # Un fallimento NON blocca il batch try: logger.info( "Generazione slot | job_id=%s | slot=%d/%d | tipo=%s | nicchia=%s", job_id, slot.indice + 1, job.total, slot.tipo_contenuto, slot.target_nicchia, ) # Genera topic se non presente nello slot. # Controlla prima gli override passati dall'utente dallo Swipe File. topic = slot.topic if not topic and request.topic_overrides and slot.indice in request.topic_overrides: topic = request.topic_overrides[slot.indice] logger.info( "Topic override applicato | job_id=%s | slot=%d | topic=%s", job_id, slot.indice, topic[:60], ) if not topic: # Usa asyncio.to_thread per non bloccare l'event loop # (generate_topic usa time.sleep per inter_request_delay) topic = await asyncio.to_thread( self._llm.generate_topic, system_prompt, request.obiettivo_campagna, slot.tipo_contenuto, slot.target_nicchia, slot.fase_campagna, slot.livello_schwartz, self._prompts, ) # Genera il post completo post = await asyncio.to_thread( self._generate_post_for_slot, slot, topic, request.obiettivo_campagna, brand_name, tono, system_prompt, ) post_result = PostResult( slot_index=slot.indice, status="success", post=post, ) logger.info( "Slot completato | job_id=%s | slot=%d | status=success", job_id, slot.indice, ) except Exception as e: # Fallimento isolato — logga e continua con il prossimo slot logger.error( "Errore slot | job_id=%s | slot=%d | errore=%s", job_id, slot.indice, str(e), ) post_result = PostResult( slot_index=slot.indice, status="failed", error=str(e), ) # Aggiorna progresso (sia per success che per failed) job.results.append(post_result) job.completed += 1 # Risolvi keyword immagine via Unsplash (se API key configurata) image_url_map: Optional[dict[str, str]] = None try: image_url_map = await self._resolve_unsplash_keywords(job) job.image_url_map = image_url_map except Exception as e: logger.warning( "Unsplash resolution fallita | job_id=%s | errore=%s — continuo con keyword testuali", job_id, str(e), ) # Genera CSV con i risultati success_results = [r for r in job.results if r.status == "success"] if success_results: self._outputs_path.mkdir(parents=True, exist_ok=True) self._csv.build_csv( posts=job.results, calendar=calendar, job_id=job_id, output_dir=self._outputs_path, image_url_map=image_url_map, ) logger.info( "CSV generato | job_id=%s | success=%d/%d | url_unsplash=%d", job_id, len(success_results), job.total, len(image_url_map) if image_url_map else 0, ) # Salva metadata job su disco per persistenza self._save_job_to_disk(job) # Aggiorna stato finale job.status = "completed" logger.info( "Job completato | job_id=%s | success=%d | failed=%d", job_id, sum(1 for r in job.results if r.status == "success"), sum(1 for r in job.results if r.status == "failed"), ) except Exception as e: # Errore catastrofico (non isolabile a un singolo slot) job.status = "failed" job.error = str(e) logger.error( "Job fallito | job_id=%s | errore=%s", job_id, str(e), exc_info=True, ) # --------------------------------------------------------------------------- # Generazione singolo post # --------------------------------------------------------------------------- def _generate_post_for_slot( self, slot: CalendarSlot, topic: str, obiettivo_campagna: str, brand_name: str, tono: str, system_prompt: str, ) -> GeneratedPost: """Genera il contenuto completo di un post dato lo slot e il topic. Seleziona il prompt template corretto in base al formato_narrativo, compila il prompt con le variabili, chiama l'LLM. Args: slot: Slot con metadati strategici. topic: Topic specifico del post (già generato). obiettivo_campagna: Obiettivo della campagna. brand_name: Nome del brand. tono: Tono di voce. system_prompt: System prompt caricato. Returns: GeneratedPost validato con Pydantic. """ prompt_name = self._select_prompt_template( slot.formato_narrativo, slot.tipo_contenuto ) # Verifica che il prompt template esista if not self._prompts.prompt_exists(prompt_name): logger.warning( "Prompt '%s' non trovato, uso fallback '%s'", prompt_name, _DEFAULT_PROMPT, ) prompt_name = _DEFAULT_PROMPT # Prepara le variabili comuni a tutti i prompt variables: dict[str, str] = { "obiettivo_campagna": obiettivo_campagna, "topic": topic, "target_nicchia": slot.target_nicchia, "livello_schwartz": slot.livello_schwartz, "brand_name": brand_name, } # Aggiungi variabili specifiche per alcuni prompt # aida_promozione ha {{call_to_action}} come variabile aggiuntiva if "call_to_action" in self._prompts.get_required_variables(prompt_name): variables["call_to_action"] = f"Scopri di più su {brand_name}" user_prompt = self._prompts.compile_prompt(prompt_name, variables) post = self._llm.generate( system_prompt=system_prompt, user_prompt=user_prompt, response_schema=GeneratedPost, ) return post def _select_prompt_template(self, formato: str, tipo: str) -> str: """Mappa formato_narrativo + tipo_contenuto al nome del file prompt. Logica di selezione: 1. Cerca corrispondenza esatta in _FORMAT_TO_PROMPT 2. Se non trovata, usa il fallback globale _DEFAULT_PROMPT Args: formato: Formato narrativo (es. "PAS", "AIDA", "BAB"). tipo: Tipo di contenuto (es. "valore", "promozione"). Returns: Nome del file prompt senza estensione (es. "pas_valore"). """ # Prova prima la combinazione formato + tipo (per future specializzazioni) combined_key = f"{formato.lower()}_{tipo.lower()}" if combined_key in {v for v in _FORMAT_TO_PROMPT.values()}: return combined_key # Altrimenti usa la mappa formato -> prompt return _FORMAT_TO_PROMPT.get(formato, _DEFAULT_PROMPT) # --------------------------------------------------------------------------- # Integrazione Unsplash # --------------------------------------------------------------------------- async def _resolve_unsplash_keywords( self, job: JobStatus, ) -> Optional[dict[str, str]]: """Risolve le keyword immagine dei post in URL Unsplash. Carica la settings da disco per verificare se unsplash_api_key e' configurata. Se non e' configurata, ritorna None (nessuna risoluzione, usa keyword testuali). Estrae tutte le keyword uniche dai PostResult success: - cover_image_keyword - slide.image_keyword per ogni slide - cta_image_keyword Args: job: JobStatus con i risultati generati. Returns: Dizionario {keyword: url} per le keyword risolte, o None se Unsplash non configurato. """ import json as _json # Carica settings per controllare unsplash_api_key settings_path = DATA_PATH / "config" / "settings.json" unsplash_api_key: Optional[str] = None if settings_path.exists(): try: data = _json.loads(settings_path.read_text(encoding="utf-8")) unsplash_api_key = data.get("unsplash_api_key") except Exception as e: logger.warning("Errore lettura settings per Unsplash: %s", str(e)) if not unsplash_api_key: logger.debug("unsplash_api_key non configurata — skip risoluzione keyword") return None # Estrai tutte le keyword uniche dai post success keywords: list[str] = [] for post_result in job.results: if post_result.status != "success" or post_result.post is None: continue post = post_result.post if post.cover_image_keyword: keywords.append(post.cover_image_keyword) for slide in post.slides: if slide.image_keyword: keywords.append(slide.image_keyword) if post.cta_image_keyword: keywords.append(post.cta_image_keyword) if not keywords: logger.debug("Nessuna keyword immagine trovata nei post") return None logger.info( "Avvio risoluzione Unsplash | job_id=%s | keyword_totali=%d", job.job_id, len(keywords), ) # Crea e usa UnsplashService unsplash_cache_path = DATA_PATH / "unsplash_cache.json" unsplash = UnsplashService( api_key=unsplash_api_key, cache_path=unsplash_cache_path, ) try: image_url_map = await unsplash.resolve_keywords(keywords) logger.info( "Risoluzione Unsplash completata | job_id=%s | risolte=%d/%d", job.job_id, len(image_url_map), len(set(keywords)), ) return image_url_map if image_url_map else None finally: await unsplash.close() # --------------------------------------------------------------------------- # Persistenza su disco # --------------------------------------------------------------------------- def _save_job_to_disk(self, job: JobStatus) -> None: """Salva i metadata del job in JSON per ricaricamento dopo restart. Args: job: JobStatus da persistere. """ self._outputs_path.mkdir(parents=True, exist_ok=True) job_path = self._outputs_path / f"{job.job_id}.json" # Serializza con Pydantic per gestire Optional e Literal data = { "job_id": job.job_id, "status": job.status, "total": job.total, "completed": job.completed, "current_post": job.current_post, "campagna": job.campagna, "error": job.error, "results": [r.model_dump() for r in job.results], "calendar": job.calendar.model_dump() if job.calendar else None, "image_url_map": job.image_url_map, } with open(job_path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.debug("Job salvato su disco | job_id=%s | path=%s", job.job_id, job_path) def _load_job_from_disk(self, job_id: str) -> Optional[JobStatus]: """Carica il job dal file JSON su disco. Args: job_id: Identificatore del job. Returns: JobStatus ricostruito, o None se il file non esiste. """ from backend.schemas.calendar import CalendarResponse job_path = self._outputs_path / f"{job_id}.json" if not job_path.exists(): return None try: with open(job_path, "r", encoding="utf-8") as f: data = json.load(f) results = [PostResult.model_validate(r) for r in data.get("results", [])] calendar = ( CalendarResponse.model_validate(data["calendar"]) if data.get("calendar") else None ) job_status = JobStatus( job_id=data["job_id"], status=data["status"], total=data["total"], completed=data["completed"], current_post=data.get("current_post", 0), results=results, calendar=calendar, error=data.get("error"), campagna=data.get("campagna", ""), image_url_map=data.get("image_url_map"), ) # Metti in memoria per accesso futuro self._jobs[job_id] = job_status return job_status except Exception as e: logger.error( "Errore caricamento job da disco | job_id=%s | errore=%s", job_id, str(e), ) return None