- Aggiunto campo topic_overrides: Optional[dict[int, str]] a CalendarRequest - GenerationPipeline._run_generation ora controlla request.topic_overrides prima di chiamare LLM per generare il topic - Slot con override saltano la chiamata LLM per il topic - Log informativo quando un override viene applicato - Slot senza override continuano a funzionare come prima
582 lines
20 KiB
Python
582 lines
20 KiB
Python
"""GenerationPipeline — orchestra calendario -> LLM -> CSV con per-item error isolation.
|
|
|
|
Gestisce:
|
|
- Generazione async in background (asyncio.create_task)
|
|
- Progresso real-time tracciato in _jobs dict (completed/total/current_post)
|
|
- Per-item error isolation: un fallimento NON blocca il batch (Pitfall 5)
|
|
- Persistenza su disco per ricaricamento dopo restart
|
|
- Job status per polling dal frontend
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Literal, Optional
|
|
|
|
from backend.schemas.calendar import CalendarRequest, CalendarResponse, CalendarSlot
|
|
from backend.schemas.generate import (
|
|
GenerateResponse,
|
|
GeneratedPost,
|
|
PostResult,
|
|
TopicResult,
|
|
)
|
|
from backend.services.calendar_service import CalendarService
|
|
from backend.services.csv_builder import CSVBuilder
|
|
from backend.services.format_selector import FormatSelector
|
|
from backend.services.llm_service import LLMService
|
|
from backend.services.prompt_service import PromptService
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mappa formato_narrativo -> nome file prompt
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_FORMAT_TO_PROMPT: dict[str, str] = {
|
|
"PAS": "pas_valore",
|
|
"AIDA": "aida_promozione",
|
|
"BAB": "bab_storytelling",
|
|
"Listicle": "listicle_valore",
|
|
"Storytelling": "bab_storytelling", # fallback su BAB per storytelling
|
|
"Dato_Implicazione": "dato_news",
|
|
"Obiezione_Risposta": "pas_valore", # fallback su PAS per obiezione/risposta
|
|
}
|
|
|
|
_DEFAULT_PROMPT = "pas_valore"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dataclass per tracciare lo stato di un job
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
|
|
class JobStatus:
|
|
"""Stato real-time di un job di generazione batch."""
|
|
|
|
job_id: str
|
|
status: Literal["running", "completed", "failed"]
|
|
total: int
|
|
completed: int
|
|
current_post: int
|
|
results: list[PostResult] = field(default_factory=list)
|
|
calendar: Optional[CalendarResponse] = None
|
|
error: Optional[str] = None
|
|
campagna: str = ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pipeline principale
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class GenerationPipeline:
|
|
"""Orchestra il flusso completo: calendario -> topic -> LLM -> CSV."""
|
|
|
|
def __init__(
|
|
self,
|
|
llm_service: LLMService,
|
|
prompt_service: PromptService,
|
|
calendar_service: CalendarService,
|
|
format_selector: FormatSelector,
|
|
csv_builder: CSVBuilder,
|
|
outputs_path: Path,
|
|
) -> None:
|
|
"""Inizializza la pipeline con i servizi necessari.
|
|
|
|
Args:
|
|
llm_service: Servizio per chiamate LLM con retry.
|
|
prompt_service: Servizio per caricare/compilare prompt .txt.
|
|
calendar_service: Servizio per generare il calendario editoriale.
|
|
format_selector: Selettore formato narrativo per slot.
|
|
csv_builder: Builder per CSV Canva-compatibile.
|
|
outputs_path: Directory dove salvare i file CSV e JSON dei job.
|
|
"""
|
|
self._llm = llm_service
|
|
self._prompts = prompt_service
|
|
self._calendar = calendar_service
|
|
self._formats = format_selector
|
|
self._csv = csv_builder
|
|
self._outputs_path = outputs_path
|
|
|
|
# Dizionario in-memory per lo stato real-time dei job
|
|
self._jobs: dict[str, JobStatus] = {}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# API pubblica
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def generate_bulk_async(
|
|
self,
|
|
request: CalendarRequest,
|
|
) -> str:
|
|
"""Avvia la generazione batch come background task e ritorna subito.
|
|
|
|
Questo metodo è SINCRONO dal punto di vista del caller: genera il job_id,
|
|
inizializza lo stato, lancia il background task, e ritorna immediatamente.
|
|
Il frontend può fare polling su get_job_status() per seguire il progresso.
|
|
|
|
Args:
|
|
request: Parametri del calendario (obiettivo, nicchie, etc.).
|
|
|
|
Returns:
|
|
job_id (UUID stringa) da usare per polling e download CSV.
|
|
"""
|
|
job_id = str(uuid.uuid4())
|
|
|
|
# Genera il calendario in modo sincrono (è CPU-bound, veloce)
|
|
calendar = self._calendar.generate_calendar(request)
|
|
|
|
# Inizializza stato job
|
|
self._jobs[job_id] = JobStatus(
|
|
job_id=job_id,
|
|
status="running",
|
|
total=len(calendar.slots),
|
|
completed=0,
|
|
current_post=0,
|
|
results=[],
|
|
calendar=calendar,
|
|
campagna=request.obiettivo_campagna,
|
|
)
|
|
|
|
logger.info(
|
|
"Job avviato | job_id=%s | slot_totali=%d | campagna=%s",
|
|
job_id,
|
|
len(calendar.slots),
|
|
request.obiettivo_campagna[:50],
|
|
)
|
|
|
|
# Lancia il background task (non blocca)
|
|
asyncio.create_task(
|
|
self._run_generation(job_id, calendar, request)
|
|
)
|
|
|
|
return job_id
|
|
|
|
def get_job_status(self, job_id: str) -> Optional[JobStatus]:
|
|
"""Ritorna lo stato corrente del job per polling.
|
|
|
|
Se il job non è in memoria, prova a caricarlo dal file JSON su disco
|
|
(per supportare restart del server).
|
|
|
|
Args:
|
|
job_id: Identificatore del job.
|
|
|
|
Returns:
|
|
JobStatus aggiornato, o None se il job non esiste.
|
|
"""
|
|
if job_id in self._jobs:
|
|
return self._jobs[job_id]
|
|
|
|
# Prova a caricare da disco
|
|
return self._load_job_from_disk(job_id)
|
|
|
|
def get_job_results(self, job_id: str) -> Optional[GenerateResponse]:
|
|
"""Ritorna i risultati completi di un job completato.
|
|
|
|
Args:
|
|
job_id: Identificatore del job.
|
|
|
|
Returns:
|
|
GenerateResponse con tutti i PostResult, o None se non trovato.
|
|
"""
|
|
status = self.get_job_status(job_id)
|
|
if status is None:
|
|
return None
|
|
|
|
success_count = sum(1 for r in status.results if r.status == "success")
|
|
failed_count = sum(1 for r in status.results if r.status == "failed")
|
|
|
|
return GenerateResponse(
|
|
campagna=status.campagna,
|
|
results=status.results,
|
|
total=status.total,
|
|
success_count=success_count,
|
|
failed_count=failed_count,
|
|
calendar=status.calendar,
|
|
)
|
|
|
|
def generate_single(
|
|
self,
|
|
slot: CalendarSlot,
|
|
obiettivo_campagna: str,
|
|
brand_name: Optional[str] = None,
|
|
tono: Optional[str] = None,
|
|
) -> PostResult:
|
|
"""Genera un singolo post per uno slot.
|
|
|
|
Utile per rigenerare post falliti o per test.
|
|
|
|
Args:
|
|
slot: Slot del calendario con metadati strategici.
|
|
obiettivo_campagna: Obiettivo principale della campagna.
|
|
brand_name: Nome del brand (opzionale).
|
|
tono: Tono di voce (opzionale).
|
|
|
|
Returns:
|
|
PostResult con status="success" o "failed".
|
|
"""
|
|
system_prompt = self._prompts.load_prompt("system_prompt")
|
|
|
|
try:
|
|
# Genera topic se mancante
|
|
topic = slot.topic
|
|
if not topic:
|
|
topic = self._llm.generate_topic(
|
|
system_prompt=system_prompt,
|
|
obiettivo_campagna=obiettivo_campagna,
|
|
tipo_contenuto=slot.tipo_contenuto,
|
|
target_nicchia=slot.target_nicchia,
|
|
fase_campagna=slot.fase_campagna,
|
|
livello_schwartz=slot.livello_schwartz,
|
|
prompt_service=self._prompts,
|
|
)
|
|
|
|
# Genera il post
|
|
post = self._generate_post_for_slot(
|
|
slot=slot,
|
|
topic=topic,
|
|
obiettivo_campagna=obiettivo_campagna,
|
|
brand_name=brand_name or "il tuo brand",
|
|
tono=tono or "diretto e concreto",
|
|
system_prompt=system_prompt,
|
|
)
|
|
|
|
return PostResult(
|
|
slot_index=slot.indice,
|
|
status="success",
|
|
post=post,
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Errore generazione singola | slot_index=%d | errore=%s",
|
|
slot.indice,
|
|
str(e),
|
|
)
|
|
return PostResult(
|
|
slot_index=slot.indice,
|
|
status="failed",
|
|
error=str(e),
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Background task
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _run_generation(
|
|
self,
|
|
job_id: str,
|
|
calendar: CalendarResponse,
|
|
request: CalendarRequest,
|
|
) -> None:
|
|
"""Task asincrono che genera tutti i post del batch.
|
|
|
|
Esegue il loop su ogni slot del calendario con per-item error isolation:
|
|
ogni slot è in un try/except INDIVIDUALE — un fallimento NON blocca il loop.
|
|
|
|
Args:
|
|
job_id: Identificatore del job.
|
|
calendar: Calendario con gli slot da generare.
|
|
request: Parametri della richiesta originale.
|
|
"""
|
|
job = self._jobs[job_id]
|
|
system_prompt = self._prompts.load_prompt("system_prompt")
|
|
brand_name = "il tuo brand" # Verrà preso da settings in fase successiva
|
|
tono = "diretto e concreto"
|
|
|
|
try:
|
|
for slot in calendar.slots:
|
|
# Aggiorna progresso real-time
|
|
job.current_post = slot.indice
|
|
|
|
# CRITICO Pitfall 5: try/except INDIVIDUALE per slot
|
|
# Un fallimento NON blocca il batch
|
|
try:
|
|
logger.info(
|
|
"Generazione slot | job_id=%s | slot=%d/%d | tipo=%s | nicchia=%s",
|
|
job_id,
|
|
slot.indice + 1,
|
|
job.total,
|
|
slot.tipo_contenuto,
|
|
slot.target_nicchia,
|
|
)
|
|
|
|
# Genera topic se non presente nello slot.
|
|
# Controlla prima gli override passati dall'utente dallo Swipe File.
|
|
topic = slot.topic
|
|
if not topic and request.topic_overrides and slot.indice in request.topic_overrides:
|
|
topic = request.topic_overrides[slot.indice]
|
|
logger.info(
|
|
"Topic override applicato | job_id=%s | slot=%d | topic=%s",
|
|
job_id,
|
|
slot.indice,
|
|
topic[:60],
|
|
)
|
|
if not topic:
|
|
# Usa asyncio.to_thread per non bloccare l'event loop
|
|
# (generate_topic usa time.sleep per inter_request_delay)
|
|
topic = await asyncio.to_thread(
|
|
self._llm.generate_topic,
|
|
system_prompt,
|
|
request.obiettivo_campagna,
|
|
slot.tipo_contenuto,
|
|
slot.target_nicchia,
|
|
slot.fase_campagna,
|
|
slot.livello_schwartz,
|
|
self._prompts,
|
|
)
|
|
|
|
# Genera il post completo
|
|
post = await asyncio.to_thread(
|
|
self._generate_post_for_slot,
|
|
slot,
|
|
topic,
|
|
request.obiettivo_campagna,
|
|
brand_name,
|
|
tono,
|
|
system_prompt,
|
|
)
|
|
|
|
post_result = PostResult(
|
|
slot_index=slot.indice,
|
|
status="success",
|
|
post=post,
|
|
)
|
|
logger.info(
|
|
"Slot completato | job_id=%s | slot=%d | status=success",
|
|
job_id,
|
|
slot.indice,
|
|
)
|
|
|
|
except Exception as e:
|
|
# Fallimento isolato — logga e continua con il prossimo slot
|
|
logger.error(
|
|
"Errore slot | job_id=%s | slot=%d | errore=%s",
|
|
job_id,
|
|
slot.indice,
|
|
str(e),
|
|
)
|
|
post_result = PostResult(
|
|
slot_index=slot.indice,
|
|
status="failed",
|
|
error=str(e),
|
|
)
|
|
|
|
# Aggiorna progresso (sia per success che per failed)
|
|
job.results.append(post_result)
|
|
job.completed += 1
|
|
|
|
# Genera CSV con i risultati
|
|
success_results = [r for r in job.results if r.status == "success"]
|
|
if success_results:
|
|
self._outputs_path.mkdir(parents=True, exist_ok=True)
|
|
self._csv.build_csv(
|
|
posts=job.results,
|
|
calendar=calendar,
|
|
job_id=job_id,
|
|
output_dir=self._outputs_path,
|
|
)
|
|
logger.info(
|
|
"CSV generato | job_id=%s | success=%d/%d",
|
|
job_id,
|
|
len(success_results),
|
|
job.total,
|
|
)
|
|
|
|
# Salva metadata job su disco per persistenza
|
|
self._save_job_to_disk(job)
|
|
|
|
# Aggiorna stato finale
|
|
job.status = "completed"
|
|
logger.info(
|
|
"Job completato | job_id=%s | success=%d | failed=%d",
|
|
job_id,
|
|
sum(1 for r in job.results if r.status == "success"),
|
|
sum(1 for r in job.results if r.status == "failed"),
|
|
)
|
|
|
|
except Exception as e:
|
|
# Errore catastrofico (non isolabile a un singolo slot)
|
|
job.status = "failed"
|
|
job.error = str(e)
|
|
logger.error(
|
|
"Job fallito | job_id=%s | errore=%s",
|
|
job_id,
|
|
str(e),
|
|
exc_info=True,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Generazione singolo post
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _generate_post_for_slot(
|
|
self,
|
|
slot: CalendarSlot,
|
|
topic: str,
|
|
obiettivo_campagna: str,
|
|
brand_name: str,
|
|
tono: str,
|
|
system_prompt: str,
|
|
) -> GeneratedPost:
|
|
"""Genera il contenuto completo di un post dato lo slot e il topic.
|
|
|
|
Seleziona il prompt template corretto in base al formato_narrativo,
|
|
compila il prompt con le variabili, chiama l'LLM.
|
|
|
|
Args:
|
|
slot: Slot con metadati strategici.
|
|
topic: Topic specifico del post (già generato).
|
|
obiettivo_campagna: Obiettivo della campagna.
|
|
brand_name: Nome del brand.
|
|
tono: Tono di voce.
|
|
system_prompt: System prompt caricato.
|
|
|
|
Returns:
|
|
GeneratedPost validato con Pydantic.
|
|
"""
|
|
prompt_name = self._select_prompt_template(
|
|
slot.formato_narrativo, slot.tipo_contenuto
|
|
)
|
|
|
|
# Verifica che il prompt template esista
|
|
if not self._prompts.prompt_exists(prompt_name):
|
|
logger.warning(
|
|
"Prompt '%s' non trovato, uso fallback '%s'",
|
|
prompt_name,
|
|
_DEFAULT_PROMPT,
|
|
)
|
|
prompt_name = _DEFAULT_PROMPT
|
|
|
|
# Prepara le variabili comuni a tutti i prompt
|
|
variables: dict[str, str] = {
|
|
"obiettivo_campagna": obiettivo_campagna,
|
|
"topic": topic,
|
|
"target_nicchia": slot.target_nicchia,
|
|
"livello_schwartz": slot.livello_schwartz,
|
|
"brand_name": brand_name,
|
|
}
|
|
|
|
# Aggiungi variabili specifiche per alcuni prompt
|
|
# aida_promozione ha {{call_to_action}} come variabile aggiuntiva
|
|
if "call_to_action" in self._prompts.get_required_variables(prompt_name):
|
|
variables["call_to_action"] = f"Scopri di più su {brand_name}"
|
|
|
|
user_prompt = self._prompts.compile_prompt(prompt_name, variables)
|
|
|
|
post = self._llm.generate(
|
|
system_prompt=system_prompt,
|
|
user_prompt=user_prompt,
|
|
response_schema=GeneratedPost,
|
|
)
|
|
return post
|
|
|
|
def _select_prompt_template(self, formato: str, tipo: str) -> str:
|
|
"""Mappa formato_narrativo + tipo_contenuto al nome del file prompt.
|
|
|
|
Logica di selezione:
|
|
1. Cerca corrispondenza esatta in _FORMAT_TO_PROMPT
|
|
2. Se non trovata, usa il fallback globale _DEFAULT_PROMPT
|
|
|
|
Args:
|
|
formato: Formato narrativo (es. "PAS", "AIDA", "BAB").
|
|
tipo: Tipo di contenuto (es. "valore", "promozione").
|
|
|
|
Returns:
|
|
Nome del file prompt senza estensione (es. "pas_valore").
|
|
"""
|
|
# Prova prima la combinazione formato + tipo (per future specializzazioni)
|
|
combined_key = f"{formato.lower()}_{tipo.lower()}"
|
|
if combined_key in {v for v in _FORMAT_TO_PROMPT.values()}:
|
|
return combined_key
|
|
|
|
# Altrimenti usa la mappa formato -> prompt
|
|
return _FORMAT_TO_PROMPT.get(formato, _DEFAULT_PROMPT)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Persistenza su disco
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _save_job_to_disk(self, job: JobStatus) -> None:
|
|
"""Salva i metadata del job in JSON per ricaricamento dopo restart.
|
|
|
|
Args:
|
|
job: JobStatus da persistere.
|
|
"""
|
|
self._outputs_path.mkdir(parents=True, exist_ok=True)
|
|
job_path = self._outputs_path / f"{job.job_id}.json"
|
|
|
|
# Serializza con Pydantic per gestire Optional e Literal
|
|
data = {
|
|
"job_id": job.job_id,
|
|
"status": job.status,
|
|
"total": job.total,
|
|
"completed": job.completed,
|
|
"current_post": job.current_post,
|
|
"campagna": job.campagna,
|
|
"error": job.error,
|
|
"results": [r.model_dump() for r in job.results],
|
|
"calendar": job.calendar.model_dump() if job.calendar else None,
|
|
}
|
|
|
|
with open(job_path, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
|
logger.debug("Job salvato su disco | job_id=%s | path=%s", job.job_id, job_path)
|
|
|
|
def _load_job_from_disk(self, job_id: str) -> Optional[JobStatus]:
|
|
"""Carica il job dal file JSON su disco.
|
|
|
|
Args:
|
|
job_id: Identificatore del job.
|
|
|
|
Returns:
|
|
JobStatus ricostruito, o None se il file non esiste.
|
|
"""
|
|
from backend.schemas.calendar import CalendarResponse
|
|
|
|
job_path = self._outputs_path / f"{job_id}.json"
|
|
if not job_path.exists():
|
|
return None
|
|
|
|
try:
|
|
with open(job_path, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
results = [PostResult.model_validate(r) for r in data.get("results", [])]
|
|
calendar = (
|
|
CalendarResponse.model_validate(data["calendar"])
|
|
if data.get("calendar")
|
|
else None
|
|
)
|
|
|
|
job_status = JobStatus(
|
|
job_id=data["job_id"],
|
|
status=data["status"],
|
|
total=data["total"],
|
|
completed=data["completed"],
|
|
current_post=data.get("current_post", 0),
|
|
results=results,
|
|
calendar=calendar,
|
|
error=data.get("error"),
|
|
campagna=data.get("campagna", ""),
|
|
)
|
|
|
|
# Metti in memoria per accesso futuro
|
|
self._jobs[job_id] = job_status
|
|
return job_status
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Errore caricamento job da disco | job_id=%s | errore=%s",
|
|
job_id,
|
|
str(e),
|
|
)
|
|
return None
|