"""GenerationPipeline — orchestra calendario -> LLM -> CSV con per-item error isolation. Gestisce: - Generazione async in background (asyncio.create_task) - Progresso real-time tracciato in _jobs dict (completed/total/current_post) - Per-item error isolation: un fallimento NON blocca il batch (Pitfall 5) - Persistenza su disco per ricaricamento dopo restart - Job status per polling dal frontend """ from __future__ import annotations import asyncio import json import logging import uuid from dataclasses import dataclass, field from pathlib import Path from typing import Literal, Optional from backend.schemas.calendar import CalendarRequest, CalendarResponse, CalendarSlot from backend.schemas.generate import ( GenerateResponse, GeneratedPost, PostResult, TopicResult, ) from backend.services.calendar_service import CalendarService from backend.services.csv_builder import CSVBuilder from backend.services.format_selector import FormatSelector from backend.services.llm_service import LLMService from backend.services.prompt_service import PromptService logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Mappa formato_narrativo -> nome file prompt # --------------------------------------------------------------------------- _FORMAT_TO_PROMPT: dict[str, str] = { "PAS": "pas_valore", "AIDA": "aida_promozione", "BAB": "bab_storytelling", "Listicle": "listicle_valore", "Storytelling": "bab_storytelling", # fallback su BAB per storytelling "Dato_Implicazione": "dato_news", "Obiezione_Risposta": "pas_valore", # fallback su PAS per obiezione/risposta } _DEFAULT_PROMPT = "pas_valore" # --------------------------------------------------------------------------- # Dataclass per tracciare lo stato di un job # --------------------------------------------------------------------------- @dataclass class JobStatus: """Stato real-time di un job di generazione batch.""" job_id: str status: Literal["running", "completed", "failed"] total: int completed: int current_post: int results: list[PostResult] = field(default_factory=list) calendar: Optional[CalendarResponse] = None error: Optional[str] = None campagna: str = "" # --------------------------------------------------------------------------- # Pipeline principale # --------------------------------------------------------------------------- class GenerationPipeline: """Orchestra il flusso completo: calendario -> topic -> LLM -> CSV.""" def __init__( self, llm_service: LLMService, prompt_service: PromptService, calendar_service: CalendarService, format_selector: FormatSelector, csv_builder: CSVBuilder, outputs_path: Path, ) -> None: """Inizializza la pipeline con i servizi necessari. Args: llm_service: Servizio per chiamate LLM con retry. prompt_service: Servizio per caricare/compilare prompt .txt. calendar_service: Servizio per generare il calendario editoriale. format_selector: Selettore formato narrativo per slot. csv_builder: Builder per CSV Canva-compatibile. outputs_path: Directory dove salvare i file CSV e JSON dei job. """ self._llm = llm_service self._prompts = prompt_service self._calendar = calendar_service self._formats = format_selector self._csv = csv_builder self._outputs_path = outputs_path # Dizionario in-memory per lo stato real-time dei job self._jobs: dict[str, JobStatus] = {} # --------------------------------------------------------------------------- # API pubblica # --------------------------------------------------------------------------- def generate_bulk_async( self, request: CalendarRequest, ) -> str: """Avvia la generazione batch come background task e ritorna subito. Questo metodo è SINCRONO dal punto di vista del caller: genera il job_id, inizializza lo stato, lancia il background task, e ritorna immediatamente. Il frontend può fare polling su get_job_status() per seguire il progresso. Args: request: Parametri del calendario (obiettivo, nicchie, etc.). Returns: job_id (UUID stringa) da usare per polling e download CSV. """ job_id = str(uuid.uuid4()) # Genera il calendario in modo sincrono (è CPU-bound, veloce) calendar = self._calendar.generate_calendar(request) # Inizializza stato job self._jobs[job_id] = JobStatus( job_id=job_id, status="running", total=len(calendar.slots), completed=0, current_post=0, results=[], calendar=calendar, campagna=request.obiettivo_campagna, ) logger.info( "Job avviato | job_id=%s | slot_totali=%d | campagna=%s", job_id, len(calendar.slots), request.obiettivo_campagna[:50], ) # Lancia il background task (non blocca) asyncio.create_task( self._run_generation(job_id, calendar, request) ) return job_id def get_job_status(self, job_id: str) -> Optional[JobStatus]: """Ritorna lo stato corrente del job per polling. Se il job non è in memoria, prova a caricarlo dal file JSON su disco (per supportare restart del server). Args: job_id: Identificatore del job. Returns: JobStatus aggiornato, o None se il job non esiste. """ if job_id in self._jobs: return self._jobs[job_id] # Prova a caricare da disco return self._load_job_from_disk(job_id) def get_job_results(self, job_id: str) -> Optional[GenerateResponse]: """Ritorna i risultati completi di un job completato. Args: job_id: Identificatore del job. Returns: GenerateResponse con tutti i PostResult, o None se non trovato. """ status = self.get_job_status(job_id) if status is None: return None success_count = sum(1 for r in status.results if r.status == "success") failed_count = sum(1 for r in status.results if r.status == "failed") return GenerateResponse( campagna=status.campagna, results=status.results, total=status.total, success_count=success_count, failed_count=failed_count, calendar=status.calendar, ) def generate_single( self, slot: CalendarSlot, obiettivo_campagna: str, brand_name: Optional[str] = None, tono: Optional[str] = None, ) -> PostResult: """Genera un singolo post per uno slot. Utile per rigenerare post falliti o per test. Args: slot: Slot del calendario con metadati strategici. obiettivo_campagna: Obiettivo principale della campagna. brand_name: Nome del brand (opzionale). tono: Tono di voce (opzionale). Returns: PostResult con status="success" o "failed". """ system_prompt = self._prompts.load_prompt("system_prompt") try: # Genera topic se mancante topic = slot.topic if not topic: topic = self._llm.generate_topic( system_prompt=system_prompt, obiettivo_campagna=obiettivo_campagna, tipo_contenuto=slot.tipo_contenuto, target_nicchia=slot.target_nicchia, fase_campagna=slot.fase_campagna, livello_schwartz=slot.livello_schwartz, prompt_service=self._prompts, ) # Genera il post post = self._generate_post_for_slot( slot=slot, topic=topic, obiettivo_campagna=obiettivo_campagna, brand_name=brand_name or "il tuo brand", tono=tono or "diretto e concreto", system_prompt=system_prompt, ) return PostResult( slot_index=slot.indice, status="success", post=post, ) except Exception as e: logger.error( "Errore generazione singola | slot_index=%d | errore=%s", slot.indice, str(e), ) return PostResult( slot_index=slot.indice, status="failed", error=str(e), ) # --------------------------------------------------------------------------- # Background task # --------------------------------------------------------------------------- async def _run_generation( self, job_id: str, calendar: CalendarResponse, request: CalendarRequest, ) -> None: """Task asincrono che genera tutti i post del batch. Esegue il loop su ogni slot del calendario con per-item error isolation: ogni slot è in un try/except INDIVIDUALE — un fallimento NON blocca il loop. Args: job_id: Identificatore del job. calendar: Calendario con gli slot da generare. request: Parametri della richiesta originale. """ job = self._jobs[job_id] system_prompt = self._prompts.load_prompt("system_prompt") brand_name = "il tuo brand" # Verrà preso da settings in fase successiva tono = "diretto e concreto" try: for slot in calendar.slots: # Aggiorna progresso real-time job.current_post = slot.indice # CRITICO Pitfall 5: try/except INDIVIDUALE per slot # Un fallimento NON blocca il batch try: logger.info( "Generazione slot | job_id=%s | slot=%d/%d | tipo=%s | nicchia=%s", job_id, slot.indice + 1, job.total, slot.tipo_contenuto, slot.target_nicchia, ) # Genera topic se non presente nello slot. # Controlla prima gli override passati dall'utente dallo Swipe File. topic = slot.topic if not topic and request.topic_overrides and slot.indice in request.topic_overrides: topic = request.topic_overrides[slot.indice] logger.info( "Topic override applicato | job_id=%s | slot=%d | topic=%s", job_id, slot.indice, topic[:60], ) if not topic: # Usa asyncio.to_thread per non bloccare l'event loop # (generate_topic usa time.sleep per inter_request_delay) topic = await asyncio.to_thread( self._llm.generate_topic, system_prompt, request.obiettivo_campagna, slot.tipo_contenuto, slot.target_nicchia, slot.fase_campagna, slot.livello_schwartz, self._prompts, ) # Genera il post completo post = await asyncio.to_thread( self._generate_post_for_slot, slot, topic, request.obiettivo_campagna, brand_name, tono, system_prompt, ) post_result = PostResult( slot_index=slot.indice, status="success", post=post, ) logger.info( "Slot completato | job_id=%s | slot=%d | status=success", job_id, slot.indice, ) except Exception as e: # Fallimento isolato — logga e continua con il prossimo slot logger.error( "Errore slot | job_id=%s | slot=%d | errore=%s", job_id, slot.indice, str(e), ) post_result = PostResult( slot_index=slot.indice, status="failed", error=str(e), ) # Aggiorna progresso (sia per success che per failed) job.results.append(post_result) job.completed += 1 # Genera CSV con i risultati success_results = [r for r in job.results if r.status == "success"] if success_results: self._outputs_path.mkdir(parents=True, exist_ok=True) self._csv.build_csv( posts=job.results, calendar=calendar, job_id=job_id, output_dir=self._outputs_path, ) logger.info( "CSV generato | job_id=%s | success=%d/%d", job_id, len(success_results), job.total, ) # Salva metadata job su disco per persistenza self._save_job_to_disk(job) # Aggiorna stato finale job.status = "completed" logger.info( "Job completato | job_id=%s | success=%d | failed=%d", job_id, sum(1 for r in job.results if r.status == "success"), sum(1 for r in job.results if r.status == "failed"), ) except Exception as e: # Errore catastrofico (non isolabile a un singolo slot) job.status = "failed" job.error = str(e) logger.error( "Job fallito | job_id=%s | errore=%s", job_id, str(e), exc_info=True, ) # --------------------------------------------------------------------------- # Generazione singolo post # --------------------------------------------------------------------------- def _generate_post_for_slot( self, slot: CalendarSlot, topic: str, obiettivo_campagna: str, brand_name: str, tono: str, system_prompt: str, ) -> GeneratedPost: """Genera il contenuto completo di un post dato lo slot e il topic. Seleziona il prompt template corretto in base al formato_narrativo, compila il prompt con le variabili, chiama l'LLM. Args: slot: Slot con metadati strategici. topic: Topic specifico del post (già generato). obiettivo_campagna: Obiettivo della campagna. brand_name: Nome del brand. tono: Tono di voce. system_prompt: System prompt caricato. Returns: GeneratedPost validato con Pydantic. """ prompt_name = self._select_prompt_template( slot.formato_narrativo, slot.tipo_contenuto ) # Verifica che il prompt template esista if not self._prompts.prompt_exists(prompt_name): logger.warning( "Prompt '%s' non trovato, uso fallback '%s'", prompt_name, _DEFAULT_PROMPT, ) prompt_name = _DEFAULT_PROMPT # Prepara le variabili comuni a tutti i prompt variables: dict[str, str] = { "obiettivo_campagna": obiettivo_campagna, "topic": topic, "target_nicchia": slot.target_nicchia, "livello_schwartz": slot.livello_schwartz, "brand_name": brand_name, } # Aggiungi variabili specifiche per alcuni prompt # aida_promozione ha {{call_to_action}} come variabile aggiuntiva if "call_to_action" in self._prompts.get_required_variables(prompt_name): variables["call_to_action"] = f"Scopri di più su {brand_name}" user_prompt = self._prompts.compile_prompt(prompt_name, variables) post = self._llm.generate( system_prompt=system_prompt, user_prompt=user_prompt, response_schema=GeneratedPost, ) return post def _select_prompt_template(self, formato: str, tipo: str) -> str: """Mappa formato_narrativo + tipo_contenuto al nome del file prompt. Logica di selezione: 1. Cerca corrispondenza esatta in _FORMAT_TO_PROMPT 2. Se non trovata, usa il fallback globale _DEFAULT_PROMPT Args: formato: Formato narrativo (es. "PAS", "AIDA", "BAB"). tipo: Tipo di contenuto (es. "valore", "promozione"). Returns: Nome del file prompt senza estensione (es. "pas_valore"). """ # Prova prima la combinazione formato + tipo (per future specializzazioni) combined_key = f"{formato.lower()}_{tipo.lower()}" if combined_key in {v for v in _FORMAT_TO_PROMPT.values()}: return combined_key # Altrimenti usa la mappa formato -> prompt return _FORMAT_TO_PROMPT.get(formato, _DEFAULT_PROMPT) # --------------------------------------------------------------------------- # Persistenza su disco # --------------------------------------------------------------------------- def _save_job_to_disk(self, job: JobStatus) -> None: """Salva i metadata del job in JSON per ricaricamento dopo restart. Args: job: JobStatus da persistere. """ self._outputs_path.mkdir(parents=True, exist_ok=True) job_path = self._outputs_path / f"{job.job_id}.json" # Serializza con Pydantic per gestire Optional e Literal data = { "job_id": job.job_id, "status": job.status, "total": job.total, "completed": job.completed, "current_post": job.current_post, "campagna": job.campagna, "error": job.error, "results": [r.model_dump() for r in job.results], "calendar": job.calendar.model_dump() if job.calendar else None, } with open(job_path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.debug("Job salvato su disco | job_id=%s | path=%s", job.job_id, job_path) def _load_job_from_disk(self, job_id: str) -> Optional[JobStatus]: """Carica il job dal file JSON su disco. Args: job_id: Identificatore del job. Returns: JobStatus ricostruito, o None se il file non esiste. """ from backend.schemas.calendar import CalendarResponse job_path = self._outputs_path / f"{job_id}.json" if not job_path.exists(): return None try: with open(job_path, "r", encoding="utf-8") as f: data = json.load(f) results = [PostResult.model_validate(r) for r in data.get("results", [])] calendar = ( CalendarResponse.model_validate(data["calendar"]) if data.get("calendar") else None ) job_status = JobStatus( job_id=data["job_id"], status=data["status"], total=data["total"], completed=data["completed"], current_post=data.get("current_post", 0), results=results, calendar=calendar, error=data.get("error"), campagna=data.get("campagna", ""), ) # Metti in memoria per accesso futuro self._jobs[job_id] = job_status return job_status except Exception as e: logger.error( "Errore caricamento job da disco | job_id=%s | errore=%s", job_id, str(e), ) return None