Files
postgenerator/backend/services/generation_pipeline.py
Michele d8e3eb9415 fix(01): calendar slot merge in OutputReview for PN/Schwartz badges
GenerateResponse now includes calendar field from backend.
OutputReview merges CalendarSlot into PostResult via slot_index,
enabling BadgePN, BadgeSchwartz rendering and Retry button.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 03:14:07 +01:00

573 lines
20 KiB
Python

"""GenerationPipeline — orchestra calendario -> LLM -> CSV con per-item error isolation.
Gestisce:
- Generazione async in background (asyncio.create_task)
- Progresso real-time tracciato in _jobs dict (completed/total/current_post)
- Per-item error isolation: un fallimento NON blocca il batch (Pitfall 5)
- Persistenza su disco per ricaricamento dopo restart
- Job status per polling dal frontend
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal, Optional
from backend.schemas.calendar import CalendarRequest, CalendarResponse, CalendarSlot
from backend.schemas.generate import (
GenerateResponse,
GeneratedPost,
PostResult,
TopicResult,
)
from backend.services.calendar_service import CalendarService
from backend.services.csv_builder import CSVBuilder
from backend.services.format_selector import FormatSelector
from backend.services.llm_service import LLMService
from backend.services.prompt_service import PromptService
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Mappa formato_narrativo -> nome file prompt
# ---------------------------------------------------------------------------
_FORMAT_TO_PROMPT: dict[str, str] = {
"PAS": "pas_valore",
"AIDA": "aida_promozione",
"BAB": "bab_storytelling",
"Listicle": "listicle_valore",
"Storytelling": "bab_storytelling", # fallback su BAB per storytelling
"Dato_Implicazione": "dato_news",
"Obiezione_Risposta": "pas_valore", # fallback su PAS per obiezione/risposta
}
_DEFAULT_PROMPT = "pas_valore"
# ---------------------------------------------------------------------------
# Dataclass per tracciare lo stato di un job
# ---------------------------------------------------------------------------
@dataclass
class JobStatus:
"""Stato real-time di un job di generazione batch."""
job_id: str
status: Literal["running", "completed", "failed"]
total: int
completed: int
current_post: int
results: list[PostResult] = field(default_factory=list)
calendar: Optional[CalendarResponse] = None
error: Optional[str] = None
campagna: str = ""
# ---------------------------------------------------------------------------
# Pipeline principale
# ---------------------------------------------------------------------------
class GenerationPipeline:
"""Orchestra il flusso completo: calendario -> topic -> LLM -> CSV."""
def __init__(
self,
llm_service: LLMService,
prompt_service: PromptService,
calendar_service: CalendarService,
format_selector: FormatSelector,
csv_builder: CSVBuilder,
outputs_path: Path,
) -> None:
"""Inizializza la pipeline con i servizi necessari.
Args:
llm_service: Servizio per chiamate LLM con retry.
prompt_service: Servizio per caricare/compilare prompt .txt.
calendar_service: Servizio per generare il calendario editoriale.
format_selector: Selettore formato narrativo per slot.
csv_builder: Builder per CSV Canva-compatibile.
outputs_path: Directory dove salvare i file CSV e JSON dei job.
"""
self._llm = llm_service
self._prompts = prompt_service
self._calendar = calendar_service
self._formats = format_selector
self._csv = csv_builder
self._outputs_path = outputs_path
# Dizionario in-memory per lo stato real-time dei job
self._jobs: dict[str, JobStatus] = {}
# ---------------------------------------------------------------------------
# API pubblica
# ---------------------------------------------------------------------------
def generate_bulk_async(
self,
request: CalendarRequest,
) -> str:
"""Avvia la generazione batch come background task e ritorna subito.
Questo metodo è SINCRONO dal punto di vista del caller: genera il job_id,
inizializza lo stato, lancia il background task, e ritorna immediatamente.
Il frontend può fare polling su get_job_status() per seguire il progresso.
Args:
request: Parametri del calendario (obiettivo, nicchie, etc.).
Returns:
job_id (UUID stringa) da usare per polling e download CSV.
"""
job_id = str(uuid.uuid4())
# Genera il calendario in modo sincrono (è CPU-bound, veloce)
calendar = self._calendar.generate_calendar(request)
# Inizializza stato job
self._jobs[job_id] = JobStatus(
job_id=job_id,
status="running",
total=len(calendar.slots),
completed=0,
current_post=0,
results=[],
calendar=calendar,
campagna=request.obiettivo_campagna,
)
logger.info(
"Job avviato | job_id=%s | slot_totali=%d | campagna=%s",
job_id,
len(calendar.slots),
request.obiettivo_campagna[:50],
)
# Lancia il background task (non blocca)
asyncio.create_task(
self._run_generation(job_id, calendar, request)
)
return job_id
def get_job_status(self, job_id: str) -> Optional[JobStatus]:
"""Ritorna lo stato corrente del job per polling.
Se il job non è in memoria, prova a caricarlo dal file JSON su disco
(per supportare restart del server).
Args:
job_id: Identificatore del job.
Returns:
JobStatus aggiornato, o None se il job non esiste.
"""
if job_id in self._jobs:
return self._jobs[job_id]
# Prova a caricare da disco
return self._load_job_from_disk(job_id)
def get_job_results(self, job_id: str) -> Optional[GenerateResponse]:
"""Ritorna i risultati completi di un job completato.
Args:
job_id: Identificatore del job.
Returns:
GenerateResponse con tutti i PostResult, o None se non trovato.
"""
status = self.get_job_status(job_id)
if status is None:
return None
success_count = sum(1 for r in status.results if r.status == "success")
failed_count = sum(1 for r in status.results if r.status == "failed")
return GenerateResponse(
campagna=status.campagna,
results=status.results,
total=status.total,
success_count=success_count,
failed_count=failed_count,
calendar=status.calendar,
)
def generate_single(
self,
slot: CalendarSlot,
obiettivo_campagna: str,
brand_name: Optional[str] = None,
tono: Optional[str] = None,
) -> PostResult:
"""Genera un singolo post per uno slot.
Utile per rigenerare post falliti o per test.
Args:
slot: Slot del calendario con metadati strategici.
obiettivo_campagna: Obiettivo principale della campagna.
brand_name: Nome del brand (opzionale).
tono: Tono di voce (opzionale).
Returns:
PostResult con status="success" o "failed".
"""
system_prompt = self._prompts.load_prompt("system_prompt")
try:
# Genera topic se mancante
topic = slot.topic
if not topic:
topic = self._llm.generate_topic(
system_prompt=system_prompt,
obiettivo_campagna=obiettivo_campagna,
tipo_contenuto=slot.tipo_contenuto,
target_nicchia=slot.target_nicchia,
fase_campagna=slot.fase_campagna,
livello_schwartz=slot.livello_schwartz,
prompt_service=self._prompts,
)
# Genera il post
post = self._generate_post_for_slot(
slot=slot,
topic=topic,
obiettivo_campagna=obiettivo_campagna,
brand_name=brand_name or "il tuo brand",
tono=tono or "diretto e concreto",
system_prompt=system_prompt,
)
return PostResult(
slot_index=slot.indice,
status="success",
post=post,
)
except Exception as e:
logger.error(
"Errore generazione singola | slot_index=%d | errore=%s",
slot.indice,
str(e),
)
return PostResult(
slot_index=slot.indice,
status="failed",
error=str(e),
)
# ---------------------------------------------------------------------------
# Background task
# ---------------------------------------------------------------------------
async def _run_generation(
self,
job_id: str,
calendar: CalendarResponse,
request: CalendarRequest,
) -> None:
"""Task asincrono che genera tutti i post del batch.
Esegue il loop su ogni slot del calendario con per-item error isolation:
ogni slot è in un try/except INDIVIDUALE — un fallimento NON blocca il loop.
Args:
job_id: Identificatore del job.
calendar: Calendario con gli slot da generare.
request: Parametri della richiesta originale.
"""
job = self._jobs[job_id]
system_prompt = self._prompts.load_prompt("system_prompt")
brand_name = "il tuo brand" # Verrà preso da settings in fase successiva
tono = "diretto e concreto"
try:
for slot in calendar.slots:
# Aggiorna progresso real-time
job.current_post = slot.indice
# CRITICO Pitfall 5: try/except INDIVIDUALE per slot
# Un fallimento NON blocca il batch
try:
logger.info(
"Generazione slot | job_id=%s | slot=%d/%d | tipo=%s | nicchia=%s",
job_id,
slot.indice + 1,
job.total,
slot.tipo_contenuto,
slot.target_nicchia,
)
# Genera topic se non presente nello slot
topic = slot.topic
if not topic:
# Usa asyncio.to_thread per non bloccare l'event loop
# (generate_topic usa time.sleep per inter_request_delay)
topic = await asyncio.to_thread(
self._llm.generate_topic,
system_prompt,
request.obiettivo_campagna,
slot.tipo_contenuto,
slot.target_nicchia,
slot.fase_campagna,
slot.livello_schwartz,
self._prompts,
)
# Genera il post completo
post = await asyncio.to_thread(
self._generate_post_for_slot,
slot,
topic,
request.obiettivo_campagna,
brand_name,
tono,
system_prompt,
)
post_result = PostResult(
slot_index=slot.indice,
status="success",
post=post,
)
logger.info(
"Slot completato | job_id=%s | slot=%d | status=success",
job_id,
slot.indice,
)
except Exception as e:
# Fallimento isolato — logga e continua con il prossimo slot
logger.error(
"Errore slot | job_id=%s | slot=%d | errore=%s",
job_id,
slot.indice,
str(e),
)
post_result = PostResult(
slot_index=slot.indice,
status="failed",
error=str(e),
)
# Aggiorna progresso (sia per success che per failed)
job.results.append(post_result)
job.completed += 1
# Genera CSV con i risultati
success_results = [r for r in job.results if r.status == "success"]
if success_results:
self._outputs_path.mkdir(parents=True, exist_ok=True)
self._csv.build_csv(
posts=job.results,
calendar=calendar,
job_id=job_id,
output_dir=self._outputs_path,
)
logger.info(
"CSV generato | job_id=%s | success=%d/%d",
job_id,
len(success_results),
job.total,
)
# Salva metadata job su disco per persistenza
self._save_job_to_disk(job)
# Aggiorna stato finale
job.status = "completed"
logger.info(
"Job completato | job_id=%s | success=%d | failed=%d",
job_id,
sum(1 for r in job.results if r.status == "success"),
sum(1 for r in job.results if r.status == "failed"),
)
except Exception as e:
# Errore catastrofico (non isolabile a un singolo slot)
job.status = "failed"
job.error = str(e)
logger.error(
"Job fallito | job_id=%s | errore=%s",
job_id,
str(e),
exc_info=True,
)
# ---------------------------------------------------------------------------
# Generazione singolo post
# ---------------------------------------------------------------------------
def _generate_post_for_slot(
self,
slot: CalendarSlot,
topic: str,
obiettivo_campagna: str,
brand_name: str,
tono: str,
system_prompt: str,
) -> GeneratedPost:
"""Genera il contenuto completo di un post dato lo slot e il topic.
Seleziona il prompt template corretto in base al formato_narrativo,
compila il prompt con le variabili, chiama l'LLM.
Args:
slot: Slot con metadati strategici.
topic: Topic specifico del post (già generato).
obiettivo_campagna: Obiettivo della campagna.
brand_name: Nome del brand.
tono: Tono di voce.
system_prompt: System prompt caricato.
Returns:
GeneratedPost validato con Pydantic.
"""
prompt_name = self._select_prompt_template(
slot.formato_narrativo, slot.tipo_contenuto
)
# Verifica che il prompt template esista
if not self._prompts.prompt_exists(prompt_name):
logger.warning(
"Prompt '%s' non trovato, uso fallback '%s'",
prompt_name,
_DEFAULT_PROMPT,
)
prompt_name = _DEFAULT_PROMPT
# Prepara le variabili comuni a tutti i prompt
variables: dict[str, str] = {
"obiettivo_campagna": obiettivo_campagna,
"topic": topic,
"target_nicchia": slot.target_nicchia,
"livello_schwartz": slot.livello_schwartz,
"brand_name": brand_name,
}
# Aggiungi variabili specifiche per alcuni prompt
# aida_promozione ha {{call_to_action}} come variabile aggiuntiva
if "call_to_action" in self._prompts.get_required_variables(prompt_name):
variables["call_to_action"] = f"Scopri di più su {brand_name}"
user_prompt = self._prompts.compile_prompt(prompt_name, variables)
post = self._llm.generate(
system_prompt=system_prompt,
user_prompt=user_prompt,
response_schema=GeneratedPost,
)
return post
def _select_prompt_template(self, formato: str, tipo: str) -> str:
"""Mappa formato_narrativo + tipo_contenuto al nome del file prompt.
Logica di selezione:
1. Cerca corrispondenza esatta in _FORMAT_TO_PROMPT
2. Se non trovata, usa il fallback globale _DEFAULT_PROMPT
Args:
formato: Formato narrativo (es. "PAS", "AIDA", "BAB").
tipo: Tipo di contenuto (es. "valore", "promozione").
Returns:
Nome del file prompt senza estensione (es. "pas_valore").
"""
# Prova prima la combinazione formato + tipo (per future specializzazioni)
combined_key = f"{formato.lower()}_{tipo.lower()}"
if combined_key in {v for v in _FORMAT_TO_PROMPT.values()}:
return combined_key
# Altrimenti usa la mappa formato -> prompt
return _FORMAT_TO_PROMPT.get(formato, _DEFAULT_PROMPT)
# ---------------------------------------------------------------------------
# Persistenza su disco
# ---------------------------------------------------------------------------
def _save_job_to_disk(self, job: JobStatus) -> None:
"""Salva i metadata del job in JSON per ricaricamento dopo restart.
Args:
job: JobStatus da persistere.
"""
self._outputs_path.mkdir(parents=True, exist_ok=True)
job_path = self._outputs_path / f"{job.job_id}.json"
# Serializza con Pydantic per gestire Optional e Literal
data = {
"job_id": job.job_id,
"status": job.status,
"total": job.total,
"completed": job.completed,
"current_post": job.current_post,
"campagna": job.campagna,
"error": job.error,
"results": [r.model_dump() for r in job.results],
"calendar": job.calendar.model_dump() if job.calendar else None,
}
with open(job_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.debug("Job salvato su disco | job_id=%s | path=%s", job.job_id, job_path)
def _load_job_from_disk(self, job_id: str) -> Optional[JobStatus]:
"""Carica il job dal file JSON su disco.
Args:
job_id: Identificatore del job.
Returns:
JobStatus ricostruito, o None se il file non esiste.
"""
from backend.schemas.calendar import CalendarResponse
job_path = self._outputs_path / f"{job_id}.json"
if not job_path.exists():
return None
try:
with open(job_path, "r", encoding="utf-8") as f:
data = json.load(f)
results = [PostResult.model_validate(r) for r in data.get("results", [])]
calendar = (
CalendarResponse.model_validate(data["calendar"])
if data.get("calendar")
else None
)
job_status = JobStatus(
job_id=data["job_id"],
status=data["status"],
total=data["total"],
completed=data["completed"],
current_post=data.get("current_post", 0),
results=results,
calendar=calendar,
error=data.get("error"),
campagna=data.get("campagna", ""),
)
# Metti in memoria per accesso futuro
self._jobs[job_id] = job_status
return job_status
except Exception as e:
logger.error(
"Errore caricamento job da disco | job_id=%s | errore=%s",
job_id,
str(e),
)
return None